211 lines
5.5 KiB
Python
211 lines
5.5 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import pyodbc
|
|
import pymysql
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
# --- Filesystem layout -------------------------------------------------------
# Every path is resolved relative to the directory containing this script.
BASE_DIR = Path(__file__).resolve().parent
ENV_PATH = BASE_DIR / ".env.local"
QUERY_PATH = BASE_DIR / "query.sql"
LOG_DIR = BASE_DIR / "logs"
STATE_DIR = BASE_DIR / "state"
STATE_FILE = STATE_DIR / "last_run.json"

# The log directory must exist before basicConfig() opens the log file below.
for _runtime_dir in (LOG_DIR, STATE_DIR):
    _runtime_dir.mkdir(exist_ok=True)

# Load connection settings from the local env file into the process
# environment so that get_env() can read them.
load_dotenv(ENV_PATH)

# --- Logging -----------------------------------------------------------------
# The same record layout is used for the log file and for the console.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"

logging.basicConfig(
    filename=LOG_DIR / "collector.log",
    level=logging.INFO,
    format=_LOG_FORMAT,
)

# Mirror log records to stdout in addition to the log file.
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter(_LOG_FORMAT))
logging.getLogger().addHandler(console)
|
|
|
|
|
|
def get_env(name: str, required: bool = True, default: str | None = None) -> str | None:
|
|
value = os.getenv(name, default)
|
|
if required and not value:
|
|
raise RuntimeError(f"Missing required environment variable: {name}")
|
|
return value
|
|
|
|
|
|
def _odbc_value(value: str | None) -> str | None:
    """Return *value* in a form that is safe to embed in an ODBC connection string.

    Values are passed through untouched unless they would otherwise break the
    ``KEY=value;`` syntax: a ``;`` would terminate the attribute early, a
    leading ``{`` would start an unterminated quoted value, and surrounding
    whitespace would be ambiguous. Such values are wrapped in braces, with
    any ``}`` doubled as the brace-quoting syntax requires.
    """
    if not value:
        return value
    if value.startswith("{") and value.endswith("}"):
        # Treated as already brace-quoted in the env file; leave it alone so
        # existing configurations keep working unchanged.
        return value
    if ";" in value or value.startswith("{") or value != value.strip():
        return "{" + value.replace("}", "}}") + "}"
    return value


def connect_psa_sql():
    """Open a pyodbc connection to the PSA SQL source.

    Connection settings are read from the environment:

    * ``PSA_SQL_SERVER``, ``PSA_SQL_DATABASE``, ``PSA_SQL_USERNAME`` and
      ``PSA_SQL_PASSWORD`` are required.
    * ``PSA_SQL_DRIVER`` defaults to ``"ODBC Driver 18 for SQL Server"``.
    * ``PSA_SQL_TRUST_CERT`` defaults to ``"yes"``.

    Returns:
        The connection object returned by ``pyodbc.connect``.

    Raises:
        RuntimeError: If a required environment variable is missing or empty.
    """
    # Values that contain connection-string metacharacters (most commonly a
    # password with ';') are brace-quoted so they cannot truncate or corrupt
    # the connection string.
    server = _odbc_value(get_env("PSA_SQL_SERVER"))
    database = _odbc_value(get_env("PSA_SQL_DATABASE"))
    username = _odbc_value(get_env("PSA_SQL_USERNAME"))
    password = _odbc_value(get_env("PSA_SQL_PASSWORD"))
    driver = get_env("PSA_SQL_DRIVER", required=False, default="ODBC Driver 18 for SQL Server")
    # NOTE(review): the "yes" default is passed as TrustServerCertificate,
    # which presumably skips server certificate validation; confirm this is
    # intended outside of development environments.
    trust_cert = get_env("PSA_SQL_TRUST_CERT", required=False, default="yes")

    conn_str = (
        f"DRIVER={{{driver}}};"
        f"SERVER={server};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password};"
        f"TrustServerCertificate={trust_cert};"
    )

    return pyodbc.connect(conn_str)
