HEX
HEX
Server: Apache/2.4.29 (Ubuntu)
System: Linux 2amigos-php74 5.4.0-1103-aws #111~18.04.1-Ubuntu SMP Tue May 23 20:04:10 UTC 2023 x86_64
User: squarehillcompany.com (1002)
PHP: 7.4.25
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: /var/www/vhosts/dial-copper-dev.2amigos.us/dialpad-copper-crm/api/db/rdbms.py
"""SQLAlchemy models for db
"""
import json
from uuid import uuid4
from datetime import datetime, timedelta
from typing import Optional, TypeVar, List, Dict, Set

from sqlalchemy.orm import Session, relationship
from sqlalchemy import Column, ForeignKey, and_, or_, desc
from sqlalchemy.types import (
    CHAR, String, Integer, TEXT, DATETIME
)

from api.db.db import Base, engine
from api.conf.config import ConfigWrapper

# Project configuration, read once when this module is imported.
# Its `config_scope.is_server` flag is checked at the bottom of this module
# to decide whether the tables are created from the model metadata.
_CONFIG = ConfigWrapper.read_config()

def _generate_id(entity, db_session: Session) -> str:
    """Return a UUID4 string that no ``entity`` row uses as its ``ID`` yet.

    Args:
        entity: Mapped class exposing an ``ID`` column
        db_session (Session): Session used to look for an existing row

    Returns:
        str: Identifier for which no matching row was found
    """
    def _taken(candidate: str) -> bool:
        # True when a row with this ID is already stored
        return bool(db_session.query(entity).filter(entity.ID == candidate).first())

    candidate = str(uuid4())
    # A collision is virtually impossible, but retry until the ID is free
    while _taken(candidate):
        candidate = str(uuid4())
    return candidate

# DialpadTokenStore
DTS = TypeVar('DialpadTokenStore')
class DialpadTokenStore(Base):
    """Stored Dialpad access tokens, each tied to a Dialpad user id"""

    __tablename__ = 'dialcopper_dpad_token'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_access_token = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, token record

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``dialpad_user_id`` and ``dialpad_access_token``;
                a missing key is stored as None
        """
        self.ID: str = _generate_id(DialpadTokenStore, db_session)
        self.dialpad_access_token: str = kwargs.get('dialpad_access_token')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')

    def save(self, db_session: Session):
        """Persists this instance unless the same user/token pair is stored

        Args:
            db_session (Session): Local session

        Returns:
            DialpadTokenStore: The already stored row when one matches both
            the user id and the token, otherwise this instance after commit
        """
        same_pair = and_(
            DialpadTokenStore.dialpad_user_id == self.dialpad_user_id,
            DialpadTokenStore.dialpad_access_token == self.dialpad_access_token
        )
        existing = db_session.query(DialpadTokenStore).filter(same_pair).first()

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def delete(token: str, db_session: Session):
        """Deletes every token stored for the user that owns ``token``

        Does nothing when the token is not stored.

        Args:
            token (str): Dialpad access token
            db_session (Session): Local session
        """
        owner = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_access_token == token
        ).first()

        if owner:
            # Since the logout in dialpad invalidates all the active tokens
            # we should delete by dialpad user id (not that we're supporting full multi-session anyways...)
            db_session.query(DialpadTokenStore).filter(
                DialpadTokenStore.dialpad_user_id == owner.dialpad_user_id
            ).delete()

            db_session.commit()

        return None

    @staticmethod
    def find_for_user(dialpad_id: str, db_session: Session) -> Optional[DTS]:
        """First token record stored for a Dialpad user id

        Args:
            dialpad_id (str): Dialpad user id
            db_session (Session): Local session

        Returns:
            Optional[DialpadTokenStore]: Matching record or None
        """
        by_user = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_user_id == dialpad_id
        )
        return by_user.first()

    @staticmethod
    def find_for_token(dialpad_token: str, db_session: Session) -> Optional[DTS]:
        """First token record stored for a Dialpad access token

        Args:
            dialpad_token (str): Dialpad access token
            db_session (Session): Local session

        Returns:
            Optional[DialpadTokenStore]: Matching record or None
        """
        by_token = db_session.query(DialpadTokenStore).filter(
            DialpadTokenStore.dialpad_access_token == dialpad_token
        )
        return by_token.first()

# Matches
M = TypeVar('Match')
class Match(Base):
    """Links between a Copper record and a Dialpad contact of a Dialpad user
    """
    __tablename__ = 'dialcopper_match'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_id = Column(Integer, primary_key=False, nullable=False)
    copper_type = Column(String(255), primary_key=False, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_contact_id = Column(String(255), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, match

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``copper_id``, ``copper_type``, ``dialpad_user_id`` and
                ``dialpad_contact_id``; a missing key is stored as None
        """
        self.ID: str = _generate_id(Match, db_session)
        self.copper_type: str = kwargs.get('copper_type')
        self.copper_id: int = kwargs.get('copper_id')
        self.dialpad_contact_id: str = kwargs.get('dialpad_contact_id')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')

    def save(self, db_session: Session):
        """Persists this instance unless an identical match is stored

        Args:
            db_session (Session): Local session

        Returns:
            Match: The stored row whose four data columns equal this
            instance's, otherwise this instance after commit
        """
        same_link = and_(
            Match.copper_id == self.copper_id,
            Match.copper_type == self.copper_type,
            Match.dialpad_user_id == self.dialpad_user_id,
            Match.dialpad_contact_id == self.dialpad_contact_id
        )
        existing = db_session.query(Match).filter(same_link).first()

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def delete(ID: str, db_session: Session):
        """Deletes the match with the given ID, if it is stored

        Args:
            ID (str): Match record ID
            db_session (Session): Local session
        """
        target = db_session.query(Match).filter(Match.ID == ID).first()

        if target:
            db_session.delete(target)
            db_session.commit()

        return None

    @staticmethod
    def find_match(dialpad_id: str, contact_id: str, db_session: Session) -> Optional[M]:
        """First match stored for a Dialpad user and one of its contacts

        Args:
            dialpad_id (str): Dialpad user id
            contact_id (str): Dialpad contact id
            db_session (Session): Local session

        Returns:
            Optional[Match]: Matching record or None
        """
        for_contact = and_(
            Match.dialpad_user_id == dialpad_id,
            Match.dialpad_contact_id == contact_id
        )
        return db_session.query(Match).distinct().filter(for_contact).first()

