Automation Scripts
Production-grade Python automation for trading infrastructure
k8s_health_check.py (Python)
Polls K8s API for pod health across trading namespaces. Auto-restarts pods in CrashLoopBackOff with configurable cooldown.
#!/usr/bin/env python3
"""
Kubernetes Pod Health Monitor — trading namespaces
"""
import subprocess, json, time, logging
from dataclasses import dataclass
# Root logger setup: INFO and above, formatted as timestamp, level name
# left-justified to 8 columns, then the message.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Namespaces swept on every health check, in this order.
NAMESPACES = ["trading", "streaming", "monitoring"]

# A crash-looping pod is restarted only while its restart count is <= this.
MAX_RESTARTS = 3

# Pause between two consecutive sweeps, in seconds.
CHECK_INTERVAL = 30
@dataclass
class PodStatus:
    """Condensed view of one pod as reported by ``kubectl get pods -o json``."""

    # Pod name, taken from metadata.name.
    name: str
    # Namespace the pod was listed from.
    namespace: str
    # "CrashLoopBackOff" when a container is waiting for that reason,
    # otherwise the pod phase ("Unknown" if the phase is absent).
    status: str
    # restartCount summed over the pod's container statuses.
    restarts: int
    # Whether the pod's container statuses all report ready.
    ready: bool
def get_pods(namespace: str) -> list[PodStatus]:
    """Return a PodStatus for every pod kubectl lists in *namespace*.

    Runs ``kubectl get pods -n <namespace> -o json`` and condenses each
    item's ``status.containerStatuses`` into a PodStatus.

    Args:
        namespace: Kubernetes namespace to query.

    Returns:
        One PodStatus per listed pod, in the order kubectl returned them.

    Raises:
        subprocess.CalledProcessError: kubectl exited with a non-zero status.
        json.JSONDecodeError: kubectl's stdout was not valid JSON.
        KeyError: the JSON lacks ``items``, or an item lacks ``status`` or
            ``metadata.name``.
    """
    out = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        capture_output=True, text=True, check=True,
    ).stdout

    pods = []
    for item in json.loads(out)["items"]:
        containers = item["status"].get("containerStatuses", [])
        restarts = sum(c.get("restartCount", 0) for c in containers)
        # all() is True for an empty iterable, so a pod that has no container
        # statuses yet must be treated as not ready explicitly.
        ready = bool(containers) and all(c.get("ready", False) for c in containers)
        crash = any(
            c.get("state", {}).get("waiting", {}).get("reason") == "CrashLoopBackOff"
            for c in containers
        )
        pods.append(PodStatus(
            name=item["metadata"]["name"],
            namespace=namespace,
            # A crash-looping container overrides the pod-level phase.
            status="CrashLoopBackOff" if crash else item["status"].get("phase", "Unknown"),
            restarts=restarts,
            ready=ready,
        ))
    return pods
def restart_pod(name: str, namespace: str) -> None:
    """Delete pod *name* in *namespace* through kubectl.

    Logs a warning first, then runs ``kubectl delete pod`` with
    ``--grace-period=0``.

    Args:
        name: Name of the pod to delete.
        namespace: Namespace the pod lives in.

    Raises:
        subprocess.CalledProcessError: kubectl exited with a non-zero status.
    """
    # NOTE(review): the function only deletes; presumably the owning
    # controller recreates the pod - confirm for unmanaged pods.
    log.warning("Restarting %s/%s", namespace, name)
    delete_cmd = [
        "kubectl", "delete", "pod", name,
        "-n", namespace,
        "--grace-period=0",
    ]
    subprocess.run(delete_cmd, check=True)
def check_health() -> None:
    """Sweep every namespace in NAMESPACES and restart crash-looping pods.

    For each pod whose status is "CrashLoopBackOff" the crash is logged;
    the pod is restarted while its restart count is at most MAX_RESTARTS,
    otherwise a "manual intervention" error is logged instead.

    A failure to list one namespace or to restart one pod is logged and the
    sweep continues, so a single broken kubectl call does not leave the
    remaining namespaces and pods unchecked for this cycle.
    """
    # NOTE(review): restartCount belongs to the pod object; if a controller
    # replaces a deleted pod with a new one, its count starts over - confirm
    # that MAX_RESTARTS caps restarts the way it is intended to.
    for ns in NAMESPACES:
        try:
            pods = get_pods(ns)
        except (subprocess.SubprocessError, OSError, ValueError, KeyError) as exc:
            # kubectl failed or is missing, or its output could not be parsed:
            # skip this namespace only.
            log.error("Could not list pods in %s: %s", ns, exc)
            continue

        for pod in pods:
            if pod.status != "CrashLoopBackOff":
                continue
            log.error("CRASH: %s/%s (restarts=%d)", ns, pod.name, pod.restarts)
            if pod.restarts > MAX_RESTARTS:
                log.error("Manual intervention required for %s", pod.name)
                continue
            try:
                restart_pod(pod.name, ns)
            except (subprocess.SubprocessError, OSError) as exc:
                # One failed delete must not block the pods that follow.
                log.error("Restart failed for %s/%s: %s", ns, pod.name, exc)
if __name__ == "__main__":
    # Run sweeps forever, pausing CHECK_INTERVAL seconds between them.
    log.info("K8s health monitor started (interval=%ds)", CHECK_INTERVAL)
    while True:
        try:
            check_health()
        except Exception as e:
            # Top-level boundary: keep the monitor alive, but record the full
            # traceback so the failure can be diagnosed from the log.
            log.exception("Check failed: %s", e)
        time.sleep(CHECK_INTERVAL)