|
|
|
|
|
|
def connect_mariadb():
    """Open a PyMySQL connection to the MariaDB destination.

    Connection settings are read from the environment: ``MARIADB_HOST``,
    ``MARIADB_USERNAME``, ``MARIADB_PASSWORD`` and ``MARIADB_DATABASE`` are
    required; ``MARIADB_PORT`` defaults to ``"3306"``.

    The connection uses the ``utf8mb4`` charset, returns rows as dicts
    (``DictCursor``) and has autocommit disabled, so callers must commit
    explicitly.

    Returns:
        The connection object returned by ``pymysql.connect``.

    Raises:
        RuntimeError: If a required environment variable is missing or empty.
        ValueError: If ``MARIADB_PORT`` is not an integer.
    """
    # Built in the same order the variables are looked up, so the first
    # missing setting is the one that gets reported.
    settings = {
        "host": get_env("MARIADB_HOST"),
        "port": int(get_env("MARIADB_PORT", required=False, default="3306")),
        "user": get_env("MARIADB_USERNAME"),
        "password": get_env("MARIADB_PASSWORD"),
        "database": get_env("MARIADB_DATABASE"),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "autocommit": False,
    }
    return pymysql.connect(**settings)
|
|
|
|
|
|
def read_query() -> str:
    """Return the text of the SQL query stored at ``QUERY_PATH``.

    Returns:
        The file contents decoded as UTF-8.

    Raises:
        FileNotFoundError: If ``QUERY_PATH`` does not exist.
    """
    if QUERY_PATH.exists():
        return QUERY_PATH.read_text(encoding="utf-8")

    raise FileNotFoundError(f"Missing query file: {QUERY_PATH}")
|
|
|
|
|
|
def fetch_psa_rows():
    """Run the query from the query file against the PSA SQL source.

    Returns:
        A list with one dict per result row, keyed by the column names
        reported in ``cursor.description``.

    Raises:
        FileNotFoundError: If the query file is missing (from ``read_query``).
        RuntimeError: If a required connection setting is missing, or if the
            statement did not produce a result set.
    """
    query = read_query()

    logging.info("Connecting to PSA SQL source")
    conn = connect_psa_sql()
    try:
        # pyodbc's connection context manager only commits (or rolls back on
        # error) when the block exits; it does NOT close the connection. Keep
        # it for its transaction handling and close explicitly in `finally`.
        with conn:
            cursor = conn.cursor()
            try:
                logging.info("Executing PSA query")
                cursor.execute(query)

                # description is None when the statement returned no result
                # set; fail with a clear message instead of a TypeError below.
                if cursor.description is None:
                    raise RuntimeError("PSA query did not return a result set")

                columns = [col[0] for col in cursor.description]
                rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
            finally:
                cursor.close()
    finally:
        conn.close()

    logging.info("Fetched %s rows from PSA SQL source", len(rows))
    return rows
|
|
|
|
|
|
# Insert-or-update statement for psa_ticket_fact. Placeholders are named after
# the keys of the row dicts handed to upsert_rows(); `collected_at` is stamped
# by the database server via NOW() on both insert and update.
_UPSERT_SQL = """
    INSERT INTO psa_ticket_fact (
        `id`,
        `ticket_number`,
        `company_name`,
        `board`,
        `summary`,
        `status`,
        `date_opened`,
        `date_last_updated`,
        `date_closed`,
        `hours_actual`,
        `hours_billable`,
        `type`,
        `subtype`,
        `priority`,
        `ticket_owner`,
        `resolved_flag`,
        `closed_flag`,
        `collected_at`
    )
    VALUES (
        %(id)s,
        %(Ticket_Number)s,
        %(Company_Name)s,
        %(Board)s,
        %(Summary)s,
        %(Status)s,
        %(date_opened)s,
        %(date_last_updated)s,
        %(date_closed)s,
        %(Hours_Actual)s,
        %(Hours_Billable)s,
        %(Type)s,
        %(SubType)s,
        %(Priority)s,
        %(Ticket_Owner)s,
        %(Resolved_Flag)s,
        %(Closed_Flag)s,
        NOW()
    )
    ON DUPLICATE KEY UPDATE
        `ticket_number` = VALUES(`ticket_number`),
        `company_name` = VALUES(`company_name`),
        `board` = VALUES(`board`),
        `summary` = VALUES(`summary`),
        `status` = VALUES(`status`),
        `date_opened` = VALUES(`date_opened`),
        `date_last_updated` = VALUES(`date_last_updated`),
        `date_closed` = VALUES(`date_closed`),
        `hours_actual` = VALUES(`hours_actual`),
        `hours_billable` = VALUES(`hours_billable`),
        `type` = VALUES(`type`),
        `subtype` = VALUES(`subtype`),
        `priority` = VALUES(`priority`),
        `ticket_owner` = VALUES(`ticket_owner`),
        `resolved_flag` = VALUES(`resolved_flag`),
        `closed_flag` = VALUES(`closed_flag`),
        `collected_at` = NOW();
    """


def upsert_rows(rows):
    """Insert or update the given ticket rows in ``psa_ticket_fact``.

    All rows are written in a single transaction: the batch is committed as a
    whole, and any failure rolls the transaction back before the exception is
    re-raised.

    Args:
        rows: Sequence of dicts whose keys match the named placeholders in
            the upsert statement. A falsy value (empty or ``None``) is a no-op.

    Returns:
        The number of rows submitted (``len(rows)``), or ``0`` when there was
        nothing to do.
    """
    if not rows:
        logging.info("No rows to upsert")
        return 0

    logging.info("Connecting to MariaDB destination")

    # NOTE(review): relies on the connection's context manager to release the
    # connection on exit; confirm for the installed PyMySQL version.
    with connect_mariadb() as conn:
        try:
            with conn.cursor() as cursor:
                cursor.executemany(_UPSERT_SQL, rows)

            conn.commit()
            submitted = len(rows)
            logging.info("Upserted %s rows into MariaDB", submitted)
        except Exception:
            # Leave the destination untouched if any part of the batch failed.
            conn.rollback()
            raise

    return submitted
|
|
|
|
def write_state(rows_fetched: int, rows_upserted: int):
    """Record the outcome of a successful run in ``STATE_FILE``.

    The file is overwritten with a JSON object holding the current local
    timestamp (ISO 8601, as produced by ``datetime.now().isoformat()``) and
    the two row counts.

    Args:
        rows_fetched: Number of rows read from the source.
        rows_upserted: Number of rows written to the destination.
    """
    finished_at = datetime.now().isoformat()
    payload = json.dumps(
        {
            "last_successful_run": finished_at,
            "last_rows_fetched": rows_fetched,
            "last_rows_upserted": rows_upserted,
        },
        indent=2,
    )
    STATE_FILE.write_text(payload, encoding="utf-8")
|
|
|
|
|
|
def main():
    """Run one collection cycle: fetch, upsert, then record the run state.

    Any exception raised along the way is logged with its traceback and the
    process exits with status 1.
    """
    logging.info("Starting PSA gap analysis collector")

    try:
        source_rows = fetch_psa_rows()
        upserted_count = upsert_rows(source_rows)
        fetched_count = len(source_rows)

        # State is only written once both fetch and upsert have succeeded.
        write_state(fetched_count, upserted_count)

        logging.info(
            "Collector completed successfully. Rows fetched: %s. Rows upserted: %s",
            fetched_count,
            upserted_count,
        )
    except Exception as err:
        logging.exception("Collector failed: %s", err)
        sys.exit(1)
|
|
|
|
|
|
# Run the collector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()