# Dialpad event subscriptions
ES = TypeVar('Subscription')
class Subscription(Base):
    """Dialpad event subscriptions stored per Dialpad user
    """
    __tablename__ = 'dialcopper_subscription'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_access_token = Column(String(255), primary_key=False, nullable=True)
    subscription_id = Column(String(255), primary_key=False, nullable=True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, subscription

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``dialpad_user_id``, ``dialpad_access_token`` and
                ``subscription_id``; a missing key is stored as None
        """
        self.ID: str = _generate_id(Subscription, db_session)
        self.subscription_id: str = kwargs.get('subscription_id')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.dialpad_access_token: str = kwargs.get('dialpad_access_token')

    def save(self, db_session: Session) -> ES:
        """Persists this instance unless the user already has a subscription

        Args:
            db_session (Session): Local session

        Returns:
            Subscription: The subscription already stored for the user, or
            this instance after commit
        """
        existing = Subscription.find_for_session(self.dialpad_user_id, db_session)

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    def delete(self, db_session: Session):
        """Deletes this instance and commits

        Args:
            db_session (Session): Local session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_session(dialpad_user_id: str, db_session: Session) -> Optional[ES]:
        """First subscription stored for a Dialpad user id

        Args:
            dialpad_user_id (str): user id
            db_session (Session): Local session

        Returns:
            Optional[Subscription]: Matching subscription or None
        """
        by_user = db_session.query(Subscription).filter(
            Subscription.dialpad_user_id == dialpad_user_id
        )
        return by_user.first()

# Dialpad company webhooks
DW = TypeVar('DialpadWebhook')
class DialpadWebhook(Base):
    """Dialpad webhooks stored per Dialpad company"""

    __tablename__ = 'dialcopper_webhook'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    hook_id = Column(String(255), primary_key=False, nullable=False)
    dialpad_company_id = Column(String(255), primary_key=False, nullable=False)
    webhook_secret = Column(String(500), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, webhook record

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``hook_id``, ``webhook_secret`` and
                ``dialpad_company_id``; a missing key is stored as None
        """
        self.ID: str = _generate_id(DialpadWebhook, db_session)
        self.dialpad_company_id: str = kwargs.get('dialpad_company_id')
        self.hook_id: str = kwargs.get('hook_id')
        self.webhook_secret: str = kwargs.get('webhook_secret')

    def save(self, db_session: Session) -> DW:
        """Persists this instance unless the company already has a webhook

        Args:
            db_session (Session): Local session

        Returns:
            DialpadWebhook: The webhook already stored for the company, or
            this instance after commit
        """
        existing = DialpadWebhook.find_for_company(self.dialpad_company_id, db_session)

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    @staticmethod
    def find_for_company(dialpad_company_id: str, db_session: Session) -> Optional[DW]:
        """First webhook stored for a company, so a stored one can be reused
        instead of making unnecessary requests to dialpad API

        Args:
            dialpad_company_id (str): dialpad company id
            db_session (Session): Local session

        Returns:
            Optional[DialpadWebhook]: Matching webhook or None
        """
        by_company = db_session.query(DialpadWebhook).filter(
            DialpadWebhook.dialpad_company_id == dialpad_company_id
        )
        return by_company.first()

# Copper API Keys
CK = TypeVar('CopperKeyStore')
class CopperKeyStore(Base):
    """Copper CRM API key and email stored per dialpad user
    """

    __tablename__ = 'copper_key_store'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_id = Column(String(255), primary_key=False, nullable=False)
    copper_api_key = Column(String(255), primary_key=False, nullable=True)
    copper_api_email = Column(String(255), primary_key=False, nullable=True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, key record

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``dialpad_id``, ``copper_api_key`` and
                ``copper_api_email``; a missing key is stored as None
        """
        self.ID: str = _generate_id(CopperKeyStore, db_session)
        self.dialpad_id: str = kwargs.get('dialpad_id')
        self.copper_api_email: str = kwargs.get('copper_api_email')
        self.copper_api_key: str = kwargs.get('copper_api_key')

    def save(self, db_session: Session):
        """Adds this instance to the session and commits

        Unlike the other stores in this module, no lookup for an existing
        row is done before adding.

        Args:
            db_session (Session): Local session

        Returns:
            CopperKeyStore: This instance
        """
        db_session.add(self)
        db_session.commit()
        return self

    def delete_record(self, db_session: Session):
        """Deletes this instance and commits

        Args:
            db_session (Session): Local session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_dialpad(dialpad_id: str, db_session: Session) -> Optional[CK]:
        """First Copper CRM key record stored for a dialpad user id

        Args:
            dialpad_id (str): dialpad user id
            db_session (Session): api db session

        Returns:
            Optional[CK]: Copper key instance if present
        """
        by_user = db_session.query(CopperKeyStore).filter(
            CopperKeyStore.dialpad_id == dialpad_id
        )
        return by_user.first()

# Copper Activity types
AT = TypeVar('CopperActivityTypeStore')
class CopperActivityTypeStore(Base):
    """Copper CRM activity types stored per dialpad company, with the
    dialpad users linked to each type
    """

    __tablename__ = 'copper_activity_type_store'

    # Category / name pair used by the phone call lookups below
    USER_CATEGORY = 'user'
    PHONE_CALL_TYPE = 'Phone Call'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_id = Column(Integer, primary_key=False, nullable=False)
    dialpad_company_id = Column(String(255), primary_key=False, nullable=False)
    copper_category = Column(String(255), primary_key=False, nullable=False)
    copper_name = Column(String(255), primary_key=False, nullable=False)
    user_list = relationship('CopperActivityTypeStoreUsers', backref='copper_activity_type_store', lazy = True)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, activity type

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``copper_id``, ``copper_name``, ``copper_category`` and
                ``dialpad_company_id``; a missing key is stored as None
        """
        self.ID: str = _generate_id(CopperActivityTypeStore, db_session)
        self.dialpad_company_id: str = kwargs.get('dialpad_company_id')
        self.copper_id: int = kwargs.get('copper_id')
        self.copper_category: str = kwargs.get('copper_category')
        self.copper_name: str = kwargs.get('copper_name')
        # Keep a non-empty user list, otherwise start from an empty one
        self.user_list: list = self.user_list or []

    def save(self, db_session: Session) -> AT:
        """Saves this instance to storage if absent

        Args:
            db_session (Session): LocalDBSession

        Returns:
           AT : the row already stored with the same company, category and
           name, otherwise this instance after commit
        """
        same_type = and_(
            CopperActivityTypeStore.dialpad_company_id == self.dialpad_company_id,
            CopperActivityTypeStore.copper_category == self.copper_category,
            CopperActivityTypeStore.copper_name == self.copper_name
        )
        existing = db_session.query(CopperActivityTypeStore).filter(same_type).first()

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    def add_user(self, db_session: Session,  dialpad_user_id: str) -> AT:
        """Links a dialpad user to this activity type if not linked yet

        Args:
            db_session (Session): LocalDBSession
            dialpad_user_id (str): dialpad user id

        Returns:
            AT: this instance
        """
        already_linked = db_session.query(CopperActivityTypeStoreUsers).filter(and_(
            CopperActivityTypeStoreUsers.type_store_id == self.ID,
            CopperActivityTypeStoreUsers.dialpad_user_id == dialpad_user_id
        )).first()

        if not already_linked:
            new_link = CopperActivityTypeStoreUsers(
                db_session, dialpad_user_id = dialpad_user_id
            )
            self.user_list.append(new_link)
            db_session.commit()

        return self

    @staticmethod
    def find_for_phone_call(dialpad_company_id: str, db_session: Session) -> Optional[AT]:
        """Finds a Copper CRM activity 'Phone Call' type by the dialpad company id

        Args:
            dialpad_company_id (str): dialpad company id
            db_session (Session): api db session

        Returns:
            Optional[AT]: Activity type instance if present
        """
        phone_call_of_company = and_(
            CopperActivityTypeStore.dialpad_company_id == dialpad_company_id,
            CopperActivityTypeStore.copper_name == CopperActivityTypeStore.PHONE_CALL_TYPE,
            CopperActivityTypeStore.copper_category == CopperActivityTypeStore.USER_CATEGORY
        )
        return db_session.query(CopperActivityTypeStore).filter(phone_call_of_company).first()

    @staticmethod
    def find_for_user(dialpad_user_id: str, db_session: Session) -> Optional[AT]:
        """Finds a Copper CRM activity 'Phone Call' type by the dialpad user id

        Args:
            dialpad_user_id (str): dialpad user id
            db_session (Session): api db session

        Returns:
            Optional[AT]: Activity type instance if present
        """
        # Activity types joined with their linked users
        linked = db_session.query(CopperActivityTypeStore).join(
            CopperActivityTypeStoreUsers,
            CopperActivityTypeStore.ID == CopperActivityTypeStoreUsers.type_store_id
        )
        phone_call_of_user = and_(
            CopperActivityTypeStoreUsers.dialpad_user_id == dialpad_user_id,
            CopperActivityTypeStore.copper_name == CopperActivityTypeStore.PHONE_CALL_TYPE,
            CopperActivityTypeStore.copper_category == CopperActivityTypeStore.USER_CATEGORY
        )
        return linked.filter(phone_call_of_user).first()

# Dialpad users related to Copper Activity type store companies
ATU = TypeVar('CopperActivityTypeStoreUsers')
class CopperActivityTypeStoreUsers(Base):
    """Dialpad users linked to a stored Copper CRM activity type"""

    __tablename__ = 'copper_activity_type_store_users'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    type_store_id = Column(CHAR(36), ForeignKey('copper_activity_type_store.ID'), nullable = False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, user link

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``type_store_id`` and ``dialpad_user_id``; a missing
                key is stored as None
        """
        self.ID: str = _generate_id(CopperActivityTypeStoreUsers, db_session)
        self.type_store_id = kwargs.get('type_store_id')
        self.dialpad_user_id = kwargs.get('dialpad_user_id')

    def save(self, db_session: Session) -> ATU:
        """Saves this instance to storage if absent

        Args:
            db_session (Session): LocalDBSession

        Returns:
           ATU : the link already stored for this dialpad user id,
           otherwise this instance after commit
        """
        # The instance attribute must be used here: a bare `dialpad_user_id`
        # is not visible inside the method and raised NameError on every call
        match = db_session.query(CopperActivityTypeStoreUsers).filter(
            CopperActivityTypeStoreUsers.dialpad_user_id == self.dialpad_user_id
        ).first()

        if match: return match

        db_session.add(self)
        db_session.commit()
        return self

# Call logging temp persistence
C = TypeVar('Call')
class Call(Base):
    """Temporary storage for call events, the user notes taken for the call
    and the Copper CRM activity the call was logged as
    """
    __tablename__ = 'dialcopper_call'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    dialpad_user_id = Column(String(255), primary_key=False, nullable=False)
    call_id = Column(String(255), primary_key=False, nullable=False)
    created_at = Column(DATETIME, primary_key=False, nullable=False)
    subject = Column(String(255), primary_key=False, nullable=True)
    description = Column(TEXT, primary_key=False, nullable=True)
    transcript_url = Column(TEXT, primary_key=False, nullable=True)
    copper_activity_id = Column(String(255), primary_key=False, nullable=True)
    copper_activity_date = Column(Integer, primary_key=False, nullable=True)
    # JSON encoded list of the events received for the call
    event_data = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, call record created "now"

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``call_id``, ``subject``, ``description``,
                ``transcript_url``, ``dialpad_user_id``,
                ``copper_activity_id``, ``copper_activity_date`` and
                ``event_data`` (first event of the call, stored as a one
                element JSON list; an empty list when falsy)
        """
        first_event = kwargs.get('event_data')

        self.ID: str = _generate_id(Call, db_session)
        self.created_at: datetime = datetime.now()
        self.call_id: str = kwargs.get('call_id')
        self.dialpad_user_id: str = kwargs.get('dialpad_user_id')
        self.subject: str = kwargs.get('subject')
        self.description: str = kwargs.get('description')
        self.transcript_url: str = kwargs.get('transcript_url')
        self.copper_activity_id: str = kwargs.get('copper_activity_id')
        self.copper_activity_date: int = kwargs.get('copper_activity_date')
        self.event_data: str = json.dumps([first_event] if first_event else [])

    @property
    def submitted(self) -> bool:
        """Whether a Copper CRM activity id is registered for this call

        Returns:
            bool: True if copper_activity_id is not falsy
        """
        if self.copper_activity_id:
            return True
        return False

    @property
    def parsed_events(self) -> List[Dict]:
        """Decoded JSON event log of this call

        Returns:
            List[Dict]: List of registered call events
        """
        raw_log = self.event_data
        return json.loads(raw_log)

    @property
    def call_states(self) -> Set[str]:
        """Distinct ``state`` values of the registered events

        Returns:
            Set[str]: registered call states
        """
        return set(event['state'] for event in self.parsed_events)

    def save(self, db_session: Session):
        """Persists this instance unless the call id is already stored

        Args:
            db_session (Session): Local session

        Returns:
            Call: The record already stored for the call id, or this
            instance after commit
        """
        existing = Call.find_for_event(self.call_id, db_session)

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    def add_notes(self, subject: str, details: str, db_session: Session, transcript: str = None):
        """Add user notes to persistent record

        A truthy subject or transcript replaces the stored one; truthy
        details are appended to an existing description after '. '.
        Nothing is committed when all three values are falsy.

        Args:
            subject (str): User subject notes
            details (str): User details notes
            db_session (Session): Local session
            transcript (str, optional): Transcription url [Defaults to None].

        Returns:
            Call: this instance
        """
        modified = False

        if subject:
            self.subject = subject
            modified = True

        if details:
            if self.description:
                self.description = f'{self.description}. {details}'
            else:
                self.description = details
            modified = True

        if transcript:
            self.transcript_url = transcript
            modified = True

        # Any modification commit
        if modified:
            db_session.commit()

        return self

    def add_copper_activity_data(self, copper_activity_id: int, copper_activity_date: int, db_session: Session):
        """Registers the Copper activity id and date on the record and commits

        Args:
            copper_activity_id (int): copper activity id
            copper_activity_date (int): copper activity date as unix timestamp
            db_session (Session): local session

        Returns:
            Call: this instance
        """
        self.copper_activity_date = copper_activity_date
        self.copper_activity_id = copper_activity_id
        db_session.commit()
        return self

    def add_event_data(self, event_data: dict, db_session: Session):
        """Appends a posted json event to the record json event log and commits

        Args:
            event_data (dict): parsed posted json
            db_session (Session): LocalSession
        """
        event_log = self.parsed_events + [event_data]
        self.event_data = json.dumps(event_log)
        db_session.commit()

    def update_copper_id(self, copper_activity_id: str, db_session: Session):
        """Replaces the registered Copper activity id when it differs

        Args:
            copper_activity_id (str): copper activity id
            db_session (Session): local session

        Returns:
            bool: False when the id equals the stored one (nothing is
            committed), True after the new id is committed
        """
        unchanged = copper_activity_id == self.copper_activity_id
        if unchanged:
            return False

        self.copper_activity_id = copper_activity_id
        db_session.commit()
        return True

    def delete_record(self, db_session: Session):
        """Deletes this instance and commits

        Args:
            db_session (Session): Local session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_event(call_id: str, db_session: Session) -> Optional[C]:
        """First call record stored for a call id

        Args:
            call_id (str): dialpad call id
            db_session (Session): api db session

        Returns:
            Optional[Call]: Matching record or None
        """
        by_call = db_session.query(Call).filter(Call.call_id == call_id)
        return by_call.first()

    @staticmethod
    def find_for_user(user_id: str, db_session: Session, copper_ids: Set[int], limit: int = 5) -> List[C]:
        """Finds the latest 'limit' call records related to a user. A record
           must have a subject, a Copper activity date and a Copper activity
           id contained in 'copper_ids' to appear in the search.

        Args:
            user_id (str): dialpad user id
            db_session (Session): api db session
            copper_ids (Set[int]): Copper activity ids to restrict the search to
            limit (int, optional): max number of records [Defaults to 5].

        Returns:
            List[Call]: Stored calls desc sorted by copper_activity_date
        """
        # `!= None` is kept on purpose: on a column it builds the SQL NULL check
        logged_for_user = and_(
            Call.subject != None,
            Call.dialpad_user_id == user_id,
            Call.copper_activity_date != None,
            Call.copper_activity_id.in_(copper_ids)
        )
        latest_first = desc(Call.copper_activity_date)

        return (
            db_session.query(Call)
            .filter(logged_for_user)
            .order_by(latest_first)
            .limit(limit)
            .all()
        )

# Copper account pipeline stages temp persistence
P = TypeVar('PipelineStage')
class AccountPipelineStage(Base):
    """Temporary storage for the pipeline stages of a Copper account
    """
    __tablename__ = 'dialcopper_pipeline_stage'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_account_id = Column(String(255), primary_key=False, nullable=False)
    # JSON encoded list of stages
    copper_stages = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, stages record

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``copper_account_id`` and ``copper_stages`` (stored
                JSON encoded; an empty list when falsy)
        """
        self.ID: str = _generate_id(AccountPipelineStage, db_session)
        self.copper_account_id: str = kwargs.get('copper_account_id')
        self.copper_stages: str = json.dumps(kwargs.get('copper_stages') or [])

    @property
    def parsed_stages(self) -> List[Dict]:
        """Decoded JSON of the stored pipeline stages

        Returns:
            List[Dict]: List of stored pipeline stages
        """
        raw_stages = self.copper_stages
        return json.loads(raw_stages)

    def save(self, db_session: Session):
        """Persists this instance unless the account already has a record

        Args:
            db_session (Session): Local session

        Returns:
            AccountPipelineStage: The record already stored for the account,
            or this instance after commit
        """
        existing = AccountPipelineStage.find_for_account(self.copper_account_id, db_session)

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    def update_stages(self, stages: List[dict], db_session: Session):
        """Replaces the stored stages of this instance and commits

        Args:
            stages (List[dict]): new stages json
            db_session (Session): LocalSession
        """
        encoded = json.dumps(stages)
        self.copper_stages = encoded
        db_session.commit()

    @staticmethod
    def find_for_account(copper_account_id: str, db_session: Session) -> Optional[P]:
        """First stages record stored for a Copper CRM account id

        Args:
            copper_account_id (str): Copper CRM account id
            db_session (Session): api db session

        Returns:
            Optional[AccountPipelineStage]: Stored account stages
        """
        by_account = db_session.query(AccountPipelineStage).filter(
            AccountPipelineStage.copper_account_id == copper_account_id
        )
        return by_account.first()

# Copper deals temp persistence (stored Copper responses with an expiry date)
CD = TypeVar('CopperDeal')
class CopperDeal(Base):
    """Temporary storage for Copper deals; each record keeps the Copper
    response and expires 30 minutes after it was created or updated
    """
    __tablename__ = 'dialcopper_deal'

    ID = Column(CHAR(36), primary_key=True, nullable=False)
    copper_deal_id = Column(String(255), primary_key=False, nullable=False)
    copper_entity_id = Column(String(255), primary_key=False, nullable=False)
    copper_entity_type = Column(String(255), primary_key=False, nullable=False)
    expires_at = Column(DATETIME, primary_key=False, nullable=False)
    # JSON encoded Copper response
    copper_response = Column(TEXT, primary_key=False, nullable=False)

    def __init__(self, db_session: Session, **kwargs):
        """Builds a new, not yet persisted, deal record

        Args:
            db_session (Session): Session used to generate an unused ID
            **kwargs: ``copper_deal_id``, ``copper_entity_id``,
                ``copper_entity_type`` and ``copper_response`` (stored JSON
                encoded; an empty dict when falsy)
        """
        self.ID: str = _generate_id(CopperDeal, db_session)
        # Expire every 30 minutes
        self.expires_at = datetime.now() + timedelta(minutes = 30)
        self.copper_deal_id: str = kwargs.get('copper_deal_id')
        self.copper_entity_type: str = kwargs.get('copper_entity_type')
        self.copper_entity_id: str = kwargs.get('copper_entity_id')
        self.copper_response: str = json.dumps(kwargs.get('copper_response') or dict())

    @property
    def parsed_deal(self) -> Dict:
        """Decoded JSON of the stored Copper response

        Returns:
            Dict: Stored Copper response
        """
        raw_response = self.copper_response
        return json.loads(raw_response)

    @property
    def expired(self) -> bool:
        """Flag to check if the record is expired

            Returns:
                bool: True if expires_at is in the past, False otherwise
        """
        return datetime.now() > self.expires_at

    def save(self, db_session: Session):
        """Persists this instance unless the deal id is already stored

        Args:
            db_session (Session): Local session

        Returns:
            CopperDeal: The record already stored for the deal id, or this
            instance after commit
        """
        existing = CopperDeal.find_for_deal(self.copper_deal_id, db_session)

        if existing:
            return existing

        db_session.add(self)
        db_session.commit()
        return self

    def update_copper_response(self, copper_response: dict, db_session: Session):
        """Replaces the stored Copper response, restarts the 30 minutes
        expiration and commits

        Args:
            copper_response (dict): new Copper response json
            db_session (Session): LocalSession
        """
        self.expires_at = datetime.now() + timedelta(minutes = 30)
        self.copper_response = json.dumps(copper_response)
        db_session.commit()

    def delete_record(self, db_session: Session):
        """Deletes this instance and commits

        Args:
            db_session (Session): Local session
        """
        db_session.delete(self)
        db_session.commit()

    @staticmethod
    def find_for_deal(deal_id: str, db_session: Session) -> Optional[CD]:
        """Finds a deal by copper deal id

        Args:
            deal_id (str): Copper CRM deal id
            db_session (Session): api db session

        Returns:
            Optional[CopperDeal]: Stored deal or None
        """
        by_deal = db_session.query(CopperDeal).filter(
            CopperDeal.copper_deal_id == deal_id
        )
        return by_deal.first()

    @staticmethod
    def find_for_entity(entity_id: str, entity_type: str, db_session: Session) -> List[CD]:
        """All deals stored for a Copper entity

        Args:
            entity_id (str): copper entity id
            entity_type (str): copper entity type
            db_session (Session): local db session

        Returns:
            List[CD]: entity records
        """
        of_entity = and_(
            CopperDeal.copper_entity_id == entity_id,
            CopperDeal.copper_entity_type == entity_type
        )
        return db_session.query(CopperDeal).filter(of_entity).all()

# Import-time side effect: when the configured scope is not a server, create
# the tables of every model registered on Base using the project engine.
if not _CONFIG.config_scope.is_server:
    # Only create metadata on unit tests
    # As GC doesn't support this
    # NOTE(review): "GC" presumably names the hosting platform - confirm
    Base.metadata.create_all(engine)