Source code for pyprediktorutilities.dwh.dwh

import pyodbc
import logging
import pandas as pd
from typing import List, Any
from pydantic import validate_call

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


[docs] class Dwh: """Access a PowerView Data Warehouse or other SQL databases. Args: url (str): The URL of the sql server database (str): The name of the database username (str): The username password (str): The password Attributes: connection (pyodbc.Connection): The connection object cursor (pyodbc.Cursor): The cursor object """ @validate_call def __init__( self, url: str, database: str, username: str, password: str, driver_index: int = -1, ) -> None: """Class initializer. Args: url (str): The URL of the sql server database (str): The name of the database username (str): The username password (str): The password """ self.url = url self.driver = "" self.cursor = None self.database = database self.username = username self.password = password self.connection = None self.connection_string_template = ( f"UID={self.username};" + f"PWD={self.password};" + "DRIVER={};" + f"SERVER={self.url};" + f"DATABASE={self.database};" + "TrustServerCertificate=yes;" ) self.__set_driver(driver_index) self.connection_string = self.connection_string_template.format(self.driver) self.connection_attempts = 3 def __enter__(self): self.__connect() return self @validate_call def __exit__(self, exc_type, exc_val, exc_tb): if self.connection is not None: self.__disconnect() """ Public """
[docs] @validate_call def fetch(self, query: str, to_dataframe: bool = False) -> List[Any]: """Execute the SQL query to get results from DWH and return the data. Use that method for getting data. That means that if you use SELECT or you'd like to call a stored procedure that returns one or more sets of data, that is the correct method to use. Use that method to GET. Args: query (str): The SQL query to execute. to_dataframe (bool): If True, return the results as a list of DataFrames. Returns: List[Any]: The results of the query. If DWH returns multiple data sets, this method is going to return a list of result sets (lists). If DWH returns a single data set, the method is going to return a list representing the single result set. If to_dataframe is True, the data inside each data set is going to be in DataFrame format. """ self.__connect() try: self.cursor.execute(query) data_sets = [] while True: columns = [col[0] for col in self.cursor.description] data_set = [dict(zip(columns, row)) for row in self.cursor.fetchall()] if to_dataframe: data_sets.append(pd.DataFrame(data_set, columns=columns)) else: data_sets.append(data_set) if not self.cursor.nextset(): break return data_sets if len(data_sets) > 1 else data_sets[0] except Exception as e: logging.error(f"Failed to fetch data: {e}") raise finally: self.__disconnect() # prevent from leaving open transactions in DWH
[docs] @validate_call def execute(self, query: str, *args, **kwargs) -> List[Any]: """Execute the SQL query and return the results. For instance, if we create a new record in DWH by calling a stored procedure returning the id of the inserted element or in our query we use `SELECT SCOPE_IDENTITY() AS LastInsertedId;`, the DWH is going to return data after executing our write request. Please note that here we expect a single result set. Therefore DWH is obligated to return only one data set and also we're obligated to construct our query according to this requirement. Use that method to CREATE, UPDATE, DELETE or execute business logic. To NOT use for GET. Args: query (str): The SQL query to execute. *args: Variable length argument list to pass to cursor.execute(). **kwargs: Arbitrary keyword arguments to pass to cursor.execute(). Returns: List[Any]: The results of the query. """ self.__connect() try: self.cursor.execute(query, *args, **kwargs) # Check if the cursor has a description attribute, indicating a result set if self.cursor.description: result = self.cursor.fetchall() else: result = [] self.__commit() return result except Exception as e: logging.error(f"Failed to execute query: {e}") raise finally: self.__disconnect() # prevent from leaving open transactions in DWH
""" Private - Driver """ @validate_call def __set_driver(self, driver_index: int) -> None: """Sets the driver for the database connection. Args: driver (int): Index of the driver in the list of available drivers. If the index is -1 or in general below 0, pyPrediktorMapClient is going to choose the driver for you. Raises: ValueError: If no valid driver is found. """ drivers = self.__get_list_of_available_and_supported_pyodbc_drivers() available_drivers = drivers["available"] supported_drivers = drivers["supported"] if not supported_drivers: raise ValueError("No supported ODBC drivers found.") if not available_drivers: raise Exception("Connection to the database cannot be established.") if driver_index < 0: self.driver = available_drivers[0] elif driver_index >= len(available_drivers): raise ValueError( f"Driver index {driver_index} is out of range. Please use " f"the __get_list_of_available_and_supported_pyodbc_drivers() method " f"to list all available drivers." ) else: self.driver = available_drivers[driver_index] @validate_call def __get_list_of_supported_pyodbc_drivers(self) -> List[Any]: return pyodbc.drivers() @validate_call def __get_list_of_available_and_supported_pyodbc_drivers( self, ) -> dict: available_drivers = [] supported_drivers = self.__get_list_of_supported_pyodbc_drivers() for driver in supported_drivers: try: connection_string_with_assigned_driver = ( self.connection_string_template.format(driver) ) pyodbc.connect(connection_string_with_assigned_driver, timeout=3) available_drivers.append(driver) except pyodbc.Error as err: logger.info(f"Driver {driver} could not connect: {err}") drivers = {"available": available_drivers, "supported": supported_drivers} return drivers """ Private - Connector & Disconnector """ @validate_call def __connect(self) -> None: """Establishes a connection to the database.""" if self.connection: return logging.info("Initiating connection to the database...") attempt = 0 while attempt < self.connection_attempts: try: self.connection = pyodbc.connect(self.connection_string) if self.connection: self.cursor = self.connection.cursor() logging.info(f"Connected to the database on attempt {attempt + 1}") return else: logging.info(f"Connection is None on attempt {attempt + 1}") raise pyodbc.Error("Failed to connect to the database") # Exceptions once thrown there is no point attempting except pyodbc.ProgrammingError as err: logger.error( f"Programming Error {err.args[0] if err.args else 'No code'}: {err.args[1] if len(err.args) > 1 else 'No message'}" ) logger.warning( "There seems to be a problem with your code. Please " "check your code and try again." ) raise except ( pyodbc.DataError, pyodbc.IntegrityError, pyodbc.NotSupportedError, ) as err: logger.error( f"{type(err).__name__} {err.args[0] if err.args else 'No code'}: {err.args[1] if len(err.args) > 1 else 'No message'}" ) raise # Exceptions when thrown we can continue attempting except pyodbc.OperationalError as err: logger.error( f"Operational Error: {err.args[0] if err.args else 'No code'}: {err.args[1] if len(err.args) > 1 else 'No message'}" ) logger.warning( "Pyodbc is having issues with the connection. This " "could be due to the wrong driver being used. Please " "check your driver with " "the __get_list_of_available_and_supported_pyodbc_drivers() method " "and try again." ) attempt += 1 if self.__are_connection_attempts_reached(attempt): break except (pyodbc.DatabaseError, pyodbc.Error) as err: logger.error( f"{type(err).__name__} {err.args[0] if err.args else 'No code'}: {err.args[1] if len(err.args) > 1 else 'No message'}" ) attempt += 1 if self.__are_connection_attempts_reached(attempt): break if not self.connection: raise pyodbc.Error("Failed to connect to the database") @validate_call def __are_connection_attempts_reached(self, attempt) -> bool: if attempt != self.connection_attempts: logger.warning("Retrying connection...") return False logger.error( f"Failed to connect to the DataWarehouse after " f"{self.connection_attempts} attempts." ) return True @validate_call def __disconnect(self) -> None: """Closes the connection to the database.""" if self.connection: self.connection.close() self.cursor = None self.connection = None """ Private - Low level database operations """ @validate_call def __commit(self) -> None: """Commits any changes to the database.""" self.connection.commit()