Automation Scripts

Production-grade Python automation for trading infrastructure

k8s_health_check.py (Python)

Polls K8s API for pod health across trading namespaces. Auto-restarts pods in CrashLoopBackOff with configurable cooldown.

#!/usr/bin/env python3
"""
Kubernetes Pod Health Monitor — trading namespaces
"""
import subprocess, json, time, logging
from dataclasses import dataclass

# Namespaces whose pods are inspected on every health-check pass.
NAMESPACES = ["trading", "streaming", "monitoring"]
# Highest restart count at which a crash-looping pod is still auto-restarted.
MAX_RESTARTS = 3
# Pause between two consecutive health-check passes, in seconds.
CHECK_INTERVAL = 30

# Timestamp, level padded to eight columns, then the message.
_LOG_FORMAT = "%(asctime)s %(levelname)-8s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger(__name__)


@dataclass
class PodStatus:
    """Condensed health snapshot of a single pod, as built by ``get_pods``."""

    # Pod name, taken from the pod's ``metadata.name``.
    name: str
    # Namespace the pod was listed from.
    namespace: str
    # "CrashLoopBackOff" when a container is waiting for that reason,
    # otherwise the pod phase (or "Unknown" when no phase is reported).
    status: str
    # Restart count summed over the pod's container statuses.
    restarts: int
    # Readiness flag aggregated over the pod's container statuses.
    ready: bool


def get_pods(namespace: str, timeout: float = 30.0) -> list[PodStatus]:
    """Return a health snapshot of every pod in *namespace*.

    Runs ``kubectl get pods -n <namespace> -o json`` and condenses each
    returned item into a ``PodStatus``.

    Args:
        namespace: Kubernetes namespace to query.
        timeout: Seconds to wait for ``kubectl`` before giving up, so a hung
            call cannot stall the monitor indefinitely.

    Returns:
        One ``PodStatus`` per listed pod, in the order kubectl returned them.

    Raises:
        subprocess.CalledProcessError: ``kubectl`` exited with a non-zero code.
        subprocess.TimeoutExpired: ``kubectl`` did not finish within *timeout*.
        json.JSONDecodeError: ``kubectl`` printed something that is not JSON.
        KeyError: the response has no "items" list or a pod has no name.
    """
    out = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        capture_output=True, text=True, check=True, timeout=timeout,
    ).stdout
    pods = []
    for item in json.loads(out)["items"]:
        # Tolerate a missing or null status / containerStatuses entry: a pod
        # that has not started any container yet may not report them.
        status = item.get("status") or {}
        containers = status.get("containerStatuses") or []
        restarts = sum(c.get("restartCount", 0) for c in containers)
        # all() over an empty list is vacuously True, so a pod without any
        # container status must be rejected explicitly instead of counting
        # as ready.
        ready = bool(containers) and all(c.get("ready", False) for c in containers)
        crash = any(
            ((c.get("state") or {}).get("waiting") or {}).get("reason") == "CrashLoopBackOff"
            for c in containers
        )
        pods.append(PodStatus(
            name=item["metadata"]["name"],
            namespace=namespace,
            # A crash-looping container takes precedence over the pod phase.
            status="CrashLoopBackOff" if crash else status.get("phase", "Unknown"),
            restarts=restarts,
            ready=ready,
        ))
    return pods


def restart_pod(name: str, namespace: str, timeout: float = 60.0) -> None:
    """Delete pod *name* in *namespace* via ``kubectl delete pod``.

    This function only deletes the pod; whether a replacement is created
    depends on the pod being managed by a controller (presumably a
    Deployment/StatefulSet -- verify for the monitored workloads).

    Args:
        name: Name of the pod to delete.
        namespace: Namespace the pod lives in.
        timeout: Seconds to wait for ``kubectl`` to return. Without a bound,
            a deletion that never completes would block the caller forever.

    Raises:
        subprocess.CalledProcessError: ``kubectl`` exited with a non-zero code
            (for example because the pod no longer exists).
        subprocess.TimeoutExpired: ``kubectl`` did not return within *timeout*.
    """
    log.warning("Restarting %s/%s", namespace, name)
    # NOTE(review): kubectl documents --grace-period=0 as valid only together
    # with --force; confirm whether an immediate force-delete is intended.
    subprocess.run(
        ["kubectl", "delete", "pod", name, "-n", namespace, "--grace-period=0"],
        check=True, timeout=timeout,
    )


def check_health() -> None:
    """Run one health pass over every namespace in ``NAMESPACES``.

    Each pod whose status is "CrashLoopBackOff" is logged; it is restarted
    via ``restart_pod`` while its restart count is at most ``MAX_RESTARTS``,
    otherwise it is flagged for manual intervention.

    Failures are isolated: an error while listing one namespace or while
    restarting one pod is logged and the pass continues, so a single broken
    namespace or vanished pod cannot hide problems in the remaining ones.

    NOTE(review): no cooldown between successive restarts is applied here --
    confirm whether one is expected.
    """
    for ns in NAMESPACES:
        try:
            pods = get_pods(ns)
        except Exception as e:
            # Keep going: the other namespaces still need to be checked.
            log.error("Could not list pods in %s: %s", ns, e)
            continue

        for pod in pods:
            if pod.status != "CrashLoopBackOff":
                continue
            log.error("CRASH: %s/%s (restarts=%d)", ns, pod.name, pod.restarts)
            if pod.restarts > MAX_RESTARTS:
                # Include the namespace: pod names are only unique within one.
                log.error("Manual intervention required for %s/%s", ns, pod.name)
                continue
            try:
                restart_pod(pod.name, ns)
            except Exception as e:
                # E.g. the pod disappeared between listing and deletion.
                log.error("Restart failed for %s/%s: %s", ns, pod.name, e)


def main() -> None:
    """Run ``check_health`` forever, pausing ``CHECK_INTERVAL`` seconds between passes."""
    log.info("K8s health monitor started (interval=%ds)", CHECK_INTERVAL)
    while True:
        try:
            check_health()
        except Exception as e:
            # Top-level boundary: keep the monitor alive, but record the
            # traceback -- str(e) alone (e.g. a bare KeyError key) is rarely
            # enough to diagnose the failure.
            log.exception("Check failed: %s", e)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()