import os import sys import json import logging from datetime import datetime from pathlib import Path import pyodbc import pymysql from dotenv import load_dotenv BASE_DIR = Path(__file__).resolve().parent ENV_PATH = BASE_DIR / ".env.local" QUERY_PATH = BASE_DIR / "query.sql" LOG_DIR = BASE_DIR / "logs" STATE_DIR = BASE_DIR / "state" STATE_FILE = STATE_DIR / "last_run.json" LOG_DIR.mkdir(exist_ok=True) STATE_DIR.mkdir(exist_ok=True) load_dotenv(ENV_PATH) logging.basicConfig( filename=LOG_DIR / "collector.log", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) console = logging.StreamHandler(sys.stdout) console.setLevel(logging.INFO) console.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) logging.getLogger().addHandler(console) def get_env(name: str, required: bool = True, default: str | None = None) -> str | None: value = os.getenv(name, default) if required and not value: raise RuntimeError(f"Missing required environment variable: {name}") return value def connect_psa_sql(): server = get_env("PSA_SQL_SERVER") database = get_env("PSA_SQL_DATABASE") username = get_env("PSA_SQL_USERNAME") password = get_env("PSA_SQL_PASSWORD") driver = get_env("PSA_SQL_DRIVER", required=False, default="ODBC Driver 18 for SQL Server") trust_cert = get_env("PSA_SQL_TRUST_CERT", required=False, default="yes") conn_str = ( f"DRIVER={{{driver}}};" f"SERVER={server};" f"DATABASE={database};" f"UID={username};" f"PWD={password};" f"TrustServerCertificate={trust_cert};" ) return pyodbc.connect(conn_str) def connect_mariadb(): return pymysql.connect( host=get_env("MARIADB_HOST"), port=int(get_env("MARIADB_PORT", required=False, default="3306")), user=get_env("MARIADB_USERNAME"), password=get_env("MARIADB_PASSWORD"), database=get_env("MARIADB_DATABASE"), charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, autocommit=False, ) def read_query() -> str: if not QUERY_PATH.exists(): raise FileNotFoundError(f"Missing query file: {QUERY_PATH}") return QUERY_PATH.read_text(encoding="utf-8") def fetch_psa_rows(): query = read_query() logging.info("Connecting to PSA SQL source") with connect_psa_sql() as conn: cursor = conn.cursor() logging.info("Executing PSA query") cursor.execute(query) columns = [col[0] for col in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] logging.info("Fetched %s rows from PSA SQL source", len(rows)) return rows def upsert_rows(rows): if not rows: logging.info("No rows to upsert") return 0 sql = """ INSERT INTO psa_ticket_fact ( `id`, `ticket_number`, `company_name`, `board`, `summary`, `status`, `date_opened`, `date_last_updated`, `date_closed`, `hours_actual`, `hours_billable`, `type`, `subtype`, `priority`, `ticket_owner`, `resolved_flag`, `closed_flag`, `collected_at` ) VALUES ( %(id)s, %(Ticket_Number)s, %(Company_Name)s, %(Board)s, %(Summary)s, %(Status)s, %(date_opened)s, %(date_last_updated)s, %(date_closed)s, %(Hours_Actual)s, %(Hours_Billable)s, %(Type)s, %(SubType)s, %(Priority)s, %(Ticket_Owner)s, %(Resolved_Flag)s, %(Closed_Flag)s, NOW() ) ON DUPLICATE KEY UPDATE `ticket_number` = VALUES(`ticket_number`), `company_name` = VALUES(`company_name`), `board` = VALUES(`board`), `summary` = VALUES(`summary`), `status` = VALUES(`status`), `date_opened` = VALUES(`date_opened`), `date_last_updated` = VALUES(`date_last_updated`), `date_closed` = VALUES(`date_closed`), `hours_actual` = VALUES(`hours_actual`), `hours_billable` = VALUES(`hours_billable`), `type` = VALUES(`type`), `subtype` = VALUES(`subtype`), `priority` = VALUES(`priority`), `ticket_owner` = VALUES(`ticket_owner`), `resolved_flag` = VALUES(`resolved_flag`), `closed_flag` = VALUES(`closed_flag`), `collected_at` = NOW(); """ logging.info("Connecting to MariaDB destination") with connect_mariadb() as conn: try: with conn.cursor() as cursor: cursor.executemany(sql, rows) conn.commit() logging.info("Upserted %s rows into MariaDB", len(rows)) return len(rows) except Exception: conn.rollback() raise def write_state(rows_fetched: int, rows_upserted: int): state = { "last_successful_run": datetime.now().isoformat(), "last_rows_fetched": rows_fetched, "last_rows_upserted": rows_upserted, } STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8") def main(): logging.info("Starting PSA gap analysis collector") try: rows = fetch_psa_rows() upserted = upsert_rows(rows) write_state(len(rows), upserted) logging.info( "Collector completed successfully. Rows fetched: %s. Rows upserted: %s", len(rows), upserted, ) except Exception as ex: logging.exception("Collector failed: %s", ex) sys.exit(1) if __name__ == "__main__": main()