Files
PSA-Gap-Analysis/collector.py

211 lines
5.5 KiB
Python

import os
import sys
import json
import logging
from datetime import datetime
from pathlib import Path
import pyodbc
import pymysql
from dotenv import load_dotenv
# Filesystem layout: every path is resolved relative to the directory that
# contains this script, so the collector can be launched from any working
# directory.
BASE_DIR = Path(__file__).resolve().parent
ENV_PATH = BASE_DIR / ".env.local"
QUERY_PATH = BASE_DIR / "query.sql"
LOG_DIR = BASE_DIR / "logs"
STATE_DIR = BASE_DIR / "state"
STATE_FILE = STATE_DIR / "last_run.json"
# The log directory must exist before logging.basicConfig opens the log file
# below; the state directory must exist before write_state() writes into it.
LOG_DIR.mkdir(exist_ok=True)
STATE_DIR.mkdir(exist_ok=True)
# Load connection settings from .env.local so get_env() can read them through
# os.getenv.
load_dotenv(ENV_PATH)
# Root logger: INFO and above are written to logs/collector.log ...
logging.basicConfig(
    filename=LOG_DIR / "collector.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# ... and the same records are echoed to stdout with an identical format.
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logging.getLogger().addHandler(console)
def get_env(name: str, required: bool = True, default: str | None = None) -> str | None:
value = os.getenv(name, default)
if required and not value:
raise RuntimeError(f"Missing required environment variable: {name}")
return value
def _odbc_quote(value: str) -> str:
    """Return *value* in a form that is safe inside an ODBC connection string.

    Values containing ``;``, ``{`` or ``}``, or leading/trailing whitespace,
    are wrapped in braces with every ``}`` doubled. Any other value is
    returned unchanged, so ordinary settings produce the same connection
    string as before.
    """
    needs_braces = value != value.strip() or any(ch in value for ch in ";{}")
    if not needs_braces:
        return value
    return "{" + value.replace("}", "}}") + "}"


def connect_psa_sql():
    """Open a pyodbc connection to the PSA SQL Server source database.

    Settings are read from the environment via ``get_env``:
    ``PSA_SQL_SERVER``, ``PSA_SQL_DATABASE``, ``PSA_SQL_USERNAME`` and
    ``PSA_SQL_PASSWORD`` are required; ``PSA_SQL_DRIVER`` defaults to
    "ODBC Driver 18 for SQL Server" and ``PSA_SQL_TRUST_CERT`` defaults to
    "yes".

    Returns:
        The connection object returned by ``pyodbc.connect``.

    Raises:
        RuntimeError: If a required ``PSA_SQL_*`` variable is missing.
    """
    server = get_env("PSA_SQL_SERVER")
    database = get_env("PSA_SQL_DATABASE")
    username = get_env("PSA_SQL_USERNAME")
    password = get_env("PSA_SQL_PASSWORD")
    driver = get_env("PSA_SQL_DRIVER", required=False, default="ODBC Driver 18 for SQL Server")
    trust_cert = get_env("PSA_SQL_TRUST_CERT", required=False, default="yes")
    # Free-form values (notably the password) may contain ';' or braces, which
    # would otherwise terminate or corrupt the attribute they appear in.
    conn_str = (
        f"DRIVER={{{driver}}};"
        f"SERVER={_odbc_quote(server)};"
        f"DATABASE={_odbc_quote(database)};"
        f"UID={_odbc_quote(username)};"
        f"PWD={_odbc_quote(password)};"
        f"TrustServerCertificate={trust_cert};"
    )
    return pyodbc.connect(conn_str)
def connect_mariadb():
    """Open a PyMySQL connection to the MariaDB destination database.

    Host, user, password and database come from the required
    ``MARIADB_HOST``, ``MARIADB_USERNAME``, ``MARIADB_PASSWORD`` and
    ``MARIADB_DATABASE`` variables; ``MARIADB_PORT`` is optional and defaults
    to 3306. The connection uses utf8mb4, returns rows as dictionaries and
    has autocommit disabled, so callers must commit explicitly.

    Raises:
        RuntimeError: If a required ``MARIADB_*`` variable is missing.
    """
    # Resolved in the same order as the keyword arguments so that the first
    # missing variable reported stays the same.
    credentials = {
        "host": get_env("MARIADB_HOST"),
        "port": int(get_env("MARIADB_PORT", required=False, default="3306")),
        "user": get_env("MARIADB_USERNAME"),
        "password": get_env("MARIADB_PASSWORD"),
        "database": get_env("MARIADB_DATABASE"),
    }
    return pymysql.connect(
        **credentials,
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=False,
    )
def read_query() -> str:
    """Return the text of the SQL file at ``QUERY_PATH``, decoded as UTF-8.

    Raises:
        FileNotFoundError: If ``QUERY_PATH`` does not exist.
    """
    if QUERY_PATH.exists():
        return QUERY_PATH.read_text(encoding="utf-8")
    raise FileNotFoundError(f"Missing query file: {QUERY_PATH}")
def fetch_psa_rows():
    """Run the query from ``query.sql`` against the PSA source and return its rows.

    Returns:
        A list with one dict per result row, keyed by the column names
        reported in ``cursor.description``.

    Raises:
        FileNotFoundError: If the query file is missing (from ``read_query``).
        RuntimeError: If a required connection setting is missing, or if the
            query does not produce a result set.
    """
    query = read_query()
    logging.info("Connecting to PSA SQL source")
    conn = connect_psa_sql()
    try:
        # pyodbc's connection context manager commits on a clean exit but
        # does NOT close the connection, hence the explicit close() below.
        with conn:
            cursor = conn.cursor()
            logging.info("Executing PSA query")
            cursor.execute(query)
            # description is None when the statement yields no result set;
            # fail with a clear message instead of a TypeError.
            if cursor.description is None:
                raise RuntimeError("PSA query did not return a result set")
            columns = [col[0] for col in cursor.description]
            rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        conn.close()
    logging.info("Fetched %s rows from PSA SQL source", len(rows))
    return rows
def upsert_rows(rows):
    """Insert or update *rows* in the ``psa_ticket_fact`` table.

    Each row is a mapping that supplies the named placeholders used in the
    statement below (``id``, ``Ticket_Number``, ``Company_Name``, ``Board``,
    ``Summary``, ``Status``, ``date_opened``, ``date_last_updated``,
    ``date_closed``, ``Hours_Actual``, ``Hours_Billable``, ``Type``,
    ``SubType``, ``Priority``, ``Ticket_Owner``, ``Resolved_Flag``,
    ``Closed_Flag``). ``collected_at`` is set by the database via ``NOW()``.

    All rows are written in a single transaction: it is committed on success
    and rolled back before the exception is re-raised on failure.

    Returns:
        The number of rows submitted, or 0 when *rows* is empty.
    """
    if not rows:
        logging.info("No rows to upsert")
        return 0

    # NOTE(review): PyMySQL appears to batch executemany() into a multi-row
    # INSERT only when the VALUES list consists solely of placeholders; the
    # literal NOW() below may force one statement per row. Confirm if
    # throughput matters.
    upsert_sql = """
INSERT INTO psa_ticket_fact (
`id`,
`ticket_number`,
`company_name`,
`board`,
`summary`,
`status`,
`date_opened`,
`date_last_updated`,
`date_closed`,
`hours_actual`,
`hours_billable`,
`type`,
`subtype`,
`priority`,
`ticket_owner`,
`resolved_flag`,
`closed_flag`,
`collected_at`
)
VALUES (
%(id)s,
%(Ticket_Number)s,
%(Company_Name)s,
%(Board)s,
%(Summary)s,
%(Status)s,
%(date_opened)s,
%(date_last_updated)s,
%(date_closed)s,
%(Hours_Actual)s,
%(Hours_Billable)s,
%(Type)s,
%(SubType)s,
%(Priority)s,
%(Ticket_Owner)s,
%(Resolved_Flag)s,
%(Closed_Flag)s,
NOW()
)
ON DUPLICATE KEY UPDATE
`ticket_number` = VALUES(`ticket_number`),
`company_name` = VALUES(`company_name`),
`board` = VALUES(`board`),
`summary` = VALUES(`summary`),
`status` = VALUES(`status`),
`date_opened` = VALUES(`date_opened`),
`date_last_updated` = VALUES(`date_last_updated`),
`date_closed` = VALUES(`date_closed`),
`hours_actual` = VALUES(`hours_actual`),
`hours_billable` = VALUES(`hours_billable`),
`type` = VALUES(`type`),
`subtype` = VALUES(`subtype`),
`priority` = VALUES(`priority`),
`ticket_owner` = VALUES(`ticket_owner`),
`resolved_flag` = VALUES(`resolved_flag`),
`closed_flag` = VALUES(`closed_flag`),
`collected_at` = NOW();
"""
    row_count = len(rows)
    logging.info("Connecting to MariaDB destination")
    with connect_mariadb() as destination:
        try:
            with destination.cursor() as cur:
                cur.executemany(upsert_sql, rows)
            destination.commit()
        except Exception:
            # Autocommit is off: discard the partial batch, then propagate.
            destination.rollback()
            raise
        logging.info("Upserted %s rows into MariaDB", row_count)
        return row_count
def write_state(rows_fetched: int, rows_upserted: int):
    """Record the outcome of a successful run in ``STATE_FILE``.

    The file is overwritten with a JSON object (2-space indent, UTF-8)
    holding the current local timestamp in ISO format and the two row counts.

    Args:
        rows_fetched: Number of rows read from the source.
        rows_upserted: Number of rows written to the destination.
    """
    payload = json.dumps(
        {
            "last_successful_run": datetime.now().isoformat(),
            "last_rows_fetched": rows_fetched,
            "last_rows_upserted": rows_upserted,
        },
        indent=2,
    )
    STATE_FILE.write_text(payload, encoding="utf-8")
def main():
    """Run one collection cycle: fetch from PSA, upsert into MariaDB, save state.

    Any exception is logged with its traceback and the process exits with
    status 1; the state file is only written when every step succeeds.
    """
    logging.info("Starting PSA gap analysis collector")
    try:
        source_rows = fetch_psa_rows()
        upserted = upsert_rows(source_rows)
        fetched = len(source_rows)
        write_state(fetched, upserted)
        logging.info(
            "Collector completed successfully. Rows fetched: %s. Rows upserted: %s",
            fetched,
            upserted,
        )
    except Exception as ex:
        logging.exception("Collector failed: %s", ex)
        sys.exit(1)


# Run the collector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()