File: /var/www/vhosts/dial-copper-dev.2amigos.us/dialpad-copper-crm/api/db/gds.py
"""GDS data store models
"""
import json
from uuid import uuid4
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Optional, TypeVar, List, Dict, Set
from google.cloud.firestore import Client
def _add_entity(local_entity, gds_client: Client):
    """Creates or overwrites the GDS document backing a local entity

    The document is stored in the collection named by the entity's
    ``__kind__`` under the entity's ``ID``; its content is a deep copy of the
    entity's instance attributes.

    Args:
        local_entity: Local instance of entity
        gds_client (Client): Google data store client

    Returns:
        The same ``local_entity`` that was passed in
    """
    collection = gds_client.collection(local_entity.__kind__)
    # Copy the attributes so the stored payload is detached from the instance
    payload = deepcopy(local_entity.__dict__)
    collection.document(local_entity.ID).set(payload)
    return local_entity
def _delete_entity(local_entity, gds_client: Client):
    """Deletes the GDS document backing a local entity, if it is stored

    The collection named by the entity's ``__kind__`` is first queried for a
    document whose ``ID`` field equals the entity's ``ID``; the document keyed
    by that ``ID`` is deleted only when the query finds something.

    Args:
        local_entity: Local instance of entity
        gds_client (Client): Google data store client

    Returns:
        None, whether or not a document was deleted
    """
    kind = local_entity.__kind__
    lookup = gds_client.collection(kind).where("ID", "==", local_entity.ID)
    found = next(iter(lookup.get()), None)
    if found:
        gds_client.collection(kind).document(local_entity.ID).delete()
    return None
# DialpadTokenStore
DTS = TypeVar('DialpadTokenStore')


class DialpadTokenStore:
    """Stored pairing of a Dialpad user id with a Dialpad access token"""

    __kind__ = 'DialpadTokenStore'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.dialpad_access_token: str = kwargs.get('dialpad_access_token')

    def save(self, gds_client: Client):
        """Saves this instance on GDS unless the same user/token pair exists

        Args:
            gds_client (Client): Google data store client

        Returns:
            The already stored instance when one matches both the user id and
            the access token, otherwise this instance after being written
        """
        lookup = (
            gds_client.collection(self.__kind__)
            .where('dialpad_user_id', '==', self.dialpad_user_id)
            .where('dialpad_access_token', '==', self.dialpad_access_token)
        )
        existing = next(iter(lookup.get()), None)
        if existing:
            return DialpadTokenStore(**existing.to_dict())
        return _add_entity(self, gds_client)

    @staticmethod
    def delete(token: str, gds_client: Client):
        """Deletes every stored token of the user that owns ``token``

        Args:
            token (str): Dialpad access token
            gds_client (Client): Google data store client

        Returns:
            None; nothing is deleted when the token is not stored
        """
        by_token = gds_client.collection(DialpadTokenStore.__kind__).where(
            'dialpad_access_token', '==', token
        )
        owner = next(iter(by_token.get()), None)
        if not owner:
            return None
        # Since the logout in dialpad invalidates all the active tokens
        # we should delete by dialpad user id (not that we're supporting full multi-session anyways...)
        by_user = gds_client.collection(DialpadTokenStore.__kind__).where(
            'dialpad_user_id', '==', owner.get("dialpad_user_id")
        )
        for snapshot in by_user.get():
            stale = DialpadTokenStore(**snapshot.to_dict())
            gds_client.collection(stale.__kind__).document(stale.ID).delete()

    @staticmethod
    def find_for_user(dialpad_id: str, gds_client: Client) -> Optional[DTS]:
        """Matching session for a dialpad user id

        Args:
            dialpad_id (str): dialpad user id
            gds_client (Client): Google data store client

        Returns:
            Optional[DialpadTokenStore]: First stored match or None
        """
        lookup = gds_client.collection(DialpadTokenStore.__kind__).where(
            'dialpad_user_id', '==', dialpad_id
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return DialpadTokenStore(**found.to_dict())
        return None

    @staticmethod
    def find_for_token(dialpad_token: str, gds_client: Client) -> Optional[DTS]:
        """Matching session for a dialpad access token

        Args:
            dialpad_token (str): dialpad access token
            gds_client (Client): Google data store client

        Returns:
            Optional[DialpadTokenStore]: First stored match or None
        """
        lookup = gds_client.collection(DialpadTokenStore.__kind__).where(
            'dialpad_access_token', '==', dialpad_token
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return DialpadTokenStore(**found.to_dict())
        return None
# Matches
M = TypeVar('Match')


class Match:
    """Table to store matches between copper and dialpad"""

    __kind__ = 'Match'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.copper_id: int = kwargs.get('copper_id')
        self.copper_type: str = kwargs.get('copper_type')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.dialpad_contact_id: str = kwargs.get('dialpad_contact_id')

    def save(self, gds_client: Client):
        """Saves this instance on db unless an identical match is stored

        Args:
            gds_client (Client): Google data store client

        Returns:
            The stored instance when one has the same copper id, copper type,
            dialpad user id and dialpad contact id; otherwise this instance
            after being written
        """
        lookup = (
            gds_client.collection(self.__kind__)
            .where('copper_id', '==', self.copper_id)
            .where('copper_type', '==', self.copper_type)
            .where('dialpad_user_id', '==', self.dialpad_user_id)
            .where('dialpad_contact_id', '==', self.dialpad_contact_id)
        )
        existing = next(iter(lookup.get()), None)
        if existing:
            return Match(**existing.to_dict())
        return _add_entity(self, gds_client)

    @staticmethod
    def delete(ID: str, gds_client: Client):
        """Deletes a match from data store

        Args:
            ID (str): ID of the stored match
            gds_client (Client): Google data store client
        """
        # Only the ID matters for deletion, so a bare placeholder is enough
        _delete_entity(Match(ID=ID), gds_client)

    @staticmethod
    def find_match(dialpad_id: str, contact_id: str, gds_client: Client) -> Optional[M]:
        """Matching contact

        Args:
            dialpad_id (str): dialpad user id
            contact_id (str): dialpad contact id
            gds_client (Client): Google data store client

        Returns:
            Optional[Match]: First stored match or None
        """
        lookup = (
            gds_client.collection(Match.__kind__)
            .where('dialpad_user_id', '==', dialpad_id)
            .where('dialpad_contact_id', '==', contact_id)
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return Match(**found.to_dict())
        return None
# Dialpad event subscriptions
ES = TypeVar('Subscription')


class Subscription:
    """Storage for dialpad subscriptions"""

    __kind__ = 'Subscription'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.dialpad_access_token: str = kwargs.get('dialpad_access_token')
        self.subscription_id: str = kwargs.get('subscription_id')

    def save(self, gds_client: Client) -> ES:
        """Saves this instance in DB unless the user already has one

        Args:
            gds_client (Client): Google data store client

        Returns:
            Subscription: The subscription already stored for this dialpad
                user id, otherwise this instance after being written
        """
        existing = Subscription.find_for_session(self.dialpad_user_id, gds_client)
        return existing if existing else _add_entity(self, gds_client)

    def delete(self, gds_client: Client):
        """Deletes this instance from DB

        Args:
            gds_client (Client): Google data store client
        """
        _delete_entity(self, gds_client)

    @staticmethod
    def find_for_session(dialpad_user_id: str, gds_client: Client) -> Optional[ES]:
        """Finds a subscription for a given user session

        Args:
            dialpad_user_id (str): user id
            gds_client (Client): Google data store client

        Returns:
            Optional[Subscription]: First stored match or None
        """
        lookup = gds_client.collection(Subscription.__kind__).where(
            'dialpad_user_id', '==', dialpad_user_id
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return Subscription(**found.to_dict())
        return None
# Dialpad company webhooks
DW = TypeVar('DialpadWebhook')


class DialpadWebhook:
    """Persistent store for dialpad webhooks"""

    __kind__ = 'DialpadWebhook'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.hook_id: str = kwargs.get('hook_id')
        self.webhook_secret: str = kwargs.get('webhook_secret')
        self.dialpad_company_id: str = kwargs.get('dialpad_company_id')

    def save(self, gds_client: Client) -> DW:
        """Saves this instance in DB unless the company already has a webhook

        Args:
            gds_client (Client): Client for google firestore

        Returns:
            DialpadWebhook: The webhook already stored for this dialpad
                company id, otherwise this instance after being written
        """
        existing = DialpadWebhook.find_for_company(self.dialpad_company_id, gds_client)
        return existing if existing else _add_entity(self, gds_client)

    @staticmethod
    def find_for_company(dialpad_company_id: str, gds_client: Client) -> Optional[DW]:
        """Finds a persisted webhook for a given company to avoid making
        unnecessary requests to dialpad API

        Args:
            dialpad_company_id (str): dialpad company id
            gds_client (Client): Client for google firestore

        Returns:
            Optional[DialpadWebhook]: Matching webhook or None
        """
        lookup = gds_client.collection(DialpadWebhook.__kind__).where(
            'dialpad_company_id', '==', dialpad_company_id
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return DialpadWebhook(**found.to_dict())
        return None
# Copper API Keys
CK = TypeVar('CopperKeyStore')


class CopperKeyStore:
    """Storage of Copper CRM Keys for dialpad users"""

    __kind__ = 'CopperKeyStore'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.dialpad_id: str = kwargs.get('dialpad_id')
        self.copper_api_key: str = kwargs.get('copper_api_key')
        self.copper_api_email: str = kwargs.get('copper_api_email')

    @staticmethod
    def find_for_dialpad(dialpad_id: str, gds_client: Client) -> Optional[CK]:
        """Finds a Copper CRM key info in the store for a given dialpad user id

        Args:
            dialpad_id (str): dialpad user id
            gds_client (Client): api db session

        Returns:
            Optional[CK]: Copper key instance if present
        """
        lookup = gds_client.collection(CopperKeyStore.__kind__).where(
            'dialpad_id', '==', dialpad_id
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return CopperKeyStore(**found.to_dict())
        return None

    def save(self, gds_client: Client):
        """Writes this instance to the store, overwriting any document with
        the same ID

        Args:
            gds_client (Client): Local session

        Returns:
            This instance
        """
        return _add_entity(self, gds_client)

    def delete_record(self, gds_client: Client):
        """Deletes this instance

        Args:
            gds_client (Client): Local session
        """
        _delete_entity(self, gds_client)
# Copper Activity types
AT = TypeVar('CopperActivityTypeStore')


class CopperActivityTypeStore:
    """Storage for Copper CRM activity types"""

    __kind__ = 'CopperActivityTypeStore'
    USER_CATEGORY = 'user'
    PHONE_CALL_TYPE = 'Phone Call'

    def __init__(self, **kwargs):
        self.ID: str = kwargs.get('ID', f'{self.__kind__}-{str(uuid4())}')
        self.copper_id = kwargs.get('copper_id')
        self.copper_name = kwargs.get('copper_name')
        self.copper_category = kwargs.get('copper_category')
        self.dialpad_company_id = kwargs.get('dialpad_company_id')
        # Keep the list handed in (e.g. when rebuilt from a stored document);
        # a missing or empty value becomes a fresh empty list. Reading
        # self.user_list here would fail because it is not assigned yet.
        self.user_list = kwargs.get('user_list') or []

    def save(self, gds_client: Client) -> AT:
        """Saves this instance to storage if absent

        A record counts as present when one is stored with the same copper
        name, copper category and dialpad company id.

        Args:
            gds_client (Client): LocalDBSession

        Returns:
            AT : this instance, whether or not it had to be written
        """
        query = gds_client.collection(CopperActivityTypeStore.__kind__)
        query = query.where('copper_name', '==', self.copper_name)
        query = query.where('copper_category', '==', self.copper_category)
        query = query.where('dialpad_company_id', '==', self.dialpad_company_id)
        match = next(iter(query.get()), None)
        # NOTE(review): when a record already exists this returns `self`, whose
        # ID may differ from the stored document's ID — confirm that callers
        # do not rely on the returned ID being the persisted one.
        return self if match else _add_entity(self, gds_client)

    def add_user(self, gds_client: Client, dialpad_user_id: str) -> AT:
        """Links a dialpad user to this type store unless already linked

        Args:
            gds_client (Client): LocalDBSession
            dialpad_user_id (str): dialpad user id

        Returns:
            AT : this instance, for chaining
        """
        query = gds_client.collection(CopperActivityTypeStoreUsers.__kind__)
        query = query.where('type_store_id', '==', self.ID)
        query = query.where('dialpad_user_id', '==', dialpad_user_id)
        if next(iter(query.get()), None):
            return self
        _add_entity(
            CopperActivityTypeStoreUsers(
                type_store_id=self.ID, dialpad_user_id=dialpad_user_id
            ),
            gds_client,
        )
        return self

    @staticmethod
    def find_for_phone_call(dialpad_company_id: str, gds_client: Client) -> Optional[AT]:
        """Finds a Copper CRM activity 'Phone Call' type by the dialpad company id

        Args:
            dialpad_company_id (str): dialpad company id
            gds_client (Client): api db session

        Returns:
            Optional[AT]: Stored activity type if present
        """
        query = gds_client.collection(CopperActivityTypeStore.__kind__)
        query = query.where('dialpad_company_id', '==', dialpad_company_id)
        query = query.where('copper_name', '==', CopperActivityTypeStore.PHONE_CALL_TYPE)
        query = query.where('copper_category', '==', CopperActivityTypeStore.USER_CATEGORY)
        match = next(iter(query.get()), None)
        if match and match.to_dict():
            return CopperActivityTypeStore(**match.to_dict())
        return None

    @staticmethod
    def find_for_user(dialpad_user_id: str, gds_client: Client) -> Optional[AT]:
        """Finds the Copper CRM activity type linked to a dialpad user id

        Args:
            dialpad_user_id (str): dialpad user id
            gds_client (Client): api db session

        Returns:
            Optional[AT]: Stored activity type if the user has a link and the
                linked document exists
        """
        query = gds_client.collection(CopperActivityTypeStoreUsers.__kind__)
        query = query.where('dialpad_user_id', '==', dialpad_user_id)
        # First entry is enough
        link = next(iter(query.get()), None)
        if not link:
            return None
        parsed_link = CopperActivityTypeStoreUsers(**link.to_dict())
        # The link stores the document ID of the type store, so fetch it directly
        match = gds_client.collection(CopperActivityTypeStore.__kind__).document(
            parsed_link.type_store_id
        ).get()
        if match and match.to_dict():
            return CopperActivityTypeStore(**match.to_dict())
        return None
# Dialpad users related to Copper Activity type store companies
ATU = TypeVar('CopperActivityTypeStoreUsers')


class CopperActivityTypeStoreUsers:
    """Link between a dialpad user and a Copper CRM activity type store record"""

    __kind__ = 'CopperActivityTypeStoreUsers'

    def __init__(self, **kwargs):
        self.ID: str = kwargs.get('ID', f'{self.__kind__}-{str(uuid4())}')
        self.type_store_id: str = kwargs.get('type_store_id')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')

    def save(self, gds_client: Client) -> ATU:
        """Saves this instance to storage if absent

        A link counts as present when one is stored for the same dialpad
        user id.

        Args:
            gds_client (Client): LocalDBSession

        Returns:
            ATU : the already stored link when there is one, otherwise this
                instance after being written
        """
        query = gds_client.collection(CopperActivityTypeStoreUsers.__kind__)
        query = query.where('dialpad_user_id', '==', self.dialpad_user_id)
        match = next(iter(query.get()), None)
        if match and match.to_dict():
            return CopperActivityTypeStoreUsers(**match.to_dict())
        # Nothing stored for this user yet: actually persist the link instead
        # of returning None without writing anything.
        return _add_entity(self, gds_client)
# Call logging temp persistence
C = TypeVar('Call')


class Call:
    """Temporary storage for call events"""

    __kind__ = 'Call'

    def __init__(self, **kwargs):
        self.ID: str = kwargs.get('ID', f'{self.__kind__}-{str(uuid4())}')
        self.call_id: str = kwargs.get('call_id')
        self.subject: str = kwargs.get('subject')
        # Keep the creation time of a record rebuilt from storage; only a
        # brand new record is stamped with the current time. Otherwise every
        # later write would overwrite the original creation time.
        self.created_at: str = (
            kwargs.get('created_at') or datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        )
        self.description: str = kwargs.get('description')
        self.transcript_url: str = kwargs.get('transcript_url')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.copper_activity_id: str = kwargs.get('copper_activity_id')
        self.copper_activity_date: int = kwargs.get('copper_activity_date')
        self.event_data: str = self._parse_event_data(kwargs.get('event_data'))

    def _parse_event_data(self, event_data) -> str:
        """ctor helper method to make sure the event data remains a string
        to keep current behavior.

        Args:
            event_data: event data value given at initialization

        Returns:
            str: A json string holding a list of events
        """
        if not event_data:
            # json dumps for empty lists (first time)
            return json.dumps([])
        if isinstance(event_data, str):
            # Keep str as is from previous iterations
            return event_data
        if isinstance(event_data, dict):
            # A single event is wrapped so the stored value is always a list
            return json.dumps([event_data])
        # A list of dicts
        return json.dumps(event_data)

    @property
    def submitted(self) -> bool:
        """If this event was submitted successfully to Copper CRM

        Returns:
            bool: True if copper_activity_id is not falsy
        """
        return bool(self.copper_activity_id)

    @property
    def parsed_events(self) -> List[Dict]:
        """Parsed json for instance registered call events

        Returns:
            List[Dict]: List of registered call events
        """
        return json.loads(self.event_data)

    @property
    def call_states(self) -> Set[str]:
        """Registered states for this call

        Events that carry no 'state' key are skipped.

        Returns:
            Set[str]: registered call states
        """
        return {e['state'] for e in self.parsed_events if 'state' in e}

    def save(self, gds_client: Client):
        """Saves this instance unless a record exists for the same call id

        Args:
            gds_client (Client): Local session

        Returns:
            The stored record for this call id when there is one, otherwise
            this instance after being written
        """
        match = Call.find_for_event(self.call_id, gds_client)
        return match if match else _add_entity(self, gds_client)

    def add_notes(self, subject: str, details: str, gds_client: Client, transcript: str = None):
        """Add user notes to persistent record

        The subject and transcript url are replaced, details are appended to
        the existing description. Nothing is written when all three are falsy.

        Args:
            subject (str): User subject notes
            details (str): User details notes
            gds_client (Client): Local session
            transcript (str, optional): Transcription url [Defaults to None].

        Returns:
            This instance
        """
        if subject:
            self.subject = subject
        if details:
            self.description = f'{self.description}. {details}' if self.description else details
        if transcript:
            self.transcript_url = transcript
        # Any modification commit
        if subject or details or transcript:
            _add_entity(self, gds_client)
        return self

    def add_copper_activity_data(self, copper_activity_id: int, copper_activity_date: int, gds_client: Client):
        """Adds a copper activity id to the record

        Args:
            copper_activity_id (int): copper activity id
            copper_activity_date (int): copper activity date as unix timestamp
            gds_client (Client): local session

        Returns:
            This instance after being written
        """
        self.copper_activity_id = copper_activity_id
        self.copper_activity_date = copper_activity_date
        return _add_entity(self, gds_client)

    def add_event_data(self, event_data: dict, gds_client: Client):
        """Adds a new posted json event to the record json event log

        Args:
            event_data (dict): parsed posted json
            gds_client (Client): LocalSession
        """
        events = self.parsed_events
        events.append(event_data)
        self.event_data = json.dumps(events)
        _add_entity(self, gds_client)

    def update_copper_id(self, copper_activity_id: str, gds_client: Client):
        """Replaces the copper activity id when it differs from the current one

        Args:
            copper_activity_id (str): copper activity id
            gds_client (Client): Local session

        Returns:
            bool: True if the record was changed and written, False otherwise
        """
        if copper_activity_id == self.copper_activity_id:
            return False
        self.copper_activity_id = copper_activity_id
        _add_entity(self, gds_client)
        return True

    def delete_record(self, gds_client: Client):
        """Deletes this instance

        Args:
            gds_client (Client): Local session
        """
        _delete_entity(self, gds_client)

    @staticmethod
    def find_for_event(call_id: str, gds_client: Client) -> Optional[C]:
        """Finds the stored record for a call id

        Args:
            call_id (str): dialpad call id
            gds_client (Client): api db session

        Returns:
            Optional[Call]: First stored record for the call id or None
        """
        query = gds_client.collection(Call.__kind__)
        query = query.where('call_id', '==', call_id)
        match = next(iter(query.get()), None)
        if match and match.to_dict():
            return Call(**match.to_dict())
        return None

    @staticmethod
    def find_for_user(user_id: str, gds_client: Client, copper_ids: Set[int], limit: int = 5) -> List[C]:
        """Finds the latest 'limit' call events related to a user. The
        call events must have a subject and a registered activity in Copper
        to appear in the search.

        Records beyond 'limit' whose registered states include 'hangup' are
        deleted from the store as a side effect.

        Args:
            user_id (str): dialpad user id
            gds_client (Client): api db session
            copper_ids (Set[int]): copper activity ids a record must belong to
            limit (int): maximum number of records returned [Defaults to 5].

        Returns:
            List[Call]: Stored call events desc sorted by copper_activity_date
        """
        matches: List[Call] = []
        # Digit-only ids are queried as int (presumably dialpad_user_id is
        # stored as a number — confirm). isdecimal() only accepts strings that
        # int() can parse, and non-str ids are passed through untouched.
        if isinstance(user_id, str) and user_id.isdecimal():
            uid = int(user_id)
        else:
            uid = user_id
        query = gds_client.collection(Call.__kind__)
        query = query.where('dialpad_user_id', '==', uid)
        for m in query.get():
            if not (m.get('subject') and m.get('copper_activity_date')):
                # Ignore calls that aren't completed or that miss a subject
                continue
            if m.get('copper_activity_id') not in copper_ids:
                # Ignore records that aren't in copper ids
                continue
            matches.append(Call(**m.to_dict()))
        matches.sort(key=lambda call: call.copper_activity_date, reverse=True)
        if len(matches) <= limit:
            return matches
        # Delete old non active call records
        for stale in matches[limit:]:
            if 'hangup' in stale.call_states:
                # Only delete old hanged up calls
                _delete_entity(stale, gds_client)
        return matches[:limit]
# Copper account pipeline stages temp persistence
P = TypeVar('PipelineStage')


class AccountPipelineStage:
    """Temporary storage for account pipeline stages"""

    __kind__ = 'AccountPipelineStage'

    def __init__(self, **kwargs):
        # A "<kind>-<uuid4>" identifier is used when the caller gives no ID
        fallback_id = f'{self.__kind__}-{str(uuid4())}'
        self.ID: str = kwargs.get('ID', fallback_id)
        self.copper_account_id: str = kwargs.get('copper_account_id')
        self.copper_stages: str = self._parse_event_data(kwargs.get('copper_stages'))

    def _parse_event_data(self, event_data) -> str:
        """ctor helper method to make sure the stages remain a json string

        Args:
            event_data: stages value given at initialization

        Returns:
            str: The value itself when it is a non-empty string, the json
                dump of an empty list when it is falsy, otherwise its json dump
        """
        if event_data and isinstance(event_data, str):
            # Keep str as is from previous iterations
            return event_data
        # Falsy values (first time) are stored as an empty json list
        return json.dumps(event_data if event_data else [])

    @property
    def parsed_stages(self) -> List[Dict]:
        """Parsed json for instance registered pipeline stages

        Returns:
            List[Dict]: List of registered pipeline stages
        """
        stages = json.loads(self.copper_stages)
        return stages

    def save(self, gds_client: Client):
        """Saves instance on GDS store unless the account already has a record

        Args:
            gds_client (Client): api db session

        Returns:
            The record already stored for this copper account id, otherwise
            this instance after being written
        """
        existing = AccountPipelineStage.find_for_account(
            self.copper_account_id, gds_client
        )
        if existing:
            return existing
        return _add_entity(self, gds_client)

    def update_stages(self, stages: List[dict], gds_client: Client):
        """Updates this instance stages and writes the record

        Args:
            stages (List[dict]): new stages json
            gds_client (Client): LocalSession
        """
        self.copper_stages = json.dumps(stages)
        _add_entity(self, gds_client)

    @staticmethod
    def find_for_account(copper_account_id: str, gds_client: Client) -> Optional[P]:
        """Finds pipeline stage instance for copper account

        Args:
            copper_account_id (str): Copper CRM account id
            gds_client (Client): api db session

        Returns:
            Optional[AccountPipelineStage]: Stored account stages
        """
        lookup = gds_client.collection(AccountPipelineStage.__kind__).where(
            'copper_account_id', '==', copper_account_id
        )
        found = next(iter(lookup.get()), None)
        if found and found.to_dict():
            return AccountPipelineStage(**found.to_dict())
        return None
# Copper deals temp persistence
CD = TypeVar('CopperDeal')


class CopperDeal:
    """Temporary storage for Copper CRM deals, with an expiration time"""

    __kind__ = 'CopperDeal'
    _DT_FORMAT = '%Y-%m-%dT%H:%M:%S'

    def __init__(self, **kwargs):
        self.ID: str = kwargs.get('ID', f'{self.__kind__}-{str(uuid4())}')
        # Keep the expiration of a record rebuilt from storage; only a new
        # record gets a fresh one. Rebuilding it on every construction would
        # make `expired` always False for stored records.
        self.expires_at: str = kwargs.get('expires_at') or self._build_expiration_time()
        self.copper_deal_id: str = kwargs.get('copper_deal_id')
        self.copper_entity_id: str = kwargs.get('copper_entity_id')
        self.copper_entity_type: str = kwargs.get('copper_entity_type')
        self.copper_response: str = self._parse_event_data(kwargs.get('copper_response'))

    def _build_expiration_time(self) -> str:
        """Serializable expiration time as ISO datetime format

        Returns:
            str: The current local time plus 30 minutes, formatted with
                ``_DT_FORMAT``
        """
        # Expire every 30 minutes
        expires_at = datetime.now() + timedelta(minutes=30)
        return expires_at.strftime(self._DT_FORMAT)

    def _parse_event_data(self, event_data) -> str:
        """ctor helper method to make sure the copper response remains a
        string to keep current behavior.

        Args:
            event_data: copper response value given at initialization

        Returns:
            str: A json string
        """
        if not event_data:
            # json dumps for empty dicts (first time)
            return json.dumps(dict())
        if isinstance(event_data, str):
            # Keep str as is from previous iterations
            return event_data
        # At this point the event_data is a dict from a response
        return json.dumps(event_data)

    @property
    def parsed_deal(self) -> Dict:
        """Parsed json of the stored copper response

        Returns:
            Dict: The decoded copper response
        """
        return json.loads(self.copper_response)

    @property
    def expired(self) -> bool:
        """Flag to check if the record is expired

        Returns:
            bool: True if expired False otherwise
        """
        expiration_time = datetime.strptime(self.expires_at, self._DT_FORMAT)
        return expiration_time < datetime.now()

    def save(self, gds_client: Client):
        """Saves entity on GDS store unless the deal already has a record

        Args:
            gds_client (Client): api db session

        Returns:
            The record already stored for this copper deal id, otherwise this
            instance after being written
        """
        match = CopperDeal.find_for_deal(self.copper_deal_id, gds_client)
        return match if match else _add_entity(self, gds_client)

    def update_copper_response(self, copper_response: dict, gds_client: Client):
        """Replaces the stored copper response, renews the expiration time
        and writes the record

        Args:
            copper_response (dict): new copper response json
            gds_client (Client): LocalSession
        """
        self.copper_response = json.dumps(copper_response)
        self.expires_at = self._build_expiration_time()
        _add_entity(self, gds_client)

    def delete_record(self, gds_client: Client):
        """Deletes this instance

        Args:
            gds_client (Client): Local session
        """
        _delete_entity(self, gds_client)

    @staticmethod
    def find_for_deal(deal_id: str, gds_client: Client) -> Optional[CD]:
        """Finds a deal by copper deal id

        Args:
            deal_id (str): Copper CRM deal id
            gds_client (Client): api db session

        Returns:
            Optional[CopperDeal]: First stored deal record or None
        """
        query = gds_client.collection(CopperDeal.__kind__)
        query = query.where('copper_deal_id', '==', deal_id)
        match = next(iter(query.get()), None)
        if match and match.to_dict():
            return CopperDeal(**match.to_dict())
        return None

    @staticmethod
    def find_for_entity(entity_id: str, entity_type: str, gds_client: Client) -> List[CD]:
        """All deals related to a copper entity

        Args:
            entity_id (str): copper entity id
            entity_type (str): copper entity type
            gds_client (Client): local db session

        Returns:
            List[CD]: entity records
        """
        query = gds_client.collection(CopperDeal.__kind__)
        query = query.where('copper_entity_id', '==', entity_id)
        query = query.where('copper_entity_type', '==', entity_type)
        return [CopperDeal(**match.to_dict()) for match in query.get()]