# File: /var/www/vhosts/dial-copper-dev.2amigos.us/dialpad-copper-crm/api/db/rdbms.py
"""SQLAlchemy models for db
"""
import json
from uuid import uuid4
from datetime import datetime, timedelta
from typing import Optional, TypeVar, List, Dict, Set
from sqlalchemy.orm import Session, relationship
from sqlalchemy import Column, ForeignKey, and_, or_, desc
from sqlalchemy.types import (
CHAR, String, Integer, TEXT, DATETIME
)
from api.db.db import Base, engine
from api.conf.config import ConfigWrapper
# Runtime configuration, read once when this module is imported. Its
# `config_scope.is_server` flag is checked at the bottom of this module to
# decide whether the tables are created automatically.
_CONFIG = ConfigWrapper.read_config()
def _generate_id(entity, db_session: Session) -> str:
    """Returns a UUID4 string that no stored `entity` row uses as its ID

    Args:
        entity: mapped class exposing an `ID` column
        db_session (Session): session used to look for an existing row

    Returns:
        str: newly generated identifier not present in the entity table
    """
    while True:
        candidate = str(uuid4())
        taken = db_session.query(entity).filter(entity.ID == candidate).first()
        # A collision is virtually impossible, retry just in case
        if not taken:
            return candidate
# Dialpad access tokens
DTS = TypeVar('DialpadTokenStore')


class DialpadTokenStore(Base):
    """Storage of Dialpad access tokens together with their Dialpad user id
    """
    __tablename__ = 'dialcopper_dpad_token'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_access_token = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved token record

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `dialpad_user_id` and `dialpad_access_token` values
        """
        owner_id = kwargs.get('dialpad_user_id')
        token_value = kwargs.get('dialpad_access_token')
        self.ID = _generate_id(DialpadTokenStore, db_session)
        self.dialpad_user_id = owner_id
        self.dialpad_access_token = token_value

    def save(self, db_session: Session):
        """Persists this record unless the same user/token pair is stored

        Args:
            db_session (Session): local db session

        Returns:
            DialpadTokenStore: the already stored record when there is one,
            this instance otherwise
        """
        existing = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_user_id == self.dialpad_user_id,
            DialpadTokenStore.dialpad_access_token == self.dialpad_access_token
        ).first()
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def delete(token: str, db_session: Session):
        """Removes every stored token of the user owning `token`

        Nothing is deleted when the token is unknown.

        Args:
            token (str): dialpad access token
            db_session (Session): local db session
        """
        owner = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_access_token == token
        ).first()
        if owner:
            # Since the logout in dialpad invalidates all the active tokens
            # we delete by dialpad user id instead of by the single token
            # (full multi-session is not supported anyway)
            db_session.query(DialpadTokenStore).filter(
                DialpadTokenStore.dialpad_user_id == owner.dialpad_user_id
            ).delete()
            db_session.commit()
        return None

    @staticmethod
    def find_for_user(dialpad_id: str, db_session: Session) -> Optional[DTS]:
        """Finds the first stored token record of a dialpad user

        Args:
            dialpad_id (str): dialpad user id
            db_session (Session): local db session

        Returns:
            Optional[DialpadTokenStore]: matching record or None
        """
        by_user = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_user_id == dialpad_id
        )
        return by_user.first()

    @staticmethod
    def find_for_token(dialpad_token: str, db_session: Session) -> Optional[DTS]:
        """Finds the first stored record holding a given access token

        Args:
            dialpad_token (str): dialpad access token
            db_session (Session): local db session

        Returns:
            Optional[DialpadTokenStore]: matching record or None
        """
        by_token = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_access_token == dialpad_token
        )
        return by_token.first()
# Copper <-> Dialpad contact matches
M = TypeVar('Match')


class Match(Base):
    """Table storing which Copper entity a Dialpad contact of a user maps to
    """
    __tablename__ = 'dialcopper_match'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_id = Column(Integer, primary_key=False, nullable=False)
    copper_type = Column(String(255), primary_key=False, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_contact_id = Column(String(255), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved match

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `copper_id`, `copper_type`, `dialpad_user_id` and
                `dialpad_contact_id` values
        """
        read = kwargs.get
        self.ID = _generate_id(Match, db_session)
        self.copper_id = read('copper_id')
        self.copper_type = read('copper_type')
        self.dialpad_user_id = read('dialpad_user_id')
        self.dialpad_contact_id = read('dialpad_contact_id')

    def save(self, db_session: Session):
        """Persists this match unless an identical one is already stored

        Args:
            db_session (Session): local db session

        Returns:
            Match: the already stored match when all four fields coincide,
            this instance otherwise
        """
        duplicate = db_session.query(Match).filter(
            Match.copper_id == self.copper_id,
            Match.copper_type == self.copper_type,
            Match.dialpad_user_id == self.dialpad_user_id,
            Match.dialpad_contact_id == self.dialpad_contact_id
        ).first()
        if duplicate:
            return duplicate
        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def delete(ID: str, db_session: Session):
        """Deletes the match with the given ID, if it exists

        Args:
            ID (str): match primary key
            db_session (Session): local db session
        """
        stored = db_session.query(Match).filter(Match.ID == ID).first()
        if stored:
            db_session.delete(stored)
            db_session.commit()
        return None

    @staticmethod
    def find_match(dialpad_id: str, contact_id: str, db_session: Session) -> Optional[M]:
        """Finds the match of a dialpad contact for a dialpad user

        Args:
            dialpad_id (str): dialpad user id
            contact_id (str): dialpad contact id
            db_session (Session): local db session

        Returns:
            Optional[Match]: matching record or None
        """
        candidates = db_session.query(Match).distinct().filter(
            Match.dialpad_user_id == dialpad_id,
            Match.dialpad_contact_id == contact_id
        )
        return candidates.first()
# Dialpad event subscriptions
ES = TypeVar('Subscription')


class Subscription(Base):
    """Storage for dialpad subscriptions
    """
    __tablename__ = 'dialcopper_subscription'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_access_token = Column(String(255), primary_key=False, nullable=True)
    subscription_id = Column(String(255), primary_key=False, nullable=True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved subscription

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `dialpad_user_id`, `dialpad_access_token` and
                `subscription_id` values
        """
        read = kwargs.get
        self.ID = _generate_id(Subscription, db_session)
        self.dialpad_user_id = read('dialpad_user_id')
        self.dialpad_access_token = read('dialpad_access_token')
        self.subscription_id = read('subscription_id')

    def save(self, db_session: Session) -> ES:
        """Persists this instance unless the user already has a subscription

        Args:
            db_session (Session): local db session

        Returns:
            Subscription: the stored subscription of the user when there is
            one, this instance otherwise
        """
        existing = Subscription.find_for_session(self.dialpad_user_id, db_session)
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    def delete(self, db_session: Session):
        """Deletes this instance

        Args:
            db_session (Session): local db session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_session(dialpad_user_id: str, db_session: Session) -> Optional[ES]:
        """Finds the first subscription stored for a dialpad user

        Args:
            dialpad_user_id (str): dialpad user id
            db_session (Session): local db session

        Returns:
            Optional[Subscription]: matching subscription or None
        """
        by_user = db_session.query(Subscription).filter(
            Subscription.dialpad_user_id == dialpad_user_id
        )
        return by_user.first()
# Dialpad company webhooks
DW = TypeVar('DialpadWebhook')


class DialpadWebhook(Base):
    """Persistent store for dialpad webhooks
    """
    __tablename__ = 'dialcopper_webhook'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    hook_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_company_id = Column(String(255), primary_key=False, nullable=False)
    webhook_secret = Column(String(500), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved webhook record

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `hook_id`, `webhook_secret` and `dialpad_company_id`
                values
        """
        read = kwargs.get
        self.ID = _generate_id(DialpadWebhook, db_session)
        self.hook_id = read('hook_id')
        self.webhook_secret = read('webhook_secret')
        self.dialpad_company_id = read('dialpad_company_id')

    def save(self, db_session: Session) -> DW:
        """Persists this instance unless the company already has a webhook

        Args:
            db_session (Session): local db session

        Returns:
            DialpadWebhook: the stored webhook of the company when there is
            one, this instance otherwise
        """
        existing = DialpadWebhook.find_for_company(self.dialpad_company_id, db_session)
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def find_for_company(dialpad_company_id: str, db_session: Session) -> Optional[DW]:
        """Finds a persisted webhook for a given company to avoid making
        unnecessary requests to dialpad API

        Args:
            dialpad_company_id (str): dialpad company id
            db_session (Session): local db session

        Returns:
            Optional[DialpadWebhook]: matching webhook or None
        """
        by_company = db_session.query(DialpadWebhook).filter(
            DialpadWebhook.dialpad_company_id == dialpad_company_id
        )
        return by_company.first()
# Copper API Keys
CK = TypeVar('CopperKeyStore')


class CopperKeyStore(Base):
    """Storage of Copper CRM API keys and emails for dialpad users
    """
    __tablename__ = 'copper_key_store'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_id = Column(String(255), primary_key=False, nullable=False)
    copper_api_key = Column(String(255), primary_key=False, nullable=True)
    copper_api_email = Column(String(255), primary_key=False, nullable=True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved key record

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `dialpad_id`, `copper_api_key` and `copper_api_email`
                values
        """
        read = kwargs.get
        self.ID = _generate_id(CopperKeyStore, db_session)
        self.dialpad_id = read('dialpad_id')
        self.copper_api_key = read('copper_api_key')
        self.copper_api_email = read('copper_api_email')

    @staticmethod
    def find_for_dialpad(dialpad_id: str, db_session: Session) -> Optional[CK]:
        """Finds the Copper CRM key info stored for a given dialpad user id

        Args:
            dialpad_id (str): dialpad user id
            db_session (Session): local db session

        Returns:
            Optional[CopperKeyStore]: Copper key instance if present
        """
        by_user = db_session.query(CopperKeyStore).filter(
            CopperKeyStore.dialpad_id == dialpad_id
        )
        return by_user.first()

    def save(self, db_session: Session):
        """Adds this instance to the session and commits it

        No duplicate check is performed here.

        Args:
            db_session (Session): local db session

        Returns:
            CopperKeyStore: this instance
        """
        db_session.add(self)
        db_session.commit()
        return self

    def delete_record(self, db_session: Session):
        """Deletes this instance

        Args:
            db_session (Session): local db session
        """
        db_session.delete(self)
        db_session.commit()
# Copper Activity types
AT = TypeVar('CopperActivityTypeStore')


class CopperActivityTypeStore(Base):
    """Storage for Copper CRM activity types per dialpad company
    """
    __tablename__ = 'copper_activity_type_store'

    # Category / name pair identifying the phone call activity type
    USER_CATEGORY = 'user'
    PHONE_CALL_TYPE = 'Phone Call'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_id = Column(Integer, primary_key=False, nullable=False)
    dialpad_company_id = Column(String(255), primary_key=False, nullable=False)
    copper_category = Column(String(255), primary_key=False, nullable=False)
    copper_name = Column(String(255), primary_key=False, nullable=False)
    user_list = relationship('CopperActivityTypeStoreUsers', backref='copper_activity_type_store', lazy = True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved activity type

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `copper_id`, `copper_name`, `copper_category` and
                `dialpad_company_id` values
        """
        read = kwargs.get
        self.ID = _generate_id(CopperActivityTypeStore, db_session)
        self.copper_id = read('copper_id')
        self.copper_name = read('copper_name')
        self.copper_category = read('copper_category')
        self.dialpad_company_id = read('dialpad_company_id')
        # Keep whatever users are already attached, otherwise start empty
        if not self.user_list:
            self.user_list = []
        else:
            self.user_list = self.user_list

    def save(self, db_session: Session) -> AT:
        """Saves this instance to storage if absent

        An instance is considered present when a row with the same company,
        category and name exists.

        Args:
            db_session (Session): local db session

        Returns:
            CopperActivityTypeStore: the stored row when present, this
            instance otherwise
        """
        existing = db_session.query(CopperActivityTypeStore).filter(
            CopperActivityTypeStore.dialpad_company_id == self.dialpad_company_id,
            CopperActivityTypeStore.copper_category == self.copper_category,
            CopperActivityTypeStore.copper_name == self.copper_name
        ).first()
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    def add_user(self, db_session: Session, dialpad_user_id: str) -> AT:
        """Links a dialpad user to this activity type if not linked yet

        Args:
            db_session (Session): local db session
            dialpad_user_id (str): dialpad user id

        Returns:
            CopperActivityTypeStore: this instance for chaining
        """
        already_linked = db_session.query(CopperActivityTypeStoreUsers).filter(
            CopperActivityTypeStoreUsers.type_store_id == self.ID,
            CopperActivityTypeStoreUsers.dialpad_user_id == dialpad_user_id
        ).first()
        if not already_linked:
            linked_users = self.user_list
            linked_users.append(CopperActivityTypeStoreUsers(
                db_session, dialpad_user_id = dialpad_user_id
            ))
            db_session.commit()
        return self

    @staticmethod
    def find_for_phone_call(dialpad_company_id: str, db_session: Session) -> Optional[AT]:
        """Finds the Copper CRM 'Phone Call' activity type of a dialpad company

        Args:
            dialpad_company_id (str): dialpad company id
            db_session (Session): local db session

        Returns:
            Optional[CopperActivityTypeStore]: activity type if present
        """
        phone_call_types = db_session.query(CopperActivityTypeStore).filter(
            CopperActivityTypeStore.dialpad_company_id == dialpad_company_id,
            CopperActivityTypeStore.copper_name == CopperActivityTypeStore.PHONE_CALL_TYPE,
            CopperActivityTypeStore.copper_category == CopperActivityTypeStore.USER_CATEGORY
        )
        return phone_call_types.first()

    @staticmethod
    def find_for_user(dialpad_user_id: str, db_session: Session) -> Optional[AT]:
        """Finds the Copper CRM 'Phone Call' activity type linked to a dialpad user

        Args:
            dialpad_user_id (str): dialpad user id
            db_session (Session): local db session

        Returns:
            Optional[CopperActivityTypeStore]: activity type if present
        """
        joined = db_session.query(CopperActivityTypeStore).join(
            CopperActivityTypeStoreUsers,
            CopperActivityTypeStore.ID == CopperActivityTypeStoreUsers.type_store_id
        )
        phone_call_types = joined.filter(
            CopperActivityTypeStoreUsers.dialpad_user_id == dialpad_user_id,
            CopperActivityTypeStore.copper_name == CopperActivityTypeStore.PHONE_CALL_TYPE,
            CopperActivityTypeStore.copper_category == CopperActivityTypeStore.USER_CATEGORY
        )
        return phone_call_types.first()
# Dialpad users related to Copper Activity type store companies
ATU = TypeVar('CopperActivityTypeStoreUsers')


class CopperActivityTypeStoreUsers(Base):
    """Link table between stored Copper CRM activity types and dialpad users
    """
    __tablename__ = 'copper_activity_type_store_users'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    type_store_id = Column(CHAR(36), ForeignKey('copper_activity_type_store.ID'), nullable = False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved link

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `type_store_id` and `dialpad_user_id` values
        """
        self.ID: str = _generate_id(CopperActivityTypeStoreUsers, db_session)
        self.type_store_id = kwargs.get('type_store_id')
        self.dialpad_user_id = kwargs.get('dialpad_user_id')

    def save(self, db_session: Session) -> ATU:
        """Saves this instance to storage if absent

        A link is considered present when a row with the same activity type
        and dialpad user exists, which is the same identity
        `CopperActivityTypeStore.add_user` checks before appending a link.

        Args:
            db_session (Session): local db session

        Returns:
            CopperActivityTypeStoreUsers: the stored link when present, this
            instance otherwise
        """
        # The lookup must read the values from this instance; a bare
        # `dialpad_user_id` is not defined in this scope and raised NameError.
        match = db_session.query(CopperActivityTypeStoreUsers).filter(and_(
            CopperActivityTypeStoreUsers.type_store_id == self.type_store_id,
            CopperActivityTypeStoreUsers.dialpad_user_id == self.dialpad_user_id
        )).first()
        if match:
            return match
        db_session.add(self)
        db_session.commit()
        return self
# Call logging temp persistence
C = TypeVar('Call')


class Call(Base):
    """Temporary storage for call events
    """
    __tablename__ = 'dialcopper_call'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    call_id = Column(String(255), primary_key=False, nullable=False)
    created_at = Column(DATETIME, primary_key=False, nullable=False)
    subject = Column(String(255), primary_key=False, nullable=True)
    description = Column(TEXT, primary_key=False, nullable=True)
    transcript_url = Column(TEXT, primary_key=False, nullable=True)
    copper_activity_id = Column(String(255), primary_key=False, nullable=True)
    copper_activity_date = Column(Integer, primary_key=False, nullable=True)
    # JSON encoded list with every event received for the call
    event_data = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved call record stamped with the current local time

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `call_id`, `subject`, `description`, `transcript_url`,
                `dialpad_user_id`, `copper_activity_id`,
                `copper_activity_date` and `event_data` (first event, stored
                as a one element JSON list; empty list when falsy)
        """
        first_event = kwargs.get('event_data')
        self.ID: str = _generate_id(Call, db_session)
        self.call_id: str = kwargs.get('call_id')
        self.subject: str = kwargs.get('subject')
        self.created_at: datetime = datetime.now()
        self.description: str = kwargs.get('description')
        self.transcript_url: str = kwargs.get('transcript_url')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.copper_activity_id: str = kwargs.get('copper_activity_id')
        self.copper_activity_date: int = kwargs.get('copper_activity_date')
        self.event_data: str = json.dumps([first_event] if first_event else [])

    @property
    def submitted(self) -> bool:
        """If this event was submitted successfully to Copper CRM

        Returns:
            bool: True if copper_activity_id is not falsy
        """
        return bool(self.copper_activity_id)

    @property
    def parsed_events(self) -> List[Dict]:
        """Parsed json for instance registered call events

        Returns:
            List[Dict]: List of registered call events
        """
        return json.loads(self.event_data)

    @property
    def call_states(self) -> Set[str]:
        """Registered states for this call

        Every registered event is expected to carry a 'state' key.

        Returns:
            Set[str]: registered call states
        """
        return {e['state'] for e in self.parsed_events}

    def save(self, db_session: Session):
        """Persists this record unless the call id is already stored

        Args:
            db_session (Session): local db session

        Returns:
            Call: the stored record for the call id when there is one, this
            instance otherwise
        """
        match = Call.find_for_event(self.call_id, db_session)
        if match:
            return match
        db_session.add(self)
        db_session.commit()
        return self

    def add_notes(self, subject: str, details: str, db_session: Session, transcript: str = None):
        """Add user notes to persistent record

        The subject and transcript url are replaced, the details are appended
        to the current description. Falsy values are ignored.

        Args:
            subject (str): User subject notes
            details (str): User details notes
            db_session (Session): local db session
            transcript (str, optional): Transcription url [Defaults to None].

        Returns:
            Call: this instance for chaining
        """
        if subject:
            self.subject = subject
        if details:
            self.description = f'{self.description}. {details}' if self.description else details
        if transcript:
            self.transcript_url = transcript
        # Commit only when something was modified
        if subject or details or transcript:
            db_session.commit()
        return self

    def add_copper_activity_data(self, copper_activity_id: int, copper_activity_date: int, db_session: Session):
        """Adds a copper activity id and date to the record

        Args:
            copper_activity_id (int): copper activity id
                (NOTE(review): the column is a String, confirm the type the
                callers pass)
            copper_activity_date (int): copper activity date as unix timestamp
            db_session (Session): local db session

        Returns:
            Call: this instance for chaining
        """
        self.copper_activity_id = copper_activity_id
        self.copper_activity_date = copper_activity_date
        db_session.commit()
        return self

    def add_event_data(self, event_data: dict, db_session: Session):
        """Adds a new posted json event to the record json event log

        Args:
            event_data (dict): parsed posted json
            db_session (Session): local db session
        """
        events = self.parsed_events
        events.append(event_data)
        self.event_data = json.dumps(events)
        db_session.commit()

    def update_copper_id(self, copper_activity_id: str, db_session: Session):
        """Replaces the copper activity id when it differs from the stored one

        Args:
            copper_activity_id (str): new copper activity id
            db_session (Session): local db session

        Returns:
            bool: True if the record was updated, False if the id was the same
        """
        if copper_activity_id == self.copper_activity_id:
            return False
        self.copper_activity_id = copper_activity_id
        db_session.commit()
        return True

    def delete_record(self, db_session: Session):
        """Deletes this instance

        Args:
            db_session (Session): local db session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_event(call_id: str, db_session: Session) -> Optional[C]:
        """Finds the stored record of a dialpad call id

        Args:
            call_id (str): dialpad call id
            db_session (Session): local db session

        Returns:
            Optional[Call]: first record stored for the call id or None
        """
        return db_session.query(Call).filter(Call.call_id == call_id).first()

    @staticmethod
    def find_for_user(user_id: str, db_session: Session, copper_ids: Set[int], limit: int = 5) -> List[C]:
        """Finds the latest 'limit' call records related to a user

        A record must have a subject, a copper activity date and a copper
        activity id contained in `copper_ids` to appear in the search.

        Args:
            user_id (str): dialpad user id
            db_session (Session): local db session
            copper_ids (Set[int]): accepted copper activity ids
            limit (int, optional): max records returned [Defaults to 5].

        Returns:
            List[Call]: Stored records desc sorted by copper_activity_date
        """
        # An empty (or missing) id collection can never match a record, so
        # skip the round trip to the database
        if not copper_ids:
            return []
        query = db_session.query(Call).filter(and_(
            # `!= None` is intentional: SQLAlchemy renders it as IS NOT NULL
            Call.subject != None,  # noqa: E711
            Call.dialpad_user_id == user_id,
            Call.copper_activity_date != None,  # noqa: E711
            # in_() is handed a plain list rather than the set itself
            Call.copper_activity_id.in_(list(copper_ids))
        ))
        query = query.order_by(desc(Call.copper_activity_date)).limit(limit)
        return query.all()
# Copper account pipeline stages temp persistence
P = TypeVar('PipelineStage')


class AccountPipelineStage(Base):
    """Temporary storage for account pipeline stages
    """
    __tablename__ = 'dialcopper_pipeline_stage'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_account_id = Column(String(255), primary_key=False, nullable=False)
    # JSON encoded stages
    copper_stages = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved stages record

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `copper_account_id` and `copper_stages` (stored as
                JSON, an empty list when falsy)
        """
        stages = kwargs.get('copper_stages')
        self.ID = _generate_id(AccountPipelineStage, db_session)
        self.copper_account_id = kwargs.get('copper_account_id')
        self.copper_stages = json.dumps(stages if stages else [])

    @property
    def parsed_stages(self) -> List[Dict]:
        """Parsed json of the stored pipeline stages

        Returns:
            List[Dict]: List of stored pipeline stages
        """
        raw_stages = self.copper_stages
        return json.loads(raw_stages)

    def save(self, db_session: Session):
        """Persists this record unless the account already has one

        Args:
            db_session (Session): local db session

        Returns:
            AccountPipelineStage: the stored record of the account when
            there is one, this instance otherwise
        """
        existing = AccountPipelineStage.find_for_account(self.copper_account_id, db_session)
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    def update_stages(self, stages: List[dict], db_session: Session):
        """Replaces the stages of this instance

        Args:
            stages (List[dict]): new stages json
            db_session (Session): local db session
        """
        encoded = json.dumps(stages)
        self.copper_stages = encoded
        db_session.commit()

    @staticmethod
    def find_for_account(copper_account_id: str, db_session: Session) -> Optional[P]:
        """Finds the stages record stored for a Copper CRM account

        Args:
            copper_account_id (str): Copper CRM account id
            db_session (Session): local db session

        Returns:
            Optional[AccountPipelineStage]: Stored account stages
        """
        by_account = db_session.query(AccountPipelineStage).filter(
            AccountPipelineStage.copper_account_id == copper_account_id
        )
        return by_account.first()
# Copper deals temp persistence
CD = TypeVar('CopperDeal')


class CopperDeal(Base):
    """Temporary storage for Copper CRM deal responses, with an expiry time
    """
    __tablename__ = 'dialcopper_deal'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_deal_id = Column(String(255), primary_key=False, nullable=False)
    copper_entity_id = Column(String(255), primary_key=False, nullable=False)
    copper_entity_type = Column(String(255), primary_key=False, nullable=False)
    expires_at = Column(DATETIME, primary_key=False, nullable=False)
    # JSON encoded deal
    copper_response = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds an unsaved deal record expiring in 30 minutes

        Args:
            db_session (Session): session used to pick an unused ID
            **kwargs: `copper_deal_id`, `copper_entity_id`,
                `copper_entity_type` and `copper_response` (stored as JSON,
                an empty object when falsy)
        """
        response = kwargs.get('copper_response')
        self.ID = _generate_id(CopperDeal, db_session)
        # Expire every 30 minutes
        self.expires_at = datetime.now() + timedelta(minutes = 30)
        self.copper_deal_id = kwargs.get('copper_deal_id')
        self.copper_entity_id = kwargs.get('copper_entity_id')
        self.copper_entity_type = kwargs.get('copper_entity_type')
        self.copper_response = json.dumps(response if response else {})

    @property
    def parsed_deal(self) -> Dict:
        """Parsed json of the stored deal response

        Returns:
            Dict: stored deal
        """
        raw_response = self.copper_response
        return json.loads(raw_response)

    @property
    def expired(self) -> bool:
        """Flag to check if the record is expired

        Returns:
            bool: True if expired False otherwise
        """
        deadline = self.expires_at
        return deadline < datetime.now()

    def save(self, db_session: Session):
        """Persists this record unless the deal id is already stored

        Args:
            db_session (Session): local db session

        Returns:
            CopperDeal: the stored record of the deal when there is one,
            this instance otherwise
        """
        existing = CopperDeal.find_for_deal(self.copper_deal_id, db_session)
        if existing:
            return existing
        db_session.add(self)
        db_session.commit()
        return self

    def update_copper_response(self, copper_response: dict, db_session: Session):
        """Replaces the stored deal and restarts the 30 minutes expiry

        Args:
            copper_response (dict): new deal json
            db_session (Session): local db session
        """
        encoded = json.dumps(copper_response)
        self.copper_response = encoded
        self.expires_at = datetime.now() + timedelta(minutes = 30)
        db_session.commit()

    def delete_record(self, db_session: Session):
        """Deletes this instance

        Args:
            db_session (Session): local db session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_deal(deal_id: str, db_session: Session) -> Optional[CD]:
        """Finds a deal by copper deal id

        Args:
            deal_id (str): Copper CRM deal id
            db_session (Session): local db session

        Returns:
            Optional[CopperDeal]: Stored deal or None
        """
        by_deal = db_session.query(CopperDeal).filter(
            CopperDeal.copper_deal_id == deal_id
        )
        return by_deal.first()

    @staticmethod
    def find_for_entity(entity_id: str, entity_type: str, db_session: Session) -> List[CD]:
        """All deals stored for a copper entity

        Args:
            entity_id (str): copper entity id
            entity_type (str): copper entity type
            db_session (Session): local db session

        Returns:
            List[CopperDeal]: entity records
        """
        by_entity = db_session.query(CopperDeal).filter(
            CopperDeal.copper_entity_id == entity_id,
            CopperDeal.copper_entity_type == entity_type
        )
        return by_entity.all()
if not _CONFIG.config_scope.is_server:
    # Only create the tables of the models registered on Base when the
    # configuration scope is not a server (i.e. on unit tests),
    # as GC doesn't support this (presumably Google Cloud -- TODO confirm)
    Base.metadata.create_all(engine)