#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
##
# rfxn-local-check.py
#             (C) 2026, rfxn.com - forged in prod - <ryan@rfxn.com>
# This program may be freely redistributed under the terms of the GNU GPL v2
##
#
"""
rfxn-local-check.py - comprehensive Copy Fail bug-class auditor.

Covers the full cf-class:
  cf1 (CVE-2026-31431) - algif_aead AEAD scratch-write
  cf2 ("Electric Boogaloo") - xfrm-ESP skip_cow path
  Dirty Frag - xfrm-ESP and RxRPC pcbc(fcrypt) on splice'd frag

Five scoring layers (ENV, KERNEL, MITIGATION, HARDENING, DETECTION) suitable
for fleet auditing across RHEL/CentOS/Alma/Rocky 8-10 (and Debian/Ubuntu
where applicable - apparmor userns posture is detected).

SAFE BY DESIGN
  - Only writes to mkdtemp() sentinel files; never touches /usr/bin or /etc.
  - Page-cache reads only; never modifies any system file's contents.
  - Runs unprivileged (some checks degrade gracefully without root).
  - Trigger probe targets a freshly-created sentinel, not /usr/bin/su.

USAGE
  ./rfxn-local-check.py                       # human-readable
  ./rfxn-local-check.py --json                # SIEM ingestion (+ posture)
  ./rfxn-local-check.py --verbose             # show passing checks
  ./rfxn-local-check.py --skip-trigger        # skip AF_ALG probe
  ./rfxn-local-check.py --skip-hardening      # skip suid/page-cache audit
  ./rfxn-local-check.py --category KERNEL,MITIGATION
  ./rfxn-local-check.py --emit-remediation    # bash-script of fixes

EXIT CODES
  0 - clean (no vulnerability, mitigations adequate)
  1 - test framework error
  2 - VULNERABLE (trigger probe confirmed, no userspace mitigation)
  3 - vulnerable kernel but at least one userspace mitigation active
  4 - mitigation/hardening gaps (not actively exploitable as observed)

JSON OUTPUT (--json)
  Includes posture.bug_classes_covered (array of mitigated class IDs for
  SIEM filtering) and posture.bug_classes (per-class boolean map for
  finer-grained dashboards). See README.md for schema.
"""

import argparse
import ctypes
import ctypes.util
import errno
import glob
import hashlib
import json
import os
import re
import socket
import stat
import struct
import subprocess
import sys
import tempfile
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

__version__ = "3.0.2"

# --- splice(2) wrapper ----------------------------------------------------
# os.splice was added in Python 3.10. EL7/8/9 default Pythons are 3.6/3.6/3.9
# so we need a ctypes fallback to actually run the trigger probe across the
# fleet. This calls splice(2) via libc directly.

_libc = None
def _get_libc():
    global _libc
    if _libc is None:
        path = ctypes.util.find_library("c") or "libc.so.6"
        _libc = ctypes.CDLL(path, use_errno=True)
        _libc.splice.argtypes = [
            ctypes.c_int, ctypes.POINTER(ctypes.c_longlong),
            ctypes.c_int, ctypes.POINTER(ctypes.c_longlong),
            ctypes.c_size_t, ctypes.c_uint,
        ]
        _libc.splice.restype = ctypes.c_ssize_t
    return _libc

def do_splice(fd_in, fd_out, length, offset_src=None):
    """Cross-Python splice() wrapper. Uses os.splice (3.10+) or libc fallback."""
    if hasattr(os, "splice"):
        if offset_src is not None:
            return os.splice(fd_in, fd_out, length, offset_src=offset_src)
        return os.splice(fd_in, fd_out, length)
    libc = _get_libc()
    off_ptr = (ctypes.byref(ctypes.c_longlong(offset_src))
               if offset_src is not None else None)
    n = libc.splice(fd_in, off_ptr, fd_out, None, length, 0)
    if n < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    return n

# --- Constants -------------------------------------------------------------

AF_ALG                    = 38
SOL_ALG                   = 279
ALG_SET_KEY               = 1
ALG_SET_IV                = 2
ALG_SET_OP                = 3
ALG_SET_AEAD_ASSOCLEN     = 4
ALG_OP_DECRYPT            = 0
CRYPTO_AUTHENC_KEYA_PARAM = 1
ALG_NAME = "authencesn(hmac(sha256),cbc(aes))"
PAGE     = 4096
ASSOCLEN = 8
CRYPTLEN = 16
TAGLEN   = 16
MARKER   = b"PWND"

# Privilege-sensitive files where page-cache corruption = privesc.
# v2.0.0: extended for cf2/dirtyfrag-ESP target /usr/bin/su (Theori PoC
# overwrites first 192 bytes with a static root-shell ELF) and the PAM
# stacks dirtyfrag-RxRPC manipulates (nullok auth bypass after
# /etc/passwd corruption). Per Sysdig + Ventura Systems deep-dives, the
# dynamic linker and ld.so.preload are also viable cf-class targets.
PRIV_CONFIG_FILES = [
    "/etc/passwd",
    "/etc/group",
    "/etc/sudoers",
    "/etc/security/access.conf",
    "/etc/pam.d/su",
    "/etc/pam.d/sshd",
    "/etc/pam.d/login",
    "/etc/pam.d/system-auth",
    "/etc/pam.d/password-auth",
    "/etc/pam.d/common-auth",
    "/etc/nsswitch.conf",
    "/etc/ssh/sshd_config",
    "/etc/ld.so.preload",
    "/usr/bin/su",
    "/lib64/ld-linux-x86-64.so.2",
]
PRIV_CONFIG_ROOT_FILES = ["/etc/shadow", "/etc/gshadow"]

AUTO_DETECT_PATH = "/var/lib/rfxn-defense/auto-detect.json"
AUTO_DETECT_SCHEMA_VERSION = "2"

# v2.0.0: cf-class kernel-module entry-points the modprobe subpackage
# should cover. Used by check_modprobe_blacklist_extended.
CF_CLASS_MODULES = [
    "algif_aead", "authenc", "authencesn", "af_alg",      # cf1
    "esp4", "esp6", "xfrm_user", "xfrm_algo",             # cf2 / dirtyfrag-ESP
    "rxrpc",                                              # dirtyfrag-RxRPC
]

# v2.0.0: tenant-facing systemd units that should carry the cf-class
# RestrictAddressFamilies + RestrictNamespaces drop-in.
CF_CLASS_TENANT_UNITS = ["sshd", "user@", "cron", "crond", "atd"]
# Optional opt-in units (container runtimes, INFO not WARN if missing).
CF_CLASS_OPTIONAL_UNITS = ["containerd", "docker", "podman"]

# --- Color/output ----------------------------------------------------------

class C:
    RESET   = "\033[0m"
    BOLD    = "\033[1m"
    DIM     = "\033[2m"
    RED     = "\033[31m"
    GREEN   = "\033[32m"
    YELLOW  = "\033[33m"
    BLUE    = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN    = "\033[36m"

USE_COLOR = sys.stdout.isatty() and os.environ.get("NO_COLOR") is None

def colorize(s: str, color: str) -> str:
    return color + s + C.RESET if USE_COLOR else s

# --- Progress emitter -----------------------------------------------------
# Some checks (trigger probe, getcap -r, page-cache hashing) can take several
# seconds. Without progress output the script looks hung. We emit progress to
# stderr so JSON output on stdout stays clean and machine-parseable.

class Progress:
    def __init__(self, enabled=True):
        self.enabled = enabled and sys.stderr.isatty() and \
                       os.environ.get("NO_COLOR") is None
        self.plain = enabled and not self.enabled  # non-tty: plain stderr lines
        self.start = time.monotonic()
        self.current = None

    def step(self, label):
        """Begin a labeled step. Updates the in-place progress line on TTY,
        emits a plain line on non-TTY, no-op when disabled."""
        self.current = label
        elapsed = time.monotonic() - self.start
        if self.enabled:
            # Carriage-return overwrite on TTY
            sys.stderr.write("\r\033[K[{:5.1f}s] {}".format(elapsed, label))
            sys.stderr.flush()
        elif self.plain:
            sys.stderr.write("[{:5.1f}s] {}\n".format(elapsed, label))
            sys.stderr.flush()

    def done(self):
        """Clear the progress line at the end."""
        if self.enabled:
            sys.stderr.write("\r\033[K")
            sys.stderr.flush()

PROGRESS = Progress(enabled=False)  # set up properly in main()

# --- Result type -----------------------------------------------------------

class Status:
    OK    = "ok"
    WARN  = "warn"
    FAIL  = "fail"
    VULN  = "vulnerable"
    SKIP  = "skip"
    ERROR = "error"
    INFO  = "info"

STATUS_GLYPH = {
    Status.OK:    ("[+]", C.GREEN),
    Status.WARN:  ("[!]", C.YELLOW),
    Status.FAIL:  ("[-]", C.RED),
    Status.VULN:  ("[X]", C.RED),
    Status.SKIP:  ("[~]", C.DIM),
    Status.ERROR: ("[?]", C.MAGENTA),
    Status.INFO:  ("[i]", C.BLUE),
}

class Check:
    def __init__(self, name, category, status, message,
                 details=None, remediation=None):
        self.name = name
        self.category = category
        self.status = status
        self.message = message
        self.details = details or {}
        self.remediation = remediation

    def to_dict(self):
        d = {"name": self.name, "category": self.category,
             "status": self.status, "message": self.message}
        if self.details:
            d["details"] = self.details
        if self.remediation:
            d["remediation"] = self.remediation
        return d

    def render(self, verbose=False):
        if not verbose and self.status == Status.OK:
            return None
        glyph, color = STATUS_GLYPH[self.status]
        line = "{} [{}] {}: {}".format(
            colorize(glyph, color), self.category, self.name, self.message)
        if verbose and self.remediation and \
                self.status not in (Status.OK, Status.INFO):
            line += "\n    " + colorize("→ " + self.remediation, C.CYAN)
        return line

# --- Helpers ---------------------------------------------------------------

def run_cmd(cmd, timeout=5):
    """Run command, return (rc, stdout, stderr) - never raises.
    Uses explicit PIPE rather than capture_output for Python 3.6 compat (EL7)."""
    try:
        p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           timeout=timeout)
        return p.returncode, p.stdout, p.stderr
    except FileNotFoundError:
        return -1, b"", b"command not found"
    except subprocess.TimeoutExpired:
        return -1, b"", b"timeout"
    except Exception as e:
        return -1, b"", str(e).encode()

def read_file_safe(path, max_bytes=65536):
    try:
        with open(path, "rb") as f:
            return f.read(max_bytes)
    except (IOError, OSError):
        return None

def read_text_safe(path, max_bytes=65536):
    data = read_file_safe(path, max_bytes)
    if data is None:
        return None
    try:
        return data.decode("utf-8", errors="replace")
    except Exception:
        return None

def is_root():
    return os.geteuid() == 0

_SYSTEMD_RUNNING = None
def systemd_running():
    global _SYSTEMD_RUNNING
    if _SYSTEMD_RUNNING is None:
        _SYSTEMD_RUNNING = os.path.exists("/run/systemd/system")
    return _SYSTEMD_RUNNING

_SYSTEMD_VERSION = None
def systemd_version():
    """Return integer systemd version, or None if not detectable.

    `systemctl --version` first line is e.g. 'systemd 219' (EL7) or
    'systemd 252 (252.34-1.el9_5)' (EL9). RestrictNamespaces= was
    introduced in v235; older systemd silently ignores the directive
    at unit-start time."""
    global _SYSTEMD_VERSION
    if _SYSTEMD_VERSION is not None:
        return _SYSTEMD_VERSION if _SYSTEMD_VERSION >= 0 else None
    rc, out, _err = run_cmd(["systemctl", "--version"], timeout=3)
    if rc != 0:
        _SYSTEMD_VERSION = -1
        return None
    first = out.decode("utf-8", errors="replace").splitlines()[:1]
    if not first:
        _SYSTEMD_VERSION = -1
        return None
    m = re.match(r"^\s*systemd\s+(\d+)", first[0])
    if not m:
        _SYSTEMD_VERSION = -1
        return None
    _SYSTEMD_VERSION = int(m.group(1))
    return _SYSTEMD_VERSION

def _algif_aead_state():
    """Returns one of: 'builtin', 'loaded_module', 'absent'.
    'absent' means either not built at all, or built as a module that hasn't
    been loaded yet - both cases mean the kernel could load it on AF_ALG
    socket creation if it's a module."""
    if not os.path.exists("/sys/module/algif_aead"):
        return "absent"
    modules_text = read_text_safe("/proc/modules") or ""
    is_module = any(line.startswith("algif_aead ")
                    for line in modules_text.splitlines())
    return "loaded_module" if is_module else "builtin"

# --- Environment checks ----------------------------------------------------

def check_environment():
    out = []
    uname = os.uname()
    out.append(Check(
        "kernel_info", "ENV", Status.INFO,
        "{} {} {}".format(uname.sysname, uname.release, uname.machine),
        details={"sysname": uname.sysname, "release": uname.release,
                 "machine": uname.machine, "nodename": uname.nodename},
    ))

    distro, distro_ver = "unknown", "unknown"
    osr = read_text_safe("/etc/os-release")
    if osr:
        for line in osr.splitlines():
            if line.startswith("ID="):
                distro = line.split("=", 1)[1].strip().strip('"')
            elif line.startswith("VERSION_ID="):
                distro_ver = line.split("=", 1)[1].strip().strip('"')
    out.append(Check(
        "distro_info", "ENV", Status.INFO,
        "{} {}".format(distro, distro_ver),
        details={"id": distro, "version_id": distro_ver},
    ))
    out.append(Check(
        "privilege", "ENV", Status.INFO,
        "uid={} euid={}{}".format(
            os.getuid(), os.geteuid(),
            "" if is_root() else "  (some checks limited without root)"),
        details={"uid": os.getuid(), "euid": os.geteuid()},
    ))
    return out

# --- Kernel vulnerability probes -------------------------------------------

def check_af_alg_socket():
    """AF_ALG family reachable to caller?"""
    try:
        s = socket.socket(AF_ALG, socket.SOCK_SEQPACKET, 0)
        s.close()
        return Check(
            "af_alg_socket", "KERNEL", Status.WARN,
            "AF_ALG socket family is reachable",
            remediation="Block via LD_PRELOAD shim, seccomp filter, "
                        "modprobe blacklist (effective when algif_aead is a "
                        "loadable module - not when builtin), or "
                        "kernel.modules_disabled=1.",
        )
    except OSError as e:
        return Check(
            "af_alg_socket", "KERNEL", Status.OK,
            "AF_ALG unreachable: {}".format(e.strerror or str(e)),
            details={"errno": e.errno},
        )

def check_authencesn_cipher():
    """Vulnerable cipher loadable?"""
    try:
        s = socket.socket(AF_ALG, socket.SOCK_SEQPACKET, 0)
        try:
            s.bind(("aead", ALG_NAME))
            return Check(
                "authencesn_cipher", "KERNEL", Status.WARN,
                "Vulnerable cipher loadable: {}".format(ALG_NAME),
            )
        finally:
            s.close()
    except OSError as e:
        return Check(
            "authencesn_cipher", "KERNEL", Status.OK,
            "Cipher unavailable: {}".format(e.strerror or str(e)),
            details={"errno": e.errno},
        )

def check_algif_aead_state():
    """Module loaded, builtin, or absent?"""
    state = _algif_aead_state()
    if state == "absent":
        return Check(
            "algif_aead_state", "KERNEL", Status.OK,
            "algif_aead absent from kernel (not built or module not loaded)",
            details={"state": "absent"},
        )
    if state == "loaded_module":
        return Check(
            "algif_aead_state", "KERNEL", Status.WARN,
            "algif_aead loaded as kernel module (can be unloaded with rmmod)",
            details={"state": "loaded_module"},
            remediation="rmmod algif_aead && add 'install algif_aead /bin/false' "
                        "to /etc/modprobe.d/ to block reload.",
        )
    # builtin
    return Check(
        "algif_aead_state", "KERNEL", Status.WARN,
        "algif_aead built into kernel (cannot be unloaded - common on RHEL "
        "and other distros that compile crypto user-API in)",
        details={"state": "builtin"},
        remediation="No module-level mitigation possible on this kernel; use "
                    "LD_PRELOAD shim, seccomp, or kernel patch.",
    )

def _build_keyblob(authkey, enckey):
    rtattr = struct.pack("HH", 8, CRYPTO_AUTHENC_KEYA_PARAM)
    keyparam = struct.pack(">I", len(enckey))
    return rtattr + keyparam + authkey + enckey

def trigger_probe():
    """rootsecdev-style sentinel-file trigger; safe by design."""
    tmp = tempfile.mkdtemp(prefix="rfxn-defense-")
    target = os.path.join(tmp, "sentinel.bin")
    try:
        sentinel = (b"COPYFAIL-SENTINEL-UNCORRUPTED!!\n" * (PAGE // 32))[:PAGE]
        with open(target, "wb") as f:
            f.write(sentinel)
        fd = os.open(target, os.O_RDONLY)
        try:
            os.read(fd, PAGE)
            os.lseek(fd, 0, os.SEEK_SET)

            try:
                master = socket.socket(AF_ALG, socket.SOCK_SEQPACKET, 0)
            except OSError as e:
                return Check("trigger_probe", "KERNEL", Status.OK,
                             "AF_ALG unreachable, trigger inert: {}".format(
                                 e.strerror))
            try:
                try:
                    master.bind(("aead", ALG_NAME))
                except OSError as e:
                    return Check("trigger_probe", "KERNEL", Status.OK,
                                 "Cipher unavailable, trigger inert: {}".format(
                                     e.strerror))
                master.setsockopt(SOL_ALG, ALG_SET_KEY,
                                  _build_keyblob(b"\x00"*32, b"\x00"*16))
                op, _ = master.accept()
                try:
                    aad = b"\x00"*4 + MARKER
                    cmsg = [
                        (SOL_ALG, ALG_SET_OP,
                         struct.pack("I", ALG_OP_DECRYPT)),
                        (SOL_ALG, ALG_SET_IV,
                         struct.pack("I", 16) + b"\x00"*16),
                        (SOL_ALG, ALG_SET_AEAD_ASSOCLEN,
                         struct.pack("I", ASSOCLEN)),
                    ]
                    op.sendmsg([aad], cmsg, socket.MSG_MORE)

                    pr, pw = os.pipe()
                    try:
                        try:
                            n = do_splice(fd, pw, CRYPTLEN+TAGLEN, offset_src=0)
                            if n != CRYPTLEN+TAGLEN:
                                return Check("trigger_probe", "KERNEL",
                                             Status.ERROR,
                                             "splice file->pipe short: {}".format(n))
                            n = do_splice(pr, op.fileno(), n)
                            if n != CRYPTLEN+TAGLEN:
                                return Check("trigger_probe", "KERNEL",
                                             Status.ERROR,
                                             "splice pipe->op short: {}".format(n))
                        except OSError as e:
                            if e.errno in (errno.EOPNOTSUPP, errno.ENOTSUP):
                                return Check("trigger_probe", "KERNEL", Status.OK,
                                             "splice into AF_ALG unsupported on "
                                             "this kernel - vector unreachable",
                                             details={"errno": e.errno})
                            return Check("trigger_probe", "KERNEL", Status.ERROR,
                                         "splice failed: {}".format(e.strerror))
                    finally:
                        os.close(pr)
                        os.close(pw)

                    try:
                        op.recv(ASSOCLEN+CRYPTLEN+TAGLEN)
                    except OSError as e:
                        if e.errno not in (errno.EBADMSG, errno.EINVAL):
                            return Check("trigger_probe", "KERNEL", Status.ERROR,
                                         "AEAD recv failed: {}".format(e.strerror))
                finally:
                    op.close()
            finally:
                master.close()

            os.lseek(fd, 0, os.SEEK_SET)
            after = os.read(fd, PAGE)
        finally:
            os.close(fd)

        marker_off = after.find(MARKER)
        marker_orig = sentinel.find(MARKER)
        diffs = sum(1 for i in range(PAGE) if after[i] != sentinel[i])

        if marker_off >= 0 and marker_orig < 0:
            return Check("trigger_probe", "KERNEL", Status.VULN,
                         "VULNERABLE - marker landed at offset {} in sentinel "
                         "page cache; {} bytes corrupted".format(marker_off, diffs),
                         details={"marker_offset": marker_off,
                                  "bytes_changed": diffs},
                         remediation="Apply kernel patch (mainline a664bf3d603d) "
                                     "and reboot. Interim: LD_PRELOAD shim or seccomp.")
        if diffs > 0:
            return Check("trigger_probe", "KERNEL", Status.VULN,
                         "VULNERABLE - page cache corrupted ({} bytes) but "
                         "marker placement non-canonical".format(diffs),
                         details={"bytes_changed": diffs},
                         remediation="Apply kernel patch and reboot.")
        return Check("trigger_probe", "KERNEL", Status.OK,
                     "page cache intact after trigger; kernel appears patched")
    except Exception as e:
        return Check("trigger_probe", "KERNEL", Status.ERROR,
                     "{}: {}".format(type(e).__name__, e))
    finally:
        try: os.remove(target)
        except OSError: pass
        try: os.rmdir(tmp)
        except OSError: pass

# --- Mitigation checks -----------------------------------------------------

def check_ld_so_preload():
    content = read_text_safe("/etc/ld.so.preload")
    if content is None:
        return Check("ld_so_preload", "MITIGATION", Status.FAIL,
                     "/etc/ld.so.preload absent",
                     remediation="Build no-afalg.so and add to /etc/ld.so.preload.")
    if "no-afalg" not in content:
        return Check("ld_so_preload", "MITIGATION", Status.FAIL,
                     "/etc/ld.so.preload present but no AF_ALG shim referenced",
                     remediation="Add no-afalg.so path to /etc/ld.so.preload.")
    for line in content.splitlines():
        line = line.strip()
        if "no-afalg" in line and os.path.exists(line):
            return Check("ld_so_preload", "MITIGATION", Status.OK,
                         "AF_ALG shim referenced and file exists: {}".format(line),
                         details={"path": line})
    return Check("ld_so_preload", "MITIGATION", Status.FAIL,
                 "shim referenced in ld.so.preload but file missing on disk",
                 remediation="Reinstall the no-afalg shim package.")

def check_shim_blocks_af_alg():
    """Subprocess test: does AF_ALG socket creation actually fail?"""
    code = (
        "import socket, sys\n"
        "try:\n"
        "    s = socket.socket(38, socket.SOCK_SEQPACKET, 0); s.close()\n"
        "    print('UNBLOCKED')\n"
        "except PermissionError:\n"
        "    print('BLOCKED')\n"
        "except OSError as e:\n"
        "    print('ERR:{}'.format(e.errno))\n"
    )
    rc, out, err = run_cmd([sys.executable, "-c", code], timeout=5)
    out_s = out.decode("utf-8", errors="replace").strip()
    if out_s == "BLOCKED":
        return Check("shim_blocks_af_alg", "MITIGATION", Status.OK,
                     "AF_ALG socket creation blocked at userspace layer (EPERM)")
    if out_s == "UNBLOCKED":
        return Check("shim_blocks_af_alg", "MITIGATION", Status.FAIL,
                     "AF_ALG socket created successfully - no userspace block",
                     remediation="Verify shim loaded: ldd $(which python3) | grep no-afalg")
    return Check("shim_blocks_af_alg", "MITIGATION", Status.INFO,
                 "AF_ALG result: {}".format(out_s))

def check_modprobe_blacklist():
    pat = re.compile(
        r"^\s*(install|blacklist)\s+(af_alg|algif_aead|algif_skcipher|"
        r"algif_hash|algif_rng)\b", re.MULTILINE)
    matches = []
    paths = (glob.glob("/etc/modprobe.d/*.conf") +
             glob.glob("/usr/lib/modprobe.d/*.conf") +
             glob.glob("/lib/modprobe.d/*.conf"))
    for path in paths:
        text = read_text_safe(path) or ""
        for m in pat.finditer(text):
            matches.append("{} ({})".format(m.group(0).strip(), path))
    if matches:
        return Check("modprobe_blacklist", "MITIGATION", Status.OK,
                     "AF_ALG family blacklisted ({} entries)".format(len(matches)),
                     details={"entries": matches[:8]})

    # No blacklist found. Severity depends on whether the module is currently
    # loadable. Even on builtin kernels we still flag this as a defense-in-depth
    # gap - costs nothing and protects against kernel rebuilds, kernel swaps,
    # or scenarios where someone replaces the running kernel with one that
    # builds algif_aead as a module.
    state = _algif_aead_state()
    blacklist_lines = (
        "install af_alg /bin/false\n"
        "install algif_aead /bin/false\n"
        "install algif_skcipher /bin/false\n"
        "install algif_hash /bin/false\n"
        "install algif_rng /bin/false")

    if state == "loaded_module":
        return Check(
            "modprobe_blacklist", "MITIGATION", Status.WARN,
            "No AF_ALG modprobe blacklist; algif_aead is currently loaded "
            "as a module (real exposure - can be reloaded after rmmod)",
            details={"algif_aead_state": state},
            remediation="Write to /etc/modprobe.d/99-no-afalg.conf:\n" +
                        blacklist_lines + "\nthen rmmod algif_aead.")
    if state == "absent":
        return Check(
            "modprobe_blacklist", "MITIGATION", Status.WARN,
            "No AF_ALG modprobe blacklist; algif_aead can be auto-loaded on "
            "AF_ALG socket creation if built as a module",
            details={"algif_aead_state": state},
            remediation="Write to /etc/modprobe.d/99-no-afalg.conf:\n" +
                        blacklist_lines)
    # builtin
    return Check(
        "modprobe_blacklist", "MITIGATION", Status.INFO,
        "No AF_ALG modprobe blacklist; algif_aead is builtin (blacklist "
        "doesn't help current kernel but recommended as defense in depth)",
        details={"algif_aead_state": state},
        remediation="Add anyway as defense in depth - costs nothing and "
                    "protects against kernel rebuilds/swaps. "
                    "/etc/modprobe.d/99-no-afalg.conf:\n" + blacklist_lines)

def check_modules_disabled():
    val = read_text_safe("/proc/sys/kernel/modules_disabled")
    if val is None:
        return Check("modules_disabled", "MITIGATION", Status.SKIP,
                     "/proc/sys/kernel/modules_disabled unreadable")
    if val.strip() == "1":
        return Check("modules_disabled", "MITIGATION", Status.OK,
                     "kernel.modules_disabled=1 (no module loading possible)")
    return Check("modules_disabled", "MITIGATION", Status.INFO,
                 "kernel.modules_disabled=0",
                 remediation="Consider sysctl kernel.modules_disabled=1 after boot "
                             "as general hardening (irreversible until reboot).")

def _af_alg_blocked_by_restrict(raf_value):
    """Returns True if a RestrictAddressFamilies value blocks AF_ALG.

    systemd semantics:
      RestrictAddressFamilies=AF_X AF_Y     -> allowlist; only AF_X/AF_Y allowed
      RestrictAddressFamilies=~AF_X AF_Y    -> blocklist; AF_X/AF_Y denied
      RestrictAddressFamilies=              -> no restriction
    AF_ALG is blocked when listed in a ~-prefixed blocklist OR absent from
    a non-prefixed allowlist."""
    if not raf_value:
        return False
    raf = raf_value.strip()
    if raf.startswith("~"):
        return "AF_ALG" in raf
    return "AF_ALG" not in raf

def check_systemd_restrict_address_families():
    """Sample common daemons for AF_ALG seccomp via systemd.

    Hosting daemons (sshd, web, mail, scheduler) and container/orchestration
    runtimes are all candidates - any process that forks off code run by
    less-trusted principals benefits from the AF_ALG cut at the unit level."""
    if not systemd_running():
        return Check("systemd_restrict", "MITIGATION", Status.SKIP,
                     "systemd not running")
    daemons = [
        # Login / shell exposure
        "sshd",
        # Web stack (PHP-FPM, mod_php, CGI all run user-supplied code)
        "httpd", "nginx", "apache2", "php-fpm",
        # Mail (filter scripts, sieve, scanner integrations)
        "exim", "postfix", "dovecot",
        # DNS, DB
        "named", "mariadb", "mysqld", "postgresql",
        # Container / orchestration runtimes
        "containerd", "docker", "podman", "kubelet", "cri-o",
        # CI/CD agents that run untrusted PR/job code
        "gitlab-runner", "jenkins", "actions-runner",
        # Batch / HPC
        "slurmd", "slurmctld",
        # Cron-likes
        "crond", "atd",
    ]
    findings_ok = []
    findings_missing_arch = []
    daemons_loaded = 0
    for d in daemons:
        rc, out, err = run_cmd(["systemctl", "show",
                                "-p", "RestrictAddressFamilies",
                                "-p", "SystemCallArchitectures",
                                "{}.service".format(d)], timeout=3)
        if rc != 0:
            continue
        text = out.decode("utf-8", errors="replace")
        kv = {}
        for line in text.splitlines():
            if "=" in line:
                k, v = line.split("=", 1)
                kv[k] = v
        raf = kv.get("RestrictAddressFamilies", "")
        sca = kv.get("SystemCallArchitectures", "")
        # Both empty = unit not loaded; skip silently.
        if not raf and not sca:
            continue
        daemons_loaded += 1
        if not _af_alg_blocked_by_restrict(raf):
            continue
        if "native" in sca:
            findings_ok.append(d)
        else:
            findings_missing_arch.append(d)

    if findings_ok and not findings_missing_arch:
        return Check("systemd_restrict", "MITIGATION", Status.OK,
                     "{} daemons block AF_ALG with SystemCallArchitectures=native".format(
                         len(findings_ok)),
                     details={"protected": findings_ok})
    if findings_ok or findings_missing_arch:
        msg_parts = []
        if findings_ok:
            msg_parts.append("{} fully protected".format(len(findings_ok)))
        if findings_missing_arch:
            msg_parts.append("{} block AF_ALG but missing SystemCallArchitectures=native".format(
                len(findings_missing_arch)))
        return Check("systemd_restrict", "MITIGATION", Status.WARN,
                     "; ".join(msg_parts) +
                     " - 32-bit compat syscalls may bypass filter",
                     details={"protected": findings_ok,
                              "needs_arch_directive": findings_missing_arch},
                     remediation="Add 'SystemCallArchitectures=native' alongside "
                                 "'RestrictAddressFamilies=~AF_ALG' in the drop-in.")
    return Check("systemd_restrict", "MITIGATION", Status.FAIL,
                 "no AF_ALG-restricting drop-ins on any of {} loaded daemons".format(
                     daemons_loaded),
                 remediation="Add drop-ins like /etc/systemd/system/<svc>.service.d/"
                             "no-afalg.conf with [Service] RestrictAddressFamilies="
                             "~AF_ALG and SystemCallArchitectures=native. "
                             "High-leverage candidates: sshd, user@.service, "
                             "container runtimes, CI runners.")

def check_user_service_dropin():
    """user@.service is the systemd template that spawns per-user systemd
    instances. A RestrictAddressFamilies drop-in here propagates the seccomp
    filter to every login session AND rootless podman/container - one of the
    highest-leverage mitigation points on a multi-user box."""
    if not systemd_running():
        return Check("user_service_dropin", "MITIGATION", Status.SKIP,
                     "systemd not running")
    paths = (glob.glob("/etc/systemd/system/user@.service.d/*.conf") +
             glob.glob("/usr/lib/systemd/system/user@.service.d/*.conf") +
             glob.glob("/lib/systemd/system/user@.service.d/*.conf"))
    for p in paths:
        text = read_text_safe(p) or ""
        if "RestrictAddressFamilies" in text and "AF_ALG" in text:
            return Check("user_service_dropin", "MITIGATION", Status.OK,
                         "user@.service has AF_ALG restriction: {}".format(p),
                         details={"path": p})
    return Check("user_service_dropin", "MITIGATION", Status.WARN,
                 "no user@.service drop-in restricting AF_ALG - high-leverage "
                 "mitigation point missing",
                 remediation="Create /etc/systemd/system/user@.service.d/no-afalg.conf:\n"
                             "[Service]\n"
                             "RestrictAddressFamilies=~AF_ALG\n"
                             "SystemCallArchitectures=native\n"
                             "Propagates to every login session and rootless "
                             "podman container. New sessions inherit; existing "
                             "lingering instances need restart.")

def _daemon_has_afalg_dropin(daemon):
    """Does <daemon>.service have a drop-in that blocks AF_ALG?"""
    raf_re = re.compile(
        r"^\s*RestrictAddressFamilies\s*=\s*(.+)$", re.MULTILINE)
    for unit_dir in ("/etc/systemd/system", "/usr/lib/systemd/system",
                     "/lib/systemd/system"):
        for f in glob.glob(unit_dir + "/" + daemon + ".service.d/*.conf"):
            text = read_text_safe(f) or ""
            m = raf_re.search(text)
            if m and _af_alg_blocked_by_restrict(m.group(1).strip()):
                return f
    return None

def check_seccomp_runtime():
    """Verify seccomp filter is actually loaded into running daemons by
    reading /proc/PID/status - Seccomp=2 means filter mode active. Catches
    the case where a drop-in exists but the daemon was never restarted to
    pick it up."""
    if not systemd_running():
        return Check("seccomp_runtime", "DETECTION", Status.SKIP,
                     "systemd not running")
    daemons = ["sshd", "httpd", "nginx", "containerd", "docker"]
    findings = []
    for d in daemons:
        rc, out, _ = run_cmd(["systemctl", "show", "{}.service".format(d),
                              "-p", "MainPID", "--value"], timeout=3)
        if rc != 0:
            continue
        pid = out.decode("utf-8", errors="replace").strip()
        if not pid or pid == "0":
            continue
        status = read_text_safe("/proc/{}/status".format(pid))
        if not status:
            continue
        m = re.search(r"^Seccomp:\s+(\d+)", status, re.MULTILINE)
        if not m:
            continue
        mode = int(m.group(1))
        # 0 = disabled, 1 = strict, 2 = filter
        dropin = _daemon_has_afalg_dropin(d)
        findings.append((d, pid, mode, dropin))
    if not findings:
        return Check("seccomp_runtime", "DETECTION", Status.SKIP,
                     "no relevant running daemons to inspect")
    filtered = [d for d, _, m, _ in findings if m == 2]
    unfiltered = [(d, dp) for d, _, m, dp in findings if m != 2]
    stale = [d for d, dp in unfiltered if dp is not None]
    no_dropin = [d for d, dp in unfiltered if dp is None]
    if filtered and not unfiltered:
        return Check("seccomp_runtime", "DETECTION", Status.OK,
                     "all {} inspected daemons have seccomp filter active".format(
                         len(filtered)),
                     details={"filtered": filtered})
    # Build remediation tailored to which sub-cause applies.
    rem_parts = []
    if stale:
        rem_parts.append("Stale daemons (drop-in exists but not loaded): "
                         "systemctl restart " + " ".join(stale))
    if no_dropin:
        rem_parts.append("No drop-in for: " + ", ".join(no_dropin) +
                         ". Add /etc/systemd/system/<svc>.service.d/"
                         "no-afalg.conf with [Service] "
                         "RestrictAddressFamilies=~AF_ALG and "
                         "SystemCallArchitectures=native, then daemon-reload "
                         "+ restart.")
    details = {"filtered": filtered,
               "unfiltered_stale_dropin": stale,
               "unfiltered_no_dropin": no_dropin}
    if filtered and unfiltered:
        return Check("seccomp_runtime", "DETECTION", Status.WARN,
                     "{} daemons have seccomp filter, {} do not".format(
                         len(filtered), len(unfiltered)),
                     details=details,
                     remediation=" ".join(rem_parts))
    return Check("seccomp_runtime", "DETECTION", Status.WARN,
                 "{} running daemons have NO seccomp filter loaded".format(
                     len(unfiltered)),
                 details=details,
                 remediation=" ".join(rem_parts))

def check_dropin_freshness():
    """Detect stale daemons: drop-in file is newer than the running daemon's
    process start time. Means the file changed but the daemon was never
    restarted to pick it up - filter isn't active despite the file existing."""
    if not systemd_running():
        return Check("dropin_freshness", "MITIGATION", Status.SKIP,
                     "systemd not running")
    daemons_to_dropins = {}
    raf_re = re.compile(
        r"^\s*RestrictAddressFamilies\s*=\s*(.+)$", re.MULTILINE)
    for unit_dir in ("/etc/systemd/system", "/usr/lib/systemd/system",
                     "/lib/systemd/system"):
        for d in glob.glob(unit_dir + "/*.service.d"):
            unit_name = os.path.basename(d).replace(".service.d", "")
            for f in glob.glob(d + "/*.conf"):
                text = read_text_safe(f) or ""
                # Only consider drop-ins whose actual RestrictAddressFamilies=
                # directive blocks AF_ALG. Substring matching here previously
                # mis-categorised drop-ins that merely mentioned AF_ALG in a
                # comment.
                m = raf_re.search(text)
                if not m:
                    continue
                if _af_alg_blocked_by_restrict(m.group(1).strip()):
                    daemons_to_dropins.setdefault(unit_name, []).append(f)
    if not daemons_to_dropins:
        return Check("dropin_freshness", "MITIGATION", Status.SKIP,
                     "no AF_ALG drop-ins to verify")
    stale = []
    fresh = []
    for daemon, dropins in daemons_to_dropins.items():
        rc, out, _ = run_cmd(["systemctl", "show", "{}.service".format(daemon),
                              "-p", "MainPID", "--value"], timeout=3)
        if rc != 0:
            continue
        pid = out.decode("utf-8", errors="replace").strip()
        if not pid or pid == "0":
            continue
        try:
            proc_start = os.stat("/proc/" + pid).st_mtime
        except OSError:
            continue
        for dp in dropins:
            try:
                dropin_mtime = os.stat(dp).st_mtime
            except OSError:
                continue
            if dropin_mtime > proc_start:
                stale.append("{} (drop-in {} newer than pid {})".format(
                    daemon, dp, pid))
            else:
                fresh.append(daemon)
    if stale:
        return Check("dropin_freshness", "MITIGATION", Status.WARN,
                     "{} stale daemons (drop-in newer than running process)".format(
                         len(stale)),
                     details={"stale": stale, "fresh": fresh},
                     remediation="Restart affected daemons so they pick up the "
                                 "AF_ALG restriction: systemctl restart <name>")
    return Check("dropin_freshness", "MITIGATION", Status.OK,
                 "all {} daemons with AF_ALG drop-ins are running with the "
                 "drop-in active".format(len(fresh)))

# --- Hardening checks ------------------------------------------------------

# Canonical SUID set across mainstream distros. Used as a "this is expected"
# allow-list when classifying setuid binaries, and as a fallback list when
# we can't run find as root. Membership here means "expected to exist as
# setuid on at least one mainstream distro" - missing or non-setuid on a
# given host is fine.
EXPECTED_SUID = set([
    "/usr/bin/su", "/bin/su", "/usr/bin/sudo",
    "/usr/bin/passwd", "/usr/bin/chsh", "/usr/bin/chage", "/usr/bin/chfn",
    "/usr/bin/gpasswd", "/usr/bin/newgrp", "/usr/bin/pkexec",
    "/usr/bin/mount", "/usr/bin/umount", "/bin/mount", "/bin/umount",
    "/usr/bin/at", "/usr/bin/crontab",
    "/usr/bin/fusermount", "/usr/bin/fusermount3",
    "/usr/bin/fusermount-glusterfs",
    "/usr/lib/polkit-1/polkit-agent-helper-1",
    "/usr/sbin/unix_chkpwd", "/usr/sbin/pam_timestamp_check",
    "/usr/sbin/mount.nfs", "/usr/sbin/usernetctl", "/usr/sbin/userhelper",
    "/usr/sbin/grub2-set-bootflag",
    "/usr/bin/ksu",
    "/usr/bin/keybase-redirector",
    # Debian/Ubuntu
    "/usr/lib/openssh/ssh-keysign",
    "/usr/lib/eject/dmcrypt-get-device",
    "/usr/lib/dbus-1.0/dbus-daemon-launch-helper",
    # Fedora/RHEL
    "/usr/libexec/openssh/ssh-keysign",
    "/usr/libexec/dbus-1/dbus-daemon-launch-helper",
    "/usr/libexec/qemu-bridge-helper",
    "/usr/libexec/Xorg.wrap",
    "/usr/libexec/spice-gtk-x86_64/spice-client-glib-usb-acl-helper",
    # Vendor / third-party desktop
    "/usr/lib64/chromium-browser/chrome-sandbox",
    "/usr/share/antigravity/chrome-sandbox",
    "/opt/google/chrome/chrome-sandbox",
    "/opt/keybase/chrome-sandbox",
    "/usr/bin/vmware-user-suid-wrapper",
])

# System filesystem prefixes worth scanning for SUID outliers. We deliberately
# do NOT include /home, /var, /tmp, or container/snapshot mount roots:
# those produce noise from docker overlay layers, tarball extracts, snap
# images, and user-controlled content that isn't actually executable from
# the host's privilege boundary.
SUID_SCAN_ROOTS = ["/usr", "/opt", "/usr/local", "/sbin", "/bin"]

# Path substrings that, if present, mean a result is from an isolated
# container/snapshot layer rather than a host-reachable binary. Belt-and-
# suspenders for sites that have docker storage on the same FS as /usr.
SUID_PATH_EXCLUDES = (
    "/overlay2/", "/overlayfs/", "/snapshots/", "/containerd/",
    "/.snapshots/", "/.zfs/snapshot/", "/btrfs/subvol/",
    "/var/lib/containers/", "/var/lib/docker/",
)

def _scan_suid_inventory():
    """Return a list of paths whose setuid bit is set, under SUID_SCAN_ROOTS.

    Root: uses find for completeness. Container/snapshot paths are filtered
    out post-find, since some sites have docker storage on the same device
    as /usr and -xdev wouldn't catch them.

    Non-root or find failure: falls back to stat-checking EXPECTED_SUID. The
    fallback is best-effort - it cannot discover non-canonical setuid
    binaries planted by an attacker. That is documented as a limitation.
    """
    discovered = []
    if is_root():
        roots = [r for r in SUID_SCAN_ROOTS
                 if os.path.isdir(r) and not os.path.islink(r)]
        if roots:
            cmd = (["find"] + roots
                   + ["-xdev", "-type", "f", "-perm", "-4000",
                      "-printf", "%p\\n"])
            rc, out, _ = run_cmd(cmd, timeout=30)
            if rc == 0:
                for line in out.decode("utf-8", "replace").splitlines():
                    p = line.strip()
                    if not p:
                        continue
                    if any(ex in p for ex in SUID_PATH_EXCLUDES):
                        continue
                    discovered.append(p)
    if not discovered:
        for p in EXPECTED_SUID:
            try:
                st = os.stat(p)
            except OSError:
                continue
            if st.st_mode & stat.S_ISUID:
                discovered.append(p)
    return discovered

def check_suid_inventory():
    """Consolidated SUID audit (replaces N near-duplicate per-binary lines).

    Verdict:
      OK   - all setuid binaries are canonical and have nominal modes
      WARN - non-canonical paths or unusual modes (4777, 6755 with group
             write, etc.) found
    """
    paths = _scan_suid_inventory()
    confirmed = []
    for p in paths:
        try:
            st = os.stat(p)
        except OSError:
            continue
        # Defence against TOCTOU: only count files where the setuid bit is
        # actually set right now. Skip silently otherwise.
        if not (st.st_mode & stat.S_ISUID):
            continue
        confirmed.append((p, st.st_mode & 0o7777))
    if not confirmed:
        return Check("suid_inventory", "HARDENING", Status.OK,
                     "no setuid binaries found in {}".format(SUID_SCAN_ROOTS))
    unexpected = []
    odd_mode = []
    # Modes considered "nominal" for an expected setuid: setuid set, user
    # rwx, group/other r-x or x-only, optional setgid. We flag world/group
    # writable explicitly as anomalous.
    for p, mode in sorted(confirmed):
        if mode & 0o022:  # group-write or world-write set on a setuid binary
            odd_mode.append((p, oct(mode)))
            continue
        if p not in EXPECTED_SUID:
            unexpected.append((p, oct(mode)))
    msg = "{} setuid binaries inventoried".format(len(confirmed))
    details = {"total": len(confirmed), "scan_roots": SUID_SCAN_ROOTS,
               "canonical": len(confirmed) - len(unexpected) - len(odd_mode),
               "unexpected": [{"path": p, "mode": m} for p, m in unexpected],
               "odd_mode": [{"path": p, "mode": m} for p, m in odd_mode]}
    if unexpected or odd_mode:
        parts = []
        if unexpected:
            parts.append("{} non-canonical".format(len(unexpected)))
        if odd_mode:
            parts.append("{} group/world-writable".format(len(odd_mode)))
        return Check("suid_inventory", "HARDENING", Status.WARN,
                     msg + " (" + ", ".join(parts) + ")",
                     details=details,
                     remediation="Audit non-canonical entries; each is a "
                                 "page-cache substitution target equivalent "
                                 "to root if exploited via CVE-2026-31431.")
    return Check("suid_inventory", "HARDENING", Status.OK,
                 msg + " (all canonical, modes nominal)",
                 details=details)

def _hash_pagecache(path):
    try:
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()
    except (IOError, OSError):
        return None

def _hash_direct(path):
    """Read via dd iflag=direct to bypass page cache. Returns None if O_DIRECT
    isn't supported (e.g., tmpfs, some network filesystems). The caller treats
    None as 'cannot verify' rather than guessing."""
    rc, out, err = run_cmd(
        ["dd", "if=" + path, "iflag=direct", "bs=4096", "status=none"],
        timeout=10)
    if rc == 0:
        return hashlib.sha256(out).hexdigest()
    return None

def check_page_cache_integrity():
    out = []
    files = list(PRIV_CONFIG_FILES)
    if is_root():
        files += PRIV_CONFIG_ROOT_FILES
    for path in files:
        if not os.path.exists(path):
            continue
        cached = _hash_pagecache(path)
        if cached is None:
            continue
        direct = _hash_direct(path)
        if direct is None:
            out.append(Check("pagecache:" + path, "HARDENING", Status.SKIP,
                             "couldn't read {} via O_DIRECT".format(path)))
            continue
        if cached == direct:
            out.append(Check("pagecache:" + path, "HARDENING", Status.OK,
                             "{} page cache matches disk".format(path),
                             details={"path": path, "sha256": cached}))
            continue
        # Divergence is a *potential* IOC, not a confirmed CVE-2026-31431
        # exploitation: it can also arise from a concurrent writer between
        # the two reads, an active fsync window, or filesystem-level
        # caching quirks (overlay/btrfs snapshots). Trigger_probe is the
        # only check that produces a definitive VULN verdict.
        # Re-read after a short delay to filter out transient writer races;
        # only stable divergence is reported.
        time.sleep(0.2)
        cached2 = _hash_pagecache(path)
        direct2 = _hash_direct(path)
        if cached2 is None or direct2 is None or cached2 == direct2:
            out.append(Check("pagecache:" + path, "HARDENING", Status.OK,
                             "{} page cache transient divergence; "
                             "stable on re-read".format(path),
                             details={"path": path,
                                      "transient_first_sha256": cached,
                                      "stable_sha256": cached2 or direct2}))
            continue
        out.append(Check(
            "pagecache:" + path, "HARDENING", Status.WARN,
            "page cache differs from disk on {} (stable across re-read): "
            "cached={} disk={}".format(path, cached2[:16], direct2[:16]),
            details={"path": path, "cached_sha256": cached2,
                     "disk_sha256": direct2},
            remediation="Stable divergence is a potential page-cache "
                        "substitution IOC. Evict cache (vmtouch -e {}), "
                        "snapshot the host for forensics, and correlate "
                        "with audit logs for AF_ALG/splice activity.".format(path)))
    return out

def check_file_capabilities():
    """getcap -r / for non-suid privilege-bearing binaries."""
    if not any(os.path.exists(p) for p in
               ["/usr/sbin/getcap", "/sbin/getcap", "/usr/bin/getcap"]):
        return Check("file_capabilities", "HARDENING", Status.SKIP,
                     "getcap not installed")
    rc, out, err = run_cmd(["getcap", "-r", "/usr", "/bin", "/sbin"], timeout=20)
    if rc != 0:
        return Check("file_capabilities", "HARDENING", Status.SKIP,
                     "getcap failed: {}".format(err.decode("utf-8", "replace").strip()))
    text = out.decode("utf-8", errors="replace")
    risky = []
    for line in text.splitlines():
        if any(c in line for c in ("cap_setuid", "cap_sys_admin", "cap_dac_override",
                                    "cap_sys_module", "cap_sys_ptrace")):
            risky.append(line.strip())
    if risky:
        return Check("file_capabilities", "HARDENING", Status.WARN,
                     "{} binaries hold privilege-bearing file caps".format(len(risky)),
                     details={"entries": risky[:10]},
                     remediation="Audit each - these are page-cache substitution "
                                 "targets equivalent to suid root.")
    return Check("file_capabilities", "HARDENING", Status.OK,
                 "no privilege-bearing file capabilities found")

# --- Detection readiness ---------------------------------------------------

def check_auditd():
    out = []
    rc, sout, serr = run_cmd(["systemctl", "is-active", "auditd"], timeout=3)
    active = sout.decode("utf-8", "replace").strip() == "active"
    out.append(Check("auditd_running", "DETECTION",
                     Status.OK if active else Status.WARN,
                     "auditd is {}".format("active" if active else "NOT active"),
                     remediation=None if active
                     else "systemctl enable --now auditd"))
    if active and is_root():
        rc, sout, serr = run_cmd(["auditctl", "-l"], timeout=3)
        rules = sout.decode("utf-8", "replace")
        # v3.0.1: auditctl -l renders a0= in hex (a0=0x26) but our
        # shipped /etc/audit/rules.d/99-rfxn-defense.rules uses
        # decimal (a0=38). Match by the canonical key name instead -
        # works regardless of how auditctl formats the arg value.
        # AF_ALG=38=0x26, AF_KEY=15=0xF, AF_RXRPC=33=0x21, AF_RDS=21=0x15.
        has_afalg   = "key=rfxn_afalg"        in rules
        has_afkey   = "key=rfxn_afkey"        in rules
        has_afrxrpc = "key=rfxn_afrxrpc"      in rules
        has_afrds   = "key=rfxn_afrds"        in rules
        has_pidfd   = "key=rfxn_pidfd_getfd"  in rules
        has_su_exec = "/usr/bin/su" in rules and "execve" in rules
        out.append(Check("audit_rule_af_alg", "DETECTION",
                         Status.OK if has_afalg else Status.FAIL,
                         "AF_ALG socket audit rule (rfxn_afalg): {}".format(
                             "present" if has_afalg else "MISSING"),
                         remediation=None if has_afalg else
                         "dnf install rfxn-defense-audit"))
        out.append(Check("audit_rule_af_key", "DETECTION",
                         Status.OK if has_afkey else Status.WARN,
                         "AF_KEY socket audit rule (rfxn_afkey): {}".format(
                             "present" if has_afkey else "missing"),
                         remediation=None if has_afkey else
                         "dnf install rfxn-defense-audit"))
        out.append(Check("audit_rule_af_rxrpc", "DETECTION",
                         Status.OK if has_afrxrpc else Status.WARN,
                         "AF_RXRPC socket audit rule (rfxn_afrxrpc): {}".format(
                             "present" if has_afrxrpc else "missing"),
                         remediation=None if has_afrxrpc else
                         "dnf install rfxn-defense-audit"))
        out.append(Check("audit_rule_af_rds", "DETECTION",
                         Status.OK if has_afrds else Status.WARN,
                         "AF_RDS socket audit rule (rfxn_afrds): {}".format(
                             "present" if has_afrds else "missing"),
                         remediation=None if has_afrds else
                         "dnf install rfxn-defense-audit"))
        out.append(Check("audit_rule_pidfd_getfd", "DETECTION",
                         Status.OK if has_pidfd else Status.WARN,
                         "pidfd_getfd audit rule (rfxn_pidfd_getfd): {}".format(
                             "present" if has_pidfd else "missing"),
                         remediation=None if has_pidfd else
                         "dnf install rfxn-defense-audit"))
        out.append(Check("audit_rule_su_exec", "DETECTION",
                         Status.OK if has_su_exec else Status.WARN,
                         "/usr/bin/su execve audit rule: {}".format(
                             "present" if has_su_exec else "missing"),
                         remediation=None if has_su_exec else
                         "auditctl -a always,exit -F arch=b64 -S execve "
                         "-F path=/usr/bin/su -k su_exec"))
    return out

def _scan_text_for_ioc(text):
    """Returns (shim_blocks, afalg_audit, addl_audit) counts in text.

    v3.0.1: counts both legacy (afalg_attempt) and current (rfxn_afalg)
    key names. Third tuple element now counts any rfxn_* tripwire hit
    (rfxn_afkey, rfxn_afrxrpc, rfxn_afrds, rfxn_pidfd_getfd) so the
    IOC summary surfaces a hit on any rfxn_* key even when AF_ALG
    itself wasn't the trigger.
    """
    shim_blocks = 0
    if "no-afalg" in text and "blocked AF_ALG" in text:
        shim_blocks = text.count("blocked AF_ALG")
    afalg = text.count("afalg_attempt") + text.count("rfxn_afalg")
    addl  = (text.count("rfxn_afkey") + text.count("rfxn_afrxrpc")
             + text.count("rfxn_afrds") + text.count("rfxn_pidfd_getfd"))
    return (shim_blocks, afalg, addl)

def check_recent_ioc_signals():
    """Look for shim/audit IOCs in recent logs (root-only, non-fatal).

    Sources, in order:
      - /var/log/secure (RHEL/Fedora rsyslog)
      - /var/log/auth.log (Debian/Ubuntu rsyslog)
      - journalctl (systemd-journald, fallback for journald-only hosts)
      - /var/log/audit/audit.log (auditd)
    """
    if not is_root():
        return Check("recent_iocs", "DETECTION", Status.SKIP,
                     "root needed to scan auth/audit logs")
    findings = []
    sources_seen = []
    shim_total = afalg_total = splice_total = 0

    for log_path in ("/var/log/secure", "/var/log/auth.log"):
        text = read_text_safe(log_path, max_bytes=512*1024)
        if text is None:
            continue
        sources_seen.append(log_path)
        sb, _, _ = _scan_text_for_ioc(text)
        if sb:
            shim_total += sb
            findings.append("{}: shim blocked {} AF_ALG attempts".format(
                log_path, sb))

    # Journald fallback: only consult journalctl if neither rsyslog file
    # surfaced shim activity (avoids double-counting on hosts that have
    # both rsyslog and persistent journal).
    if shim_total == 0 and os.path.exists("/run/systemd/journal/socket"):
        rc, out, _ = run_cmd(
            ["journalctl", "--no-pager", "-q", "--since", "-7d",
             "-t", "no-afalg"], timeout=10)
        if rc == 0:
            jtext = out.decode("utf-8", "replace")
            sources_seen.append("journalctl -t no-afalg")
            sb, _, _ = _scan_text_for_ioc(jtext)
            if sb:
                shim_total += sb
                findings.append("journalctl: shim blocked {} AF_ALG attempts "
                                "(last 7 days)".format(sb))

    audit_text = read_text_safe("/var/log/audit/audit.log",
                                max_bytes=1024*1024)
    if audit_text is not None:
        sources_seen.append("/var/log/audit/audit.log")
        _, af, sp = _scan_text_for_ioc(audit_text)
        if af:
            afalg_total += af
            findings.append("auditd logged {} afalg_attempt events".format(af))
        if sp:
            splice_total += sp
            findings.append("auditd logged {} splice events".format(sp))

    if not sources_seen:
        return Check("recent_iocs", "DETECTION", Status.SKIP,
                     "no readable auth/audit log sources found")
    if findings:
        return Check("recent_iocs", "DETECTION", Status.WARN,
                     "; ".join(findings),
                     details={"shim_blocks": shim_total,
                              "afalg_audit_events": afalg_total,
                              "splice_audit_events": splice_total,
                              "sources": sources_seen},
                     remediation="Investigate uids/pids and correlate with "
                                 "su/sshd authentication events.")
    return Check("recent_iocs", "DETECTION", Status.OK,
                 "no AF_ALG IOC signals in {} log source(s)".format(
                     len(sources_seen)),
                 details={"sources": sources_seen})

# --- v2.0.0 cf-class extensions -------------------------------------------
# Below: new check functions added in v2.0.0 to cover cf2 (xfrm-ESP) and
# Dirty Frag (xfrm-ESP + RxRPC). All follow the existing pattern: return a
# Check or list[Check]; never raise into the caller; stdlib-only.

def _module_state(name):
    """Classify a kernel module: 'loaded', 'builtin', 'modular', 'absent'."""
    # /sys/module/<name> exists if loaded OR builtin
    sys_path = "/sys/module/" + name
    if os.path.isdir(sys_path):
        # Built-in modules don't have a refcnt file; loaded modules do.
        if os.path.exists(sys_path + "/refcnt") or \
           os.path.exists(sys_path + "/holders"):
            return "loaded"
        return "builtin"
    # Not loaded; check if loadable
    rel = os.uname().release
    for base in ("/lib/modules/" + rel, "/usr/lib/modules/" + rel):
        if not os.path.isdir(base):
            continue
        # modules.dep is the canonical source; cheap to grep
        dep = base + "/modules.dep"
        if os.path.isfile(dep):
            text = read_text_safe(dep, max_bytes=1 << 20) or ""
            for line in text.splitlines():
                # entry: kernel/.../foo.ko: deps...
                stem = line.split(":", 1)[0]
                bn = os.path.basename(stem)
                if bn.startswith(name + "."):
                    return "modular"
    return "absent"

def check_xfrm_modules():
    """cf2 / Dirty Frag-ESP: report on the xfrm-ESP entry-point modules."""
    states = {m: _module_state(m) for m in
              ("esp4", "esp6", "xfrm_user", "xfrm_algo")}
    loaded = [m for m, s in states.items() if s == "loaded"]
    modular = [m for m, s in states.items() if s == "modular"]
    builtin = [m for m, s in states.items() if s == "builtin"]
    if loaded:
        return Check("xfrm_modules", "KERNEL", Status.WARN,
                     "xfrm-ESP modules currently loaded: {}".format(
                         ", ".join(loaded)),
                     details=states,
                     remediation="rmmod {}".format(" ".join(loaded)) +
                                 " (and add to /etc/modprobe.d/99-rfxn-defense.conf "
                                 "to prevent reload).")
    if modular:
        return Check("xfrm_modules", "KERNEL", Status.INFO,
                     "xfrm-ESP modules loadable but not loaded: {}".format(
                         ", ".join(modular)),
                     details=states)
    if builtin:
        return Check("xfrm_modules", "KERNEL", Status.WARN,
                     "xfrm-ESP modules built into kernel: {}".format(
                         ", ".join(builtin)),
                     details=states,
                     remediation="kernel-rebuild required to remove; rely on "
                                 "systemd RestrictNamespaces=~user ~net to cut "
                                 "the unshare path instead.")
    return Check("xfrm_modules", "KERNEL", Status.OK,
                 "xfrm-ESP modules absent or unbuildable on this kernel",
                 details=states)

def check_rxrpc_module():
    """Dirty Frag-RxRPC: report on rxrpc.ko reachability."""
    state = _module_state("rxrpc")
    # /proc/net/protocols is the canonical "is the protocol family
    # registered?" check - avoids socket() autoload side-effect.
    proto_text = read_text_safe("/proc/net/protocols") or ""
    rxrpc_registered = bool(re.search(r"^RXRPC\b", proto_text, re.MULTILINE))
    details = {"module_state": state, "protocol_registered": rxrpc_registered}
    if state == "loaded" or rxrpc_registered:
        return Check("rxrpc_module", "KERNEL", Status.WARN,
                     "rxrpc module loaded / protocol registered "
                     "(dirtyfrag-RxRPC reachable)",
                     details=details,
                     remediation="rmmod rxrpc && add to "
                                 "/etc/modprobe.d/99-rfxn-defense.conf")
    if state == "modular":
        return Check("rxrpc_module", "KERNEL", Status.INFO,
                     "rxrpc loadable but not loaded "
                     "(may auto-load on socket(AF_RXRPC, ...) call)",
                     details=details)
    if state == "builtin":
        return Check("rxrpc_module", "KERNEL", Status.WARN,
                     "rxrpc built into kernel (dirtyfrag-RxRPC reachable)",
                     details=details)
    return Check("rxrpc_module", "KERNEL", Status.OK,
                 "rxrpc absent and unbuildable on this kernel",
                 details=details)

def check_modprobe_blacklist_extended():
    """v2.0.0: aggregate modprobe coverage across the full cf-class list."""
    pat = re.compile(
        r"^\s*(install|blacklist)\s+(\S+)\b", re.MULTILINE)
    paths = (glob.glob("/etc/modprobe.d/*.conf") +
             glob.glob("/usr/lib/modprobe.d/*.conf") +
             glob.glob("/lib/modprobe.d/*.conf"))
    covered = set()
    for path in paths:
        text = read_text_safe(path) or ""
        for m in pat.finditer(text):
            mod = m.group(2)
            if mod in CF_CLASS_MODULES:
                covered.add(mod)
    missing = [m for m in CF_CLASS_MODULES if m not in covered]
    if not missing:
        return Check("modprobe_extended", "MITIGATION", Status.OK,
                     "all {} cf-class entry-point modules blacklisted".format(
                         len(CF_CLASS_MODULES)),
                     details={"covered": sorted(covered)})
    return Check("modprobe_extended", "MITIGATION", Status.WARN,
                 "{} cf-class modules not blacklisted: {}".format(
                     len(missing), ", ".join(missing)),
                 details={"covered": sorted(covered),
                          "missing": missing},
                 remediation="Install rfxn-defense-modprobe, OR write "
                             "/etc/modprobe.d/99-rfxn-defense.conf with "
                             "'install <mod> /bin/false' for each missing.")

def _rpm_q_installed(pkg):
    """Returncode-based check: rpm -q <pkg> exits 0 iff installed.
    Per D-45 / reviewer M-9: file-existence checks misclassify hosts
    where a subpackage is installed but its drop-in file got
    hand-removed or its scriptlet failed silently. rpm -q is the
    authoritative source."""
    try:
        rc = subprocess.run(["rpm", "-q", pkg],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL).returncode
        return rc == 0
    except (OSError, subprocess.SubprocessError):
        return False

def check_auto_detect_state():
    """v2.0.1: report auto-detection result from
    /var/lib/rfxn-defense/auto-detect.json (written by detect.sh).

    OK if no workloads detected (or force_full set).
    INFO if workloads detected and conditional mitigations suppressed
         (the package working as designed).
    WARN if the JSON is missing on a host where the modprobe or
         systemd subpackage is installed (scriptlet failed silently),
         OR if the schema_version is unrecognized (D-53 / M-3).
    SKIP if neither subpackage is installed (auditor-only install).

    Rev 2 fixup: SKIP test uses rpm -q (D-45 / M-9) not file-existence;
    schema rejection emits posture.auto_detect.schema_unrecognized=true
    (D-53 / M-3)."""
    have_modprobe = _rpm_q_installed("rfxn-defense-modprobe")
    have_systemd = _rpm_q_installed("rfxn-defense-systemd")

    if not (have_modprobe or have_systemd):
        return Check("auto_detect_state", "MITIGATION", Status.SKIP,
                     "auditor-only install (no -modprobe or -systemd)")

    try:
        with open(AUTO_DETECT_PATH, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return Check("auto_detect_state", "MITIGATION", Status.WARN,
                     "auto-detect.json missing; %posttrans likely failed",
                     remediation="Run: /usr/sbin/rfxn-redetect")
    except (json.JSONDecodeError, OSError) as e:
        return Check("auto_detect_state", "MITIGATION", Status.WARN,
                     "auto-detect.json unreadable: {}".format(e),
                     remediation="Run: /usr/sbin/rfxn-redetect")

    # json.load() succeeds for any valid JSON value: list, null, string,
    # number. Only a top-level object exposes .get(). Reject everything
    # else as "schema unrecognized" rather than letting AttributeError
    # crash the auditor.
    if not isinstance(data, dict):
        return Check("auto_detect_state", "MITIGATION", Status.WARN,
                     "auto-detect.json is not a JSON object",
                     details={
                         "path": str(AUTO_DETECT_PATH),
                         "type": type(data).__name__,
                         "schema_unrecognized": True,
                     },
                     remediation="Run: /usr/sbin/rfxn-redetect")

    schema = data.get("schema_version")
    if schema != AUTO_DETECT_SCHEMA_VERSION:
        # D-53 / M-3: structured field for SIEM filtering.
        return Check("auto_detect_state", "MITIGATION", Status.WARN,
                     "auto-detect.json schema {} unrecognized "
                     "(expected {})".format(schema, AUTO_DETECT_SCHEMA_VERSION),
                     details={
                         "schema": schema,
                         "schema_unrecognized": True,
                     })

    detected = data.get("detected") or {}
    if not isinstance(detected, dict):
        detected = {}
    detected_workloads = sorted([
        k for k, v in detected.items()
        if isinstance(v, dict) and v.get("present") is True
    ])
    suppressed = data.get("suppressed") or {}
    if not isinstance(suppressed, dict):
        suppressed = {}
    # v2.1.1: suppressed.sysctl_iouring is a dict {suppressed, reason};
    # all other entries are plain bool. Handle both shapes.
    def _is_supp(v):
        if v is True:
            return True
        if isinstance(v, dict) and v.get("suppressed") is True:
            return True
        return False
    suppressed_mits = sorted([
        k for k, v in suppressed.items() if _is_supp(v)
    ])
    force_full = bool(data.get("force_full"))

    details = {
        "detected_workloads": detected_workloads,
        "suppressed_mitigations": suppressed_mits,
        "force_full": force_full,
        "schema_unrecognized": False,
    }

    if force_full:
        return Check("auto_detect_state", "MITIGATION", Status.OK,
                     "force-full sentinel active; all mitigations applied",
                     details=details)
    if not detected_workloads:
        return Check("auto_detect_state", "MITIGATION", Status.OK,
                     "auto-detect: no conflicting workloads",
                     details=details)
    return Check("auto_detect_state", "MITIGATION", Status.INFO,
                 "auto-detect: {} ({} mitigation(s) suppressed)".format(
                     ", ".join(detected_workloads), len(suppressed_mits)),
                 details=details)

def check_rds_modprobe():
    """v2.1.0 MITIGATION: AF_RDS / rds.ko blacklist for the pintheft class.

    Decision matrix (drop-in 99-rfxn-defense-rds.conf):
      suppressed.modprobe_rds=true  -> INFO (Oracle/RDS workload detected)
      file present + not suppressed -> OK
      file absent + -modprobe       -> FAIL (subpackage installed, drop
                                             missing -> scriptlet failed)
      file absent + no -modprobe    -> SKIP (auditor-only install)
    Missing suppressed.modprobe_rds key is treated as false (operators
    who upgraded RPMs before %posttrans re-ran)."""
    drop_path = "/etc/modprobe.d/99-rfxn-defense-rds.conf"
    drop_present = os.path.isfile(drop_path)

    suppressed_rds = False
    try:
        with open(AUTO_DETECT_PATH, "r") as f:
            data = json.load(f)
        if isinstance(data, dict):
            sup = data.get("suppressed") or {}
            if isinstance(sup, dict):
                suppressed_rds = bool(sup.get("modprobe_rds", False))
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        suppressed_rds = False

    if suppressed_rds:
        return Check("rds_modprobe", "MITIGATION", Status.INFO,
                     "AF_RDS modprobe suppressed - Oracle/RDS workload "
                     "detected",
                     details={"path": drop_path, "suppressed": True})

    if drop_present:
        return Check("rds_modprobe", "MITIGATION", Status.OK,
                     "AF_RDS modprobe drop-in present: {}".format(drop_path),
                     details={"path": drop_path})

    if _rpm_q_installed("rfxn-defense-modprobe"):
        return Check("rds_modprobe", "MITIGATION", Status.FAIL,
                     "AF_RDS modprobe drop-in missing despite -modprobe "
                     "subpackage installed (scriptlet likely failed)",
                     details={"path": drop_path},
                     remediation="Run: /usr/sbin/rfxn-redetect ; verify "
                                 "{} exists and contains 'install rds "
                                 "/bin/false'".format(drop_path))

    return Check("rds_modprobe", "MITIGATION", Status.SKIP,
                 "auditor-only install (no -modprobe subpackage); AF_RDS "
                 "drop-in not expected",
                 details={"path": drop_path})

def check_af_rds_restrict():
    """v2.1.0 MITIGATION: AF_RDS systemd restriction for the pintheft class.

    Reads sshd.service via systemctl cat as the representative tenant unit.
    Falls back to the on-disk drop-in if systemctl is unavailable."""
    raf_re = re.compile(r"RestrictAddressFamilies\s*=.*~AF_RDS")

    rc, out, err = run_cmd(["systemctl", "cat", "sshd.service"], timeout=3)
    text = ""
    source = None
    if rc == 0 and out:
        text = out.decode("utf-8", errors="replace")
        source = "systemctl cat sshd.service"
    else:
        dropin = "/etc/systemd/system/sshd.service.d/10-rfxn-defense.conf"
        fallback = read_text_safe(dropin) or ""
        if fallback:
            text = fallback
            source = dropin

    if source is None:
        return Check("af_rds_restrict", "MITIGATION", Status.SKIP,
                     "sshd.service unavailable via systemctl and no "
                     "rfxn-defense drop-in on disk")

    if raf_re.search(text):
        return Check("af_rds_restrict", "MITIGATION", Status.OK,
                     "sshd.service blocks AF_RDS via RestrictAddressFamilies",
                     details={"source": source})

    return Check("af_rds_restrict", "MITIGATION", Status.FAIL,
                 "sshd.service does not restrict AF_RDS (pintheft "
                 "primitive reachable inside tenant sessions)",
                 details={"source": source},
                 remediation="Install rfxn-defense-systemd, OR add to "
                             "/etc/systemd/system/sshd.service.d/"
                             "10-rfxn-defense.conf: [Service] "
                             "RestrictAddressFamilies=~AF_RDS")

def _unit_namespaces_blocked(rn_value):
    """Returns True if RestrictNamespaces blocks BOTH user and net.

    systemd semantics:
      RestrictNamespaces=~user net   -> deny-list; user and net denied
      RestrictNamespaces=user net    -> allow-list; only user/net allowed (rare)
      RestrictNamespaces=yes/no      -> blanket deny/allow (rare)
    The cf2/dirtyfrag-ESP exploit needs unshare(CLONE_NEWUSER|CLONE_NEWNET);
    cutting either kills the chain. We require BOTH to flag OK."""
    if not rn_value:
        return False
    rn = rn_value.strip()
    if rn in ("yes", "true", "1"):
        return True  # blanket deny-all
    if rn in ("no", "false", "0", ""):
        return False
    if rn.startswith("~"):
        # deny-list: user AND net must be in the list
        body = rn[1:]
        toks = re.split(r"[\s,]+", body)
        return "user" in toks and "net" in toks
    # allow-list: user AND net must be ABSENT
    toks = re.split(r"[\s,]+", rn)
    return "user" not in toks and "net" not in toks

_USERNS_DROPIN_GLOB = "/etc/systemd/system/*.service.d/15-rfxn-defense-userns.conf"

def check_systemd_restrict_namespaces():
    """v2.0.0: per-unit RestrictNamespaces coverage for cf2/dirtyfrag-ESP.

    v3.0.2: gate on systemd version. RestrictNamespaces= was introduced
    in systemd v235; older systemd (EL7 ships v219) silently ignores
    the directive at unit-start. On those hosts the rfxn-defense-systemd
    package no longer ships the 15-userns drop-in (spec-gated). If a
    stale drop-in remains from a prior install, surface it as FAIL so
    the operator knows the directive is inert."""
    if not systemd_running():
        return Check("systemd_restrict_namespaces", "MITIGATION", Status.SKIP,
                     "systemd not running")

    sv = systemd_version()
    if sv is not None and sv < 235:
        stale = sorted(glob.glob(_USERNS_DROPIN_GLOB))
        if stale:
            return Check("systemd_restrict_namespaces", "MITIGATION", Status.FAIL,
                         "systemd {} ignores RestrictNamespaces= (introduced "
                         "in v235); {} stale drop-in(s) inert".format(
                             sv, len(stale)),
                         details={"systemd_version": sv,
                                  "stale_dropins": stale},
                         remediation="dnf upgrade rfxn-defense-systemd to "
                                     ">=3.0.2 (gates the drop-in out on EL7) "
                                     "OR `rm -f {}`".format(_USERNS_DROPIN_GLOB))
        return Check("systemd_restrict_namespaces", "MITIGATION", Status.SKIP,
                     "systemd {} predates RestrictNamespaces= (v235); "
                     "this layer not shipped on this distro".format(sv),
                     details={"systemd_version": sv})

    findings_ok = []
    findings_missing = []
    findings_optional_ok = []
    for d in CF_CLASS_TENANT_UNITS + CF_CLASS_OPTIONAL_UNITS:
        unit = d if d.endswith("@") else d
        rc, out, err = run_cmd(["systemctl", "show",
                                "-p", "RestrictNamespaces",
                                "{}.service".format(unit)], timeout=3)
        if rc != 0:
            continue
        kv = {}
        for line in out.decode("utf-8", errors="replace").splitlines():
            if "=" in line:
                k, v = line.split("=", 1)
                kv[k] = v
        rn = kv.get("RestrictNamespaces", "")
        if not rn:
            # Unit has no RestrictNamespaces directive set
            if d in CF_CLASS_TENANT_UNITS:
                findings_missing.append(d)
            continue
        if _unit_namespaces_blocked(rn):
            if d in CF_CLASS_TENANT_UNITS:
                findings_ok.append(d)
            else:
                findings_optional_ok.append(d)
        else:
            # v3.0.1: rn parsed but does not block both user+net (e.g.
            # systemd reported "no" because our 15-* drop-in had a
            # parse error, or operator override at 20- neutralised it).
            # Previously silently dropped; now flagged as missing so
            # the auditor surfaces the gap.
            if d in CF_CLASS_TENANT_UNITS:
                findings_missing.append(d)

    details = {"protected_tenant_units": findings_ok,
               "missing_tenant_units": findings_missing,
               "protected_optional_units": findings_optional_ok,
               "systemd_version": sv}
    if findings_ok and not findings_missing:
        return Check("systemd_restrict_namespaces", "MITIGATION", Status.OK,
                     "{} tenant units block user+net namespaces".format(
                         len(findings_ok)),
                     details=details)
    if findings_ok and findings_missing:
        return Check("systemd_restrict_namespaces", "MITIGATION", Status.WARN,
                     "partial coverage: {} protected, {} missing".format(
                         len(findings_ok), len(findings_missing)),
                     details=details,
                     remediation="Install rfxn-defense-systemd, OR add "
                                 "drop-ins under /etc/systemd/system/<unit>"
                                 ".service.d/10-rfxn-defense.conf with "
                                 "RestrictNamespaces=~user ~net")
    if findings_missing:
        return Check("systemd_restrict_namespaces", "MITIGATION", Status.WARN,
                     "no tenant unit blocks user+net namespaces "
                     "(cf2 / dirtyfrag-ESP unshare prerequisite reachable)",
                     details=details,
                     remediation="dnf install rfxn-defense-systemd, OR "
                                 "add drop-ins manually.")
    return Check("systemd_restrict_namespaces", "MITIGATION", Status.SKIP,
                 "no tenant units found in this systemd instance",
                 details=details)

# Files / globs to scan for PAM `pam_unix.so` `nullok` (dirtyfrag-RxRPC
# weaponizes empty-password auth via /etc/passwd corruption).
PAM_NULLOK_PATHS = [
    "/etc/pam.d/system-auth",
    "/etc/pam.d/password-auth",
    "/etc/pam.d/common-auth",
    "/etc/pam.d/login",
    "/etc/pam.d/sshd",
    "/etc/pam.d/passwd",
    "/etc/pam.d/su",
]
PAM_NULLOK_GLOBS = [
    "/etc/pam.d/cpanel*",
    "/etc/pam.d/plesk*",
]
_PAM_NULLOK_RE = re.compile(
    r"^\s*[a-z]+\s+(?:\[[^\]]+\]|sufficient|required|requisite|optional|include|substack)?"
    r"\s*pam_unix\.so\b[^#\n]*\bnullok\b",
    re.MULTILINE)

def check_pam_nullok():
    """dirtyfrag-RxRPC: scan PAM stacks for pam_unix.so nullok.

    The Dirty Frag RxRPC variant overwrites /etc/passwd line 1 to
    'root::0:0:...' (empty password field) and relies on pam_unix.so
    nullok to accept it. Stock RHEL authselect profiles do NOT include
    nullok; cPanel/Plesk PAM stacks have historically re-introduced it."""
    findings = []
    paths = list(PAM_NULLOK_PATHS)
    for g in PAM_NULLOK_GLOBS:
        paths += glob.glob(g)
    seen = set()
    for path in paths:
        if path in seen or not os.path.isfile(path):
            continue
        seen.add(path)
        text = read_text_safe(path) or ""
        for m in _PAM_NULLOK_RE.finditer(text):
            findings.append({"path": path,
                             "line": m.group(0).strip()[:120]})
    if not findings:
        return Check("pam_nullok", "DETECTION", Status.OK,
                     "no pam_unix.so nullok in scanned PAM stacks ({})".format(
                         len(seen)))
    return Check("pam_nullok", "DETECTION", Status.WARN,
                 "{} pam_unix.so nullok occurrence(s) - empty-password auth "
                 "accepted (dirtyfrag-RxRPC weapon)".format(len(findings)),
                 details={"occurrences": findings[:8],
                          "files_scanned": len(seen)},
                 remediation="Remove 'nullok' from pam_unix.so lines in the "
                             "listed PAM files. On RHEL with authselect, run "
                             "'authselect select sssd' or your site profile "
                             "to restore the canonical no-nullok stack.")

def check_unprivileged_userns_sysctl():
    """v2.0.0 HARDENING: report userns sysctl posture.

    INFO only - the -sysctl subpackage shipped the userns drop-in in
    v2.0.2 (suppressed by detect.sh when rootless containers / Flatpak
    / firejail / browser detected; v3.0.2 also gates it out entirely on
    EL7). Turning userns off breaks rootless podman, browser sandboxes,
    and flatpak; the check informs the operator without recommending
    action.

    v3.0.2: distinguish "blocked by rfxn-defense sysctl drop-in" from
    "blocked by kernel/distro default". On EL7 the kernel default is
    max_user_namespaces=0 and the rfxn-defense sysctl is gated out of
    the package; reporting OK without context made the operator believe
    rfxn-defense was doing the work."""
    rhel = read_text_safe("/proc/sys/user/max_user_namespaces")
    deb = read_text_safe("/proc/sys/kernel/unprivileged_userns_clone")
    parts = []
    if rhel is not None:
        parts.append("user.max_user_namespaces={}".format(rhel.strip()))
    if deb is not None:
        parts.append("kernel.unprivileged_userns_clone={}".format(deb.strip()))
    if not parts:
        return Check("userns_sysctl", "HARDENING", Status.SKIP,
                     "userns sysctls unreadable")
    rhel_blocked = (rhel is not None and rhel.strip() == "0")
    deb_blocked  = (deb  is not None and deb.strip()  == "0")
    rfxn_dropin = "/etc/sysctl.d/99-rfxn-defense-userns.conf"
    rfxn_present = os.path.isfile(rfxn_dropin)
    details = {"sysctls": parts,
               "rfxn_sysctl_dropin_present": rfxn_present,
               "rfxn_sysctl_dropin_path": rfxn_dropin}
    if rhel_blocked or deb_blocked:
        if rfxn_present:
            msg = ("unprivileged userns disabled by rfxn-defense sysctl "
                   "drop-in - cf2 / dirtyfrag-ESP unshare prerequisite blocked")
        else:
            msg = ("unprivileged userns disabled by kernel/distro default "
                   "(rfxn-defense sysctl drop-in NOT present; relying on "
                   "kernel default - operator override at runtime would "
                   "re-enable without rfxn-defense pulling it back)")
        return Check("userns_sysctl", "HARDENING", Status.OK, msg,
                     details=details)
    return Check("userns_sysctl", "HARDENING", Status.INFO,
                 "unprivileged userns enabled: " + "; ".join(parts) +
                 " (cf2 / dirtyfrag-ESP unshare prerequisite reachable)",
                 details=details)

def check_ptrace_scope():
    """v2.1.0 HARDENING: report kernel.yama.ptrace_scope posture.

    ptrace_scope=2 (admin-only) blocks the keysign-pwn class of cross-
    process /proc/PID/mem and ptrace(PTRACE_ATTACH) reach into agents
    holding unwrapped keys (gpg-agent, ssh-agent). =1 (restricted) still
    permits same-uid attach via prctl(PR_SET_PTRACER); =0 is unrestricted."""
    val = read_text_safe("/proc/sys/kernel/yama/ptrace_scope")
    if val is None:
        return Check("ptrace_scope", "HARDENING", Status.SKIP,
                     "/proc/sys/kernel/yama/ptrace_scope unreadable "
                     "(Yama LSM not enabled)")
    v = val.strip()
    if v == "2":
        return Check("ptrace_scope", "HARDENING", Status.OK,
                     "kernel.yama.ptrace_scope=2 (admin-only attach)",
                     details={"value": v})
    if v == "1":
        return Check("ptrace_scope", "HARDENING", Status.WARN,
                     "kernel.yama.ptrace_scope=1 (same-uid attach still "
                     "permitted; keysign-pwn agent reach partial)",
                     details={"value": v},
                     remediation="sysctl -w kernel.yama.ptrace_scope=2 ; "
                                 "ensure /etc/sysctl.d/"
                                 "99-rfxn-defense-ptrace.conf is loaded "
                                 "(v3.0.2: file ships without the `-` prefix "
                                 "so EL7 procps-ng 3.3.10 honours the key; "
                                 "older 3.0.x silently no-op'd on EL7)")
    return Check("ptrace_scope", "HARDENING", Status.FAIL,
                 "kernel.yama.ptrace_scope={} (unrestricted; keysign-pwn "
                 "agent reach unblocked)".format(v),
                 details={"value": v},
                 remediation="sysctl -w kernel.yama.ptrace_scope=2 ; "
                             "ensure /etc/sysctl.d/"
                             "99-rfxn-defense-ptrace.conf is loaded "
                             "(v3.0.2: file ships without the `-` prefix "
                             "so EL7 procps-ng 3.3.10 honours the key; "
                             "older 3.0.x silently no-op'd on EL7)")

def check_io_uring_disabled():
    """v2.1.1 MITIGATION: PinTheft secondary mitigation reporter.

    kernel.io_uring_disabled=2 closes io_uring as an exfil/IPC primitive.
    Auto-applied on hosts where no io_uring workload is detected and
    kernel >= 6.6. Suppressed elsewhere (rootless, userns consumers,
    io_uring workload signals, kernel too old).
    """
    raw = read_text_safe("/var/lib/rfxn-defense/auto-detect.json")
    try:
        detect_state = json.loads(raw) if raw else {}
    except (json.JSONDecodeError, TypeError):
        detect_state = {}
    sup_blob = (detect_state or {}).get("suppressed", {}).get("sysctl_iouring")
    # sysctl_iouring is a dict {suppressed, reason} (v2.1.1+) or
    # absent (older detect.sh), handle both.
    if isinstance(sup_blob, dict):
        suppressed = bool(sup_blob.get("suppressed", False))
        reason = sup_blob.get("reason", "unknown")
    else:
        suppressed = None
        reason = "no_detect_state"

    kernel_val = read_text_safe("/proc/sys/kernel/io_uring_disabled")
    key_present = kernel_val is not None

    if not key_present:
        # Kernel < 6.6 or built without io_uring. Sysctl drop-in is a
        # no-op courtesy of the '-' prefix.
        return Check("io_uring_disabled", "MITIGATION", Status.SKIP,
                     "/proc/sys/kernel/io_uring_disabled not present "
                     "(kernel < 6.6 or CONFIG_IO_URING=n)",
                     details={"detect_reason": reason})

    v = kernel_val.strip()
    if v in ("1", "2"):
        return Check("io_uring_disabled", "MITIGATION", Status.OK,
                     "kernel.io_uring_disabled={} (PinTheft secondary "
                     "mitigation active)".format(v),
                     details={"value": v, "detect_reason": reason})

    # v == "0", io_uring enabled.
    if suppressed is True:
        return Check("io_uring_disabled", "MITIGATION", Status.INFO,
                     "io_uring_disabled=0; auto-suppressed (reason={}) "
                     "- PinTheft secondary mitigation off by design".format(reason),
                     details={"value": v, "detect_reason": reason})
    if suppressed is False:
        return Check("io_uring_disabled", "MITIGATION", Status.FAIL,
                     "io_uring_disabled=0 but auto-detect chose to apply "
                     "(reason={}); /etc/sysctl.d/99-rfxn-defense-iouring.conf "
                     "missing or unreadable".format(reason),
                     details={"value": v, "detect_reason": reason},
                     remediation="sysctl -p /etc/sysctl.d/99-rfxn-defense-iouring.conf "
                                 "; run rfxn-redetect")
    # suppressed is None, older detect.sh or no install.
    return Check("io_uring_disabled", "MITIGATION", Status.INFO,
                 "io_uring_disabled=0; detect.sh state unavailable",
                 details={"value": v, "detect_reason": reason})

def check_apparmor_userns_restrict():
    """ENV: Ubuntu/Debian apparmor userns posture.

    Ubuntu's apparmor_restrict_unprivileged_userns gates apparmor's
    permission to confine unprivileged user-namespace creation. cf2
    / Dirty Frag's aa-rootns harness defeats this via change_onexec
    profile-hops, so apparmor alone is not sufficient. Detection
    matters because an operator may believe they're protected when
    they're only partially so."""
    val = read_text_safe("/proc/sys/kernel/apparmor_restrict_unprivileged_userns")
    if val is None:
        return None  # Not Ubuntu/Debian, or apparmor not loaded
    v = val.strip()
    if v == "1":
        return Check("apparmor_userns_restrict", "ENV", Status.INFO,
                     "apparmor_restrict_unprivileged_userns=1 "
                     "(bypassable via change_onexec into permissive profiles "
                     "- see Dirty Frag aa-rootns)",
                     details={"value": v})
    return Check("apparmor_userns_restrict", "ENV", Status.INFO,
                 "apparmor_restrict_unprivileged_userns={} (no apparmor "
                 "userns gating)".format(v),
                 details={"value": v})

def _has_non_admin_login_users():
    """Heuristic: are there interactive users not in wheel/admin?

    cf2/dirtyfrag-ESP target /usr/bin/su; chmod 4750 is the obvious
    hardening. But on cPanel/hosting nodes there are many tenant
    users with shells who legitimately use su; chmod-ing 4750 breaks
    them. We suppress the recommendation when this heuristic detects
    such a fleet shape."""
    pwd = read_text_safe("/etc/passwd") or ""
    grp = read_text_safe("/etc/group") or ""
    wheel_members = set()
    for line in grp.splitlines():
        parts = line.split(":")
        if len(parts) >= 4 and parts[0] in ("wheel", "admin", "sudo"):
            for u in parts[3].split(","):
                u = u.strip()
                if u:
                    wheel_members.add(u)
    interactive_shells = ("/bin/bash", "/bin/sh", "/bin/zsh", "/bin/ash",
                          "/bin/dash", "/usr/bin/bash", "/usr/bin/zsh")
    non_admin_login_count = 0
    for line in pwd.splitlines():
        parts = line.split(":")
        if len(parts) < 7:
            continue
        user, _, uid_s, _, _, _, shell = parts[:7]
        try:
            uid = int(uid_s)
        except ValueError:
            continue
        if uid < 1000:                # system user
            continue
        if shell.strip() not in interactive_shells:
            continue
        if user in wheel_members:
            continue
        non_admin_login_count += 1
    return non_admin_login_count

def check_su_target_hardening():
    """HARDENING: /usr/bin/su mode + ownership; recommend conditional chmod.

    Recommendation suppressed when /etc/passwd analysis shows non-wheel
    interactive users (cPanel-shaped fleet)."""
    p = "/usr/bin/su"
    try:
        st = os.stat(p)
    except OSError:
        return Check("su_target_hardening", "HARDENING", Status.SKIP,
                     "/usr/bin/su not found")
    mode = stat.S_IMODE(st.st_mode)
    is_setuid = bool(mode & stat.S_ISUID)
    other_exec = bool(mode & stat.S_IXOTH)
    group = st.st_gid
    details = {
        "path": p,
        "mode_octal": oct(mode),
        "uid": st.st_uid,
        "gid": st.st_gid,
        "setuid": is_setuid,
        "world_executable": other_exec,
    }
    if not is_setuid:
        return Check("su_target_hardening", "HARDENING", Status.OK,
                     "/usr/bin/su is not setuid (already hardened)",
                     details=details)
    if not other_exec:
        return Check("su_target_hardening", "HARDENING", Status.OK,
                     "/usr/bin/su is mode {} - non-world-exec".format(
                         oct(mode)),
                     details=details)
    # Setuid root + world-executable. Decide whether to recommend hardening.
    non_admin = _has_non_admin_login_users()
    details["non_admin_login_users"] = non_admin
    if non_admin > 0:
        return Check("su_target_hardening", "HARDENING", Status.INFO,
                     "/usr/bin/su is setuid + world-exec (cf2 / dirtyfrag-ESP "
                     "target). Hardening NOT recommended: {} non-admin "
                     "interactive users present (chmod 4750 would break their "
                     "su workflow)".format(non_admin),
                     details=details)
    return Check("su_target_hardening", "HARDENING", Status.WARN,
                 "/usr/bin/su is setuid + world-exec (cf2 / dirtyfrag-ESP "
                 "target). No non-admin interactive users detected.",
                 details=details,
                 remediation="chmod 4750 /usr/bin/su && chgrp wheel /usr/bin/su "
                             "(restrict invocation to wheel group). Confirm "
                             "no automation depends on tenant su before applying.")

def check_auditd_rules_extended():
    """DETECTION: report on cf-class auditd rule keys.

    Rules are NOT installed by the package - emitted by --emit-remediation
    for operator review. Keys: cf_userns (unshare CLONE_NEWUSER),
    cf_addkey (rxrpc key registration), plus the existing afalg_attempt
    from v1.0.1."""
    rc, out, err = run_cmd(["auditctl", "-l"], timeout=3)
    if rc != 0:
        return Check("audit_rules_extended", "DETECTION", Status.SKIP,
                     "auditctl unavailable or returned error")
    text = out.decode("utf-8", errors="replace") if out else ""
    keys_found = set()
    for line in text.splitlines():
        m = re.search(r"-k\s+(\S+)", line)
        if m:
            keys_found.add(m.group(1))
    # cf-class audit keys. Names align with what --emit-remediation
    # writes to /etc/audit/rules.d/rfxn-defense.rules. Operators may run
    # different keys; we report on absence of OUR canonical set, not
    # on absence of any related rule.
    cf_keys = ["afalg_attempt", "cf_userns", "cf_addkey", "cf_xfrm_nl",
               "splice_tenant"]
    missing = [k for k in cf_keys if k not in keys_found]
    if not missing:
        return Check("audit_rules_extended", "DETECTION", Status.OK,
                     "all cf-class audit rules present: {}".format(
                         ", ".join(cf_keys)),
                     details={"present": sorted(keys_found & set(cf_keys))})
    return Check("audit_rules_extended", "DETECTION", Status.INFO,
                 "cf-class audit rules missing: {}".format(", ".join(missing)),
                 details={"present": sorted(keys_found & set(cf_keys)),
                          "missing": missing},
                 remediation="See --emit-remediation for the exact "
                             "auditctl/augenrules invocations.")

def check_pidfd_getfd_auditd_rule():
    """v2.1.0 DETECTION: report on pidfd_getfd audit rule presence.

    pidfd_getfd(2) lets a tracer steal an open fd from a tracee, the
    primitive behind the pintheft class (extract live TLS pins, agent
    socket fds, keyring handles). The rule key rfxn_pidfd_getfd is
    written by --emit-remediation and matches the canonical augenrules
    snippet shipped by the operator-side guidance."""
    rc, out, err = run_cmd(["auditctl", "-l"], timeout=3)
    if rc != 0:
        return Check("pidfd_getfd_auditd_rule", "DETECTION", Status.SKIP,
                     "auditctl unavailable or returned error")
    text = out.decode("utf-8", errors="replace") if out else ""
    key = "rfxn_pidfd_getfd"
    found = False
    for line in text.splitlines():
        if re.search(r"-k\s+" + re.escape(key) + r"\b", line) or \
           re.search(r"key=" + re.escape(key) + r"\b", line):
            found = True
            break
    if found:
        return Check("pidfd_getfd_auditd_rule", "DETECTION", Status.OK,
                     "pidfd_getfd audit rule loaded (key={})".format(key),
                     details={"key": key})
    return Check("pidfd_getfd_auditd_rule", "DETECTION", Status.FAIL,
                 "pidfd_getfd audit rule absent (pintheft primitive "
                 "unobserved)",
                 details={"key": key},
                 remediation="See --emit-remediation for the auditctl/"
                             "augenrules invocation that installs "
                             "key={}.".format(key))

def check_initcall_blacklist():
    """v2.0.0: parse /proc/cmdline for initcall_blacklist= covering
    cf-class entry-point modules.

    Per CERT-EU 2026-005 + CloudLinux guidance: on some distros, modprobe
    blacklist alone is insufficient because the modules are wired into
    the kernel via initcall_*. Operators can append `initcall_blacklist=
    algif_aead_init,esp4_init,esp6_init,rxrpc_init` to GRUB to neutralize
    these at boot."""
    cmdline = read_text_safe("/proc/cmdline") or ""
    m = re.search(r"\binitcall_blacklist=(\S+)", cmdline)
    if not m:
        return Check("initcall_blacklist", "MITIGATION", Status.INFO,
                     "no initcall_blacklist= in /proc/cmdline (defense in "
                     "depth gap on kernels where cf-class modules are "
                     "compiled in)",
                     remediation="Append to GRUB_CMDLINE_LINUX in "
                                 "/etc/default/grub: "
                                 "initcall_blacklist=algif_aead_init,"
                                 "esp4_init,esp6_init,rxrpc_init "
                                 "then grub2-mkconfig + reboot. Coverage is "
                                 "boot-permanent until the line is removed.")
    inits = m.group(1).split(",")
    cf_inits = ["algif_aead_init", "esp4_init", "esp6_init", "rxrpc_init"]
    covered = [i for i in cf_inits if i in inits]
    missing = [i for i in cf_inits if i not in inits]
    if missing:
        return Check("initcall_blacklist", "MITIGATION", Status.INFO,
                     "initcall_blacklist=set but missing: {}".format(
                         ", ".join(missing)),
                     details={"covered": covered, "missing": missing,
                              "all_initcalls": inits})
    return Check("initcall_blacklist", "MITIGATION", Status.OK,
                 "initcall_blacklist= covers all cf-class init paths",
                 details={"covered": covered, "all_initcalls": inits})

# Legitimate AF_ALG users on a stock RHEL host. Anything OUTSIDE this list
# is suspicious (per Sysdig CVE-2026-31431 scoping guidance).
AF_ALG_LEGITIMATE = (
    "cryptsetup", "systemd-cryptsetup", "veritysetup", "integritysetup",
    "kcapi-",
)

def check_af_alg_holders():
    """v2.0.0: snapshot processes currently holding AF_ALG sockets.

    Useful as a baseline before enabling the shim, operators can
    confirm no legitimate process actively uses AF_ALG before enforcing
    a global block. Best-effort: depends on `ss` or `lsof`; degrades to
    SKIP if neither is available."""
    rc, out, err = run_cmd(["ss", "-x", "-a", "-p"], timeout=4)
    if rc != 0:
        rc, out, err = run_cmd(["lsof", "-nP", "+c0"], timeout=8)
        if rc != 0:
            return Check("af_alg_holders", "DETECTION", Status.SKIP,
                         "ss/lsof unavailable - cannot inventory AF_ALG "
                         "socket holders")
    text = out.decode("utf-8", errors="replace") if out else ""
    holders = []
    suspicious = []
    for line in text.splitlines():
        if "AF_ALG" not in line and "alg:" not in line:
            continue
        # Try to extract a process name from the line. Format varies
        # across ss / lsof; we keep it best-effort.
        proc = ""
        m = re.search(r'"([^"]+)"', line)
        if m:
            proc = m.group(1)
        else:
            toks = line.split()
            if toks:
                proc = toks[0]
        if not proc:
            continue
        holders.append(proc)
        if not any(proc.startswith(p) for p in AF_ALG_LEGITIMATE):
            suspicious.append(proc)
    if not holders:
        return Check("af_alg_holders", "DETECTION", Status.OK,
                     "no live AF_ALG socket holders")
    if suspicious:
        return Check("af_alg_holders", "DETECTION", Status.WARN,
                     "{} suspicious AF_ALG socket holder(s): {}".format(
                         len(set(suspicious)), ", ".join(sorted(set(suspicious))[:5])),
                     details={"all_holders": sorted(set(holders)),
                              "suspicious": sorted(set(suspicious))})
    return Check("af_alg_holders", "DETECTION", Status.INFO,
                 "{} AF_ALG socket holder(s), all in expected allowlist".format(
                     len(set(holders))),
                 details={"holders": sorted(set(holders))})

def check_kernel_log_iocs():
    """v2.0.0: scan dmesg / /var/log/messages for cf-class kernel events.

    Specific signatures from Threatbear + thrandomv detection packs:
    - alg: api / alg-aead module load events
    - xfrm_user: Init / xfrm-policy add events
    - ESP: Init / IPv6 ESP autoload markers
    - 'Key type rxrpc registered' (rxrpc.ko first-load marker)
    - audit: type=1326 (seccomp violation) clusters
    Best-effort: tries dmesg first, falls back to /var/log/messages."""
    rc, out, err = run_cmd(["dmesg", "-T"], timeout=4)
    if rc == 0 and out:
        text = out.decode("utf-8", errors="replace")
    else:
        # Single-pass fallback: read /var/log/messages directly.
        text = read_text_safe("/var/log/messages",
                              max_bytes=2 * 1024 * 1024) or ""
    if not text:
        return Check("kernel_log_iocs", "DETECTION", Status.SKIP,
                     "kernel log unavailable (dmesg requires CAP_SYSLOG; "
                     "/var/log/messages unreadable)")

    patterns = [
        ("algif_aead_load",     re.compile(r"alg:\s+(?:Test|api).*aead", re.IGNORECASE)),
        ("xfrm_user_init",      re.compile(r"\bxfrm_user\b", re.IGNORECASE)),
        ("esp_init",            re.compile(r"\bIPsec\s*ESP\b|\bESP[46]?\s*Init\b", re.IGNORECASE)),
        ("rxrpc_register",      re.compile(r"Key type rxrpc registered", re.IGNORECASE)),
        ("seccomp_violation",   re.compile(r"audit.*type=1326")),
    ]
    hits = {}
    for name, pat in patterns:
        n = sum(1 for _ in pat.finditer(text))
        if n:
            hits[name] = n
    if not hits:
        return Check("kernel_log_iocs", "DETECTION", Status.OK,
                     "no cf-class IOC patterns in kernel log sample")
    return Check("kernel_log_iocs", "DETECTION", Status.INFO,
                 "kernel log has cf-class load/probe markers: " +
                 ", ".join("{}={}".format(k, v) for k, v in hits.items()),
                 details={"hits": hits})

def check_lsm_stack():
    """v2.0.0: report active LSM stack.

    bpf-lsm enables Cloudflare-style AF_ALG socket_bind allowlisting
    (out of scope for this auditor's enforcement, but operators
    deploying that defense need bpf in the LSM stack)."""
    val = read_text_safe("/sys/kernel/security/lsm")
    if val is None:
        return Check("lsm_stack", "ENV", Status.SKIP,
                     "/sys/kernel/security/lsm unreadable "
                     "(securityfs not mounted, or no CAP_SYS_ADMIN)")
    lsms = val.strip().split(",")
    has_bpf = "bpf" in lsms
    return Check("lsm_stack", "ENV", Status.INFO,
                 "active LSMs: {} (bpf-lsm: {})".format(
                     ", ".join(lsms), "available" if has_bpf else "absent"),
                 details={"lsms": lsms, "bpf_lsm": has_bpf})

# --- Orchestration ---------------------------------------------------------

def run_all_checks(args):
    cats = (set(c.upper().strip() for c in args.category.split(","))
            if args.category else None)
    results = []

    def add_one(check, category):
        if cats and category not in cats:
            return
        if check is not None:
            results.append(check)

    def add_many(checks, category):
        if cats and category not in cats:
            return
        for c in checks or []:
            if c is not None:
                results.append(c)

    PROGRESS.step("collecting environment info")
    add_many(check_environment(), "ENV")
    # v2.0.0: apparmor userns posture (Ubuntu/Debian only; returns None on
    # non-apparmor hosts and add_one filters it).
    PROGRESS.step("checking apparmor userns posture")
    add_one(check_apparmor_userns_restrict(), "ENV")
    # v2.0.0: active LSM stack (bpf-lsm informs Cloudflare-style enforcement)
    PROGRESS.step("reading active LSM stack")
    add_one(check_lsm_stack(), "ENV")

    PROGRESS.step("probing AF_ALG socket family")
    add_one(check_af_alg_socket(), "KERNEL")
    PROGRESS.step("probing authencesn cipher")
    add_one(check_authencesn_cipher(), "KERNEL")
    PROGRESS.step("checking algif_aead state")
    add_one(check_algif_aead_state(), "KERNEL")
    # v2.0.0: cf2 / dirtyfrag-ESP module reachability
    PROGRESS.step("checking xfrm-ESP module state (cf2 / dirtyfrag-ESP)")
    add_one(check_xfrm_modules(), "KERNEL")
    # v2.0.0: dirtyfrag-RxRPC module reachability
    PROGRESS.step("checking rxrpc module state (dirtyfrag-RxRPC)")
    add_one(check_rxrpc_module(), "KERNEL")
    if not args.skip_trigger:
        PROGRESS.step("running trigger probe (sentinel file, AEAD/splice)")
        add_one(trigger_probe(), "KERNEL")

    PROGRESS.step("checking /etc/ld.so.preload")
    add_one(check_ld_so_preload(), "MITIGATION")
    PROGRESS.step("testing if shim blocks AF_ALG")
    add_one(check_shim_blocks_af_alg(), "MITIGATION")
    PROGRESS.step("scanning modprobe.d for AF_ALG blacklist")
    add_one(check_modprobe_blacklist(), "MITIGATION")
    # v2.0.0: cf-class extended modprobe coverage
    PROGRESS.step("scanning modprobe.d for cf-class extended coverage")
    add_one(check_modprobe_blacklist_extended(), "MITIGATION")
    # v2.0.1: auto-detect.json state
    PROGRESS.step("reading auto-detect.json state")
    add_one(check_auto_detect_state(), "MITIGATION")
    # v2.1.0: pintheft - AF_RDS modprobe + systemd restrictions
    PROGRESS.step("checking AF_RDS modprobe drop-in (pintheft)")
    add_one(check_rds_modprobe(), "MITIGATION")
    PROGRESS.step("checking AF_RDS systemd restriction (pintheft)")
    add_one(check_af_rds_restrict(), "MITIGATION")
    # v2.1.1: pintheft - io_uring disable (secondary mitigation)
    PROGRESS.step("checking kernel.io_uring_disabled (pintheft secondary)")
    add_one(check_io_uring_disabled(), "MITIGATION")
    PROGRESS.step("checking kernel.modules_disabled")
    add_one(check_modules_disabled(), "MITIGATION")
    # v2.0.0: initcall_blacklist= GRUB-line check (CERT-EU 2026-005 +
    # CloudLinux guidance for builtin-module kernels)
    PROGRESS.step("checking initcall_blacklist= in /proc/cmdline")
    add_one(check_initcall_blacklist(), "MITIGATION")
    PROGRESS.step("checking systemd RestrictAddressFamilies")
    add_one(check_systemd_restrict_address_families(), "MITIGATION")
    # v2.0.0: cf2 / dirtyfrag-ESP RestrictNamespaces coverage
    PROGRESS.step("checking systemd RestrictNamespaces (cf2/dirtyfrag-ESP)")
    add_one(check_systemd_restrict_namespaces(), "MITIGATION")
    PROGRESS.step("checking user@.service drop-in")
    add_one(check_user_service_dropin(), "MITIGATION")
    PROGRESS.step("checking systemd drop-in freshness vs running daemons")
    add_one(check_dropin_freshness(), "MITIGATION")

    if not args.skip_hardening:
        PROGRESS.step("inventorying setuid binaries")
        add_one(check_suid_inventory(), "HARDENING")
        PROGRESS.step("checking page-cache integrity (privilege configs)")
        add_many(check_page_cache_integrity(), "HARDENING")
        PROGRESS.step("scanning file capabilities")
        add_one(check_file_capabilities(), "HARDENING")
        # v2.0.0: cf2 / dirtyfrag-ESP /usr/bin/su hardening (conditional)
        PROGRESS.step("checking /usr/bin/su hardening (conditional)")
        add_one(check_su_target_hardening(), "HARDENING")
        # v2.0.0: unprivileged userns sysctl posture
        PROGRESS.step("checking unprivileged userns sysctls")
        add_one(check_unprivileged_userns_sysctl(), "HARDENING")
        # v2.1.0: keysign-pwn - kernel.yama.ptrace_scope posture
        PROGRESS.step("checking kernel.yama.ptrace_scope (keysign-pwn)")
        add_one(check_ptrace_scope(), "HARDENING")

    PROGRESS.step("checking auditd state and rules")
    add_many(check_auditd(), "DETECTION")
    # v2.0.0: cf-class extended auditd rule keys
    PROGRESS.step("checking cf-class auditd rules (cf_userns, cf_addkey)")
    add_one(check_auditd_rules_extended(), "DETECTION")
    # v2.1.0: pintheft - pidfd_getfd audit rule
    PROGRESS.step("checking pidfd_getfd audit rule (pintheft)")
    add_one(check_pidfd_getfd_auditd_rule(), "DETECTION")
    PROGRESS.step("verifying seccomp filter active on running daemons")
    add_one(check_seccomp_runtime(), "DETECTION")
    # v2.0.0: dirtyfrag-RxRPC PAM nullok scan
    PROGRESS.step("scanning PAM stacks for pam_unix.so nullok")
    add_one(check_pam_nullok(), "DETECTION")
    # v2.0.0: live AF_ALG socket holder snapshot (Sysdig scoping)
    PROGRESS.step("inventorying live AF_ALG socket holders")
    add_one(check_af_alg_holders(), "DETECTION")
    # v2.0.0: kernel log IOC scan (Threatbear / thrandomv signatures)
    PROGRESS.step("scanning kernel log for cf-class load/probe markers")
    add_one(check_kernel_log_iocs(), "DETECTION")
    PROGRESS.step("scanning recent log IOCs")
    add_one(check_recent_ioc_signals(), "DETECTION")

    PROGRESS.done()
    return results

def determine_exit_code(results):
    has_vuln = any(r.status == Status.VULN for r in results)
    has_fail = any(r.status == Status.FAIL for r in results)
    shim_blocks = any(r.name == "shim_blocks_af_alg" and r.status == Status.OK
                      for r in results)
    if has_vuln:
        return 3 if shim_blocks else 2
    if has_fail:
        return 4
    return 0

# Map check names to posture-layer slots. Each slot is "ok" if the named
# check is OK, "missing" otherwise. Lets a SIEM / dashboard consumer answer
# "what defenses does this host actually have?" without re-implementing
# our verdict logic on top of 50 raw check results.
POSTURE_LAYERS = [
    ("kernel_patched",       "trigger_probe"),
    ("af_alg_unreachable",   "af_alg_socket"),
    ("modprobe_blacklist",   "modprobe_blacklist"),
    ("ld_preload_shim",      "shim_blocks_af_alg"),
    ("systemd_restriction",  "systemd_restrict"),
    ("user_service_dropin",  "user_service_dropin"),
    ("seccomp_runtime",      "seccomp_runtime"),
    ("auditd_running",       "auditd_running"),
    ("audit_rule_af_alg",    "audit_rule_af_alg"),
]

def determine_posture(results, category_filter_active=False):
    by_name = {r.name: r for r in results}
    layers = {}
    for slot, check_name in POSTURE_LAYERS:
        r = by_name.get(check_name)
        if r is None:
            layers[slot] = "not_evaluated"
        elif r.status == Status.OK:
            layers[slot] = "ok"
        elif r.status == Status.SKIP:
            layers[slot] = "skipped"
        else:
            layers[slot] = "missing"
    # Headline verdict, derived from layers + raw results.
    has_vuln = any(r.status == Status.VULN for r in results)
    if has_vuln and layers.get("ld_preload_shim") == "ok":
        verdict = "vulnerable_kernel_userspace_mitigated"
    elif has_vuln:
        verdict = "vulnerable"
    elif layers.get("kernel_patched") == "ok":
        verdict = "patched"
    elif layers.get("af_alg_unreachable") == "ok":
        verdict = "kernel_likely_safe"
    else:
        verdict = "inconclusive"
    # When --category narrows the run, the bug-class aggregator can't
    # see the full picture (e.g., MITIGATION checks omitted on a
    # KERNEL-only run). Mark mitigated/applicable as None and emit
    # filter_applied so SIEM consumers don't treat partial runs as
    # "vulnerable".
    bug_classes = _aggregate_bug_classes(by_name)
    if category_filter_active:
        for cls in bug_classes:
            bug_classes[cls]["mitigated"] = None
            bug_classes[cls]["filter_applied"] = True
        covered = []
    else:
        covered = sorted([cls for cls, info in bug_classes.items()
                          if info.get("mitigated") is True])
    auto_detect_summary = _summarize_auto_detect(by_name)
    return {
        "verdict": verdict,
        "layers": layers,
        "bug_classes": bug_classes,
        "bug_classes_covered": covered,
        "auto_detect": auto_detect_summary,
    }

def _summarize_auto_detect(by_name):
    """v2.0.1: summarize auto_detect_state check for posture surface.

    Returns the summarized fields the SIEM/dashboard wants without
    embedding the full raw JSON file - that's what the .details on
    the underlying check carries.

    Rev 2 fixup (D-53 / M-3): exposes schema_unrecognized field for
    SIEM to filter on schema-rejection events without parsing the
    raw JSON file."""
    r = by_name.get("auto_detect_state")
    if r is None or r.status == Status.SKIP:
        return {"available": False}
    details = r.details or {}
    return {
        "available": True,
        "force_full": details.get("force_full", False),
        "detected_workloads": details.get("detected_workloads", []),
        "suppressed_mitigations": details.get("suppressed_mitigations", []),
        "schema_unrecognized": details.get("schema_unrecognized", False),
    }

def _aggregate_bug_classes(by_name):
    """Per-class applicability + mitigation summary for SIEM consumers.

    A class is `applicable` when its kernel sink is reachable on the
    host. A class is `mitigated` when at least one of the layers that
    cuts the chain is OK. The map is consumed via posture.bug_classes
    (granular) and posture.bug_classes_covered (array of mitigated
    class IDs - SIEM-ergonomic single filter)."""
    def is_ok(name):
        r = by_name.get(name)
        return r is not None and r.status == Status.OK

    def status_of(name):
        r = by_name.get(name)
        return r.status if r is not None else None

    # cf1 - CVE-2026-31431 / algif_aead
    # v3.0.1: applicability is determined by the KERNEL's ability to
    # produce the sink (algif_aead loaded / builtin) - NOT by whether
    # the userspace socket() probe succeeds. The shim returning EPERM
    # is a mitigation signal, not a "sink not reachable" signal: the
    # kernel still has algif_aead in-tree, the shim just prevents
    # unprivileged programs from reaching it. Treating shim-EPERM as
    # "n/a" hides the shim's contribution from the layer credit.
    aead_state = status_of("algif_aead_state")
    cf1_app = aead_state in (Status.WARN, Status.FAIL, Status.VULN) or \
              (status_of("trigger_probe") == Status.VULN)
    cf1_mit = is_ok("shim_blocks_af_alg") or is_ok("systemd_restrict") or \
              is_ok("modprobe_extended") or is_ok("modprobe_blacklist") or \
              is_ok("trigger_probe")  # patched kernel

    # cf2 / dirtyfrag-ESP - xfrm-ESP skip_cow path
    xfrm_state = by_name.get("xfrm_modules")
    cf2_app = xfrm_state is not None and xfrm_state.status != Status.OK
    cf2_mit = is_ok("modprobe_extended") or \
              is_ok("systemd_restrict_namespaces") or \
              is_ok("userns_sysctl")

    # dirtyfrag-RxRPC - rxkad pcbc(fcrypt) on splice'd frag
    rxrpc_state = by_name.get("rxrpc_module")
    rxrpc_app = rxrpc_state is not None and rxrpc_state.status != Status.OK
    rxrpc_mit = is_ok("modprobe_extended") or \
                (rxrpc_state is not None and rxrpc_state.status == Status.OK)

    # pintheft - AF_RDS + pidfd_getfd live-key exfiltration class
    pintheft_app = (status_of("rds_modprobe") not in (Status.OK, None)) or \
                   (status_of("af_rds_restrict") not in (Status.OK, None))
    pintheft_mit = is_ok("rds_modprobe") or is_ok("af_rds_restrict")

    # keysign-pwn - ptrace/PROC_MEM reach into key-holding agents
    keysign_app = status_of("ptrace_scope") not in (Status.OK, None)
    keysign_mit = is_ok("ptrace_scope")

    # Per-class layer breakdown (which mitigations are active for each class).
    # Surfaces in the holistic view + JSON for SIEM/dashboard consumption.
    cf1_layers = {
        "kernel_patched":         status_of("trigger_probe") == Status.OK,
        "ld_preload_shim":        is_ok("shim_blocks_af_alg"),
        "systemd_af_alg":         is_ok("systemd_restrict"),
        "modprobe_blacklist":     is_ok("modprobe_extended") or is_ok("modprobe_blacklist"),
        "modules_disabled":       is_ok("modules_disabled"),
    }
    cf2_layers = {
        "modprobe_blacklist":     is_ok("modprobe_extended"),
        "systemd_restrict_ns":    is_ok("systemd_restrict_namespaces"),
        "userns_sysctl":          is_ok("userns_sysctl"),
        "user_at_dropin":         is_ok("user_service_dropin"),
    }
    rxrpc_layers = {
        "modprobe_blacklist":     is_ok("modprobe_extended"),
        "systemd_af_rxrpc":       is_ok("systemd_restrict_namespaces"),  # same drop-in body covers both
        "rxrpc_module_absent":    rxrpc_state is not None and rxrpc_state.status == Status.OK,
    }
    pintheft_layers = {
        "rds_modprobe":           is_ok("rds_modprobe"),
        "af_rds_restrict":        is_ok("af_rds_restrict"),
    }
    keysign_layers = {
        "ptrace_scope":           is_ok("ptrace_scope"),
    }

    return {
        "cf1": {
            "applicable": bool(cf1_app),
            "mitigated":  bool(cf1_mit) if cf1_app else None,
            "kernel_sink": "algif_aead AEAD scratch-write (CVE-2026-31431)",
            "layers": {k: bool(v) for k, v in cf1_layers.items()},
        },
        "cf2": {
            "applicable": bool(cf2_app),
            "mitigated":  bool(cf2_mit) if cf2_app else None,
            "kernel_sink": "esp_input skip_cow path (xfrm IPsec ESP)",
            "layers": {k: bool(v) for k, v in cf2_layers.items()},
        },
        "dirtyfrag-esp": {
            "applicable": bool(cf2_app),  # same kernel sink as cf2
            "mitigated":  bool(cf2_mit) if cf2_app else None,
            "kernel_sink": "esp_input skip_cow path (Dirty Frag-ESP, same sink as cf2)",
            "layers": {k: bool(v) for k, v in cf2_layers.items()},
        },
        "dirtyfrag-rxrpc": {
            "applicable": bool(rxrpc_app),
            "mitigated":  bool(rxrpc_mit) if rxrpc_app else None,
            "kernel_sink": "rxkad_verify_packet_1 in-place pcbc(fcrypt)",
            "layers": {k: bool(v) for k, v in rxrpc_layers.items()},
        },
        "pintheft": {
            "applicable": bool(pintheft_app),
            "mitigated":  bool(pintheft_mit) if pintheft_app else None,
            "kernel_sink": "AF_RDS socket + pidfd_getfd live-key exfiltration",
            "layers": {k: bool(v) for k, v in pintheft_layers.items()},
        },
        "keysign-pwn": {
            "applicable": bool(keysign_app),
            "mitigated":  bool(keysign_mit) if keysign_app else None,
            "kernel_sink": "ptrace/PROC_MEM reach into key-holding agents",
            "layers": {k: bool(v) for k, v in keysign_layers.items()},
        },
    }

def emit_remediation_script(results, category_filter_active=False):
    """Print a bash script of the remediations from non-OK checks.

    Aggregates Check.remediation strings as comments + commented-out
    commands. Fleet operators are expected to review before pasting:
    several remediations (chmod 4750 on suid binaries, modules_disabled
    sysctl) are operationally consequential and policy-dependent.
    """
    posture = determine_posture(results, category_filter_active)
    lines = [
        "#!/bin/bash",
        "# Auto-generated remediation suggestions for the Copy Fail",
        "# bug class (cf1 / cf2 / Dirty Frag).",
        "# rfxn.com - forged in prod - github.com/rfxn/rfxn-defense",
        "# Hostname: {}    Kernel: {}".format(
            os.uname().nodename, os.uname().release),
        "# Verdict:  {}".format(posture["verdict"]),
        "# Coverage: " + (", ".join(posture.get("bug_classes_covered", []))
                          or "(none)"),
        "#",
        "# REVIEW EVERY BLOCK BEFORE RUNNING. Some changes (chmod on suid",
        "# binaries, kernel.modules_disabled=1) are policy-dependent or",
        "# require a reboot to recover from. Lines are commented out by",
        "# default; uncomment the ones you actually want to apply.",
        "",
        "set -euo pipefail",
        "",
    ]
    actionable = [r for r in results
                  if r.status not in (Status.OK, Status.INFO, Status.SKIP)
                  and r.remediation]
    if not actionable:
        lines.append("# No actionable remediations - posture appears clean.")
        print("\n".join(lines))
        return
    for r in actionable:
        lines.append("# ----------------------------------------------------")
        lines.append("# [{}/{}] {}: {}".format(
            r.status.upper(), r.category, r.name, r.message))
        for ln in r.remediation.splitlines():
            lines.append("# " + ln)
        lines.append("")
    # Append a small block of canonical commands the auditor knows are
    # safe to run unattended; keep them commented so review remains
    # mandatory. v2.0.0: extended for cf-class coverage.
    lines += [
        "# === Canonical commands (review and uncomment to apply) =====",
        "#",
        "# # FAST PATH: install the rfxn-defense umbrella package",
        "# # (covers everything below):",
        "# curl -sSL https://rfxn.github.io/rfxn-defense/rfxn-defense.repo \\",
        "#   | sudo tee /etc/yum.repos.d/rfxn-defense.repo",
        "# sudo dnf install -y rfxn-defense",
        "# sudo /usr/sbin/rfxn-shim-enable",
        "#",
        "# # MANUAL PATH (if you can't install the package):",
        "#",
        "# # 1. cf-class modprobe blacklist (cf1 + cf2 + Dirty Frag):",
        "# sudo tee /etc/modprobe.d/99-rfxn-defense.conf >/dev/null <<'EOF'",
        "# install algif_aead   /bin/false",
        "# install authenc      /bin/false",
        "# install authencesn   /bin/false",
        "# install af_alg       /bin/false",
        "# install esp4         /bin/false",
        "# install esp6         /bin/false",
        "# install xfrm_user    /bin/false",
        "# install xfrm_algo    /bin/false",
        "# install rxrpc        /bin/false",
        "# blacklist algif_aead",
        "# blacklist authenc",
        "# blacklist authencesn",
        "# blacklist af_alg",
        "# blacklist esp4",
        "# blacklist esp6",
        "# blacklist xfrm_user",
        "# blacklist xfrm_algo",
        "# blacklist rxrpc",
        "# EOF",
        "# for m in algif_aead authenc authencesn af_alg esp4 esp6 \\",
        "#          xfrm_user xfrm_algo rxrpc; do",
        "#     sudo rmmod \"$m\" 2>/dev/null || true",
        "# done",
        "#",
        "# # 2. systemd cf-class drop-ins (tenant units):",
        "# for u in user@ sshd cron crond atd; do",
        "#     sudo install -d /etc/systemd/system/${u}.service.d",
        "#     sudo tee /etc/systemd/system/${u}.service.d/10-rfxn-defense.conf \\",
        "#         >/dev/null <<EOF",
        "# [Service]",
        "# RestrictAddressFamilies=~AF_ALG ~AF_RXRPC",
        "# RestrictNamespaces=~user ~net",
        "# SystemCallArchitectures=native",
        "# SystemCallFilter=~@swap",
        "# EOF",
        "# done",
        "# sudo systemctl daemon-reload",
        "# sudo systemctl try-reload-or-restart sshd.service",
        "#",
        "# # 3. cf-class auditd rules (extended):",
        "# sudo tee /etc/audit/rules.d/rfxn-defense.rules >/dev/null <<'EOF'",
        "# -a always,exit -F arch=b64 -S socket -F a0=38 -k afalg_attempt",
        "# -a always,exit -F arch=b64 -S unshare -F auid>=1000 -k cf_userns",
        "# -a always,exit -F arch=b64 -S add_key -F auid>=1000 -k cf_addkey",
        "# -a always,exit -F arch=b64 -S socket -F a0=16 -F a2=6 -k cf_xfrm_nl",
        "# -a always,exit -F arch=b64 -S splice -F auid>=1000 -k splice_tenant",
        "# EOF",
        "# sudo augenrules --load",
        "# # NOTE: cf_xfrm_nl filters AF_NETLINK socket() with NETLINK_XFRM=6",
        "# # protocol family. kauditd cannot natively filter netlink MESSAGE",
        "# # types per-protocol, so XFRM_MSG_NEWSA-specific filtering is a",
        "# # documented gap. Falco / VED-eBPF can fill this with custom",
        "# # probes if higher fidelity is required.",
        "# #",
        "# # POST-FILTER add_key for rxrpc-specific events (dirtyfrag-RxRPC):",
        "# # ausearch -k cf_addkey | aureport --start today \\",
        "# #   | grep -i 'desc=rxrpc:'",
        "",
    ]
    print("\n".join(lines))

def main():
    parser = argparse.ArgumentParser(
        description="Copy Fail bug-class checker (cf1 / cf2 / Dirty Frag)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Covers cf1 (CVE-2026-31431, algif_aead), cf2 (xfrm-ESP), and "
               "Dirty Frag (xfrm-ESP + RxRPC). "
               "Exit: 0=clean 1=err 2=VULN 3=vuln+mitigated 4=hardening_recs. "
               "JSON: posture.bug_classes_covered (array) + .bug_classes (map).")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="show all checks including passing")
    parser.add_argument("--skip-trigger", action="store_true",
                        help="skip live AF_ALG probe")
    parser.add_argument("--skip-hardening", action="store_true",
                        help="skip suid/page-cache/getcap audit")
    parser.add_argument("--category", default="",
                        help="filter: ENV,KERNEL,MITIGATION,HARDENING,DETECTION")
    parser.add_argument("--no-progress", action="store_true",
                        help="suppress in-progress status output to stderr")
    parser.add_argument("--emit-remediation", action="store_true",
                        help="instead of the report, print a bash script "
                             "of the remediations for every non-OK check "
                             "(stdout); review before executing")
    args = parser.parse_args()

    # Enable progress output unless explicitly disabled or in JSON mode where
    # the user might be capturing stderr too.
    global PROGRESS
    PROGRESS = Progress(enabled=not args.no_progress and not args.json)

    if PROGRESS.enabled or PROGRESS.plain:
        sys.stderr.write("rfxn-defense posture auditor (7 LPE classes) "
                         "starting on {} ({})\n".format(
            os.uname().nodename, os.uname().release))
        sys.stderr.flush()

    try:
        results = run_all_checks(args)
    except Exception as e:
        print("internal error: {}: {}".format(type(e).__name__, e),
              file=sys.stderr)
        return 1

    cat_filter_active = bool(args.category)
    if args.emit_remediation:
        emit_remediation_script(results, cat_filter_active)
        return determine_exit_code(results)

    if args.json:
        out = {
            "schema_version": "2.0",
            "tool": "rfxn-local-check",
            "publisher": "rfxn.com - forged in prod",
            "url": "https://github.com/rfxn/rfxn-defense",
            "covers": ["CVE-2026-31431", "cf2-xfrm-esp", "dirtyfrag-esp",
                       "dirtyfrag-rxrpc"],
            "timestamp": int(time.time()),
            "hostname": os.uname().nodename,
            "kernel": os.uname().release,
            "checks": [r.to_dict() for r in results],
            "posture": determine_posture(results, cat_filter_active),
            "summary": {
                "total": len(results),
                "ok":    sum(1 for r in results if r.status == Status.OK),
                "warn":  sum(1 for r in results if r.status == Status.WARN),
                "fail":  sum(1 for r in results if r.status == Status.FAIL),
                "vuln":  sum(1 for r in results if r.status == Status.VULN),
                "skip":  sum(1 for r in results if r.status == Status.SKIP),
                "error": sum(1 for r in results if r.status == Status.ERROR),
            },
            "exit_code": determine_exit_code(results),
        }
        print(json.dumps(out, indent=2, default=str))
    else:
        print(colorize("=" * 78, C.DIM))
        print(colorize("rfxn-defense host posture auditor  ", C.BOLD)
              + colorize("({})".format(os.uname().nodename), C.DIM))
        print(colorize("cf1 / cf2 / Dirty Frag-ESP / DF-RxRPC / Fragnesia / "
                       "PinTheft / DirtyDecrypt / keysign-pwn",
                       C.DIM))
        print(colorize("rfxn.com - forged in prod - github.com/rfxn/rfxn-defense",
                       C.DIM))
        print(colorize("=" * 78, C.DIM))
        for r in results:
            line = r.render(verbose=args.verbose)
            if line:
                print(line)
        print(colorize("-" * 78, C.DIM))
        ok    = sum(1 for r in results if r.status == Status.OK)
        warn  = sum(1 for r in results if r.status == Status.WARN)
        fail  = sum(1 for r in results if r.status == Status.FAIL)
        vuln  = sum(1 for r in results if r.status == Status.VULN)
        skip  = sum(1 for r in results if r.status == Status.SKIP)
        print("Total: {} | {} | {} | {} | {} | {}".format(
            len(results),
            colorize("OK:" + str(ok), C.GREEN),
            colorize("WARN:" + str(warn), C.YELLOW),
            colorize("FAIL:" + str(fail), C.RED),
            colorize("VULN:" + str(vuln), C.RED + C.BOLD),
            colorize("SKIP:" + str(skip), C.DIM)))
        # v2.0.0: holistic surface-area view
        posture_summary = determine_posture(results, cat_filter_active)
        bc = posture_summary.get("bug_classes", {})
        print()
        print(colorize("Surface area / mitigation matrix:", C.BOLD))
        print(colorize("  Class            Sink reachable?      Mitigated?      Active layers", C.DIM))
        for cls_id, label in (("cf1", "cf1 (CVE-2026-31431)"),
                              ("cf2", "cf2 (xfrm-ESP)"),
                              ("dirtyfrag-esp",   "Dirty Frag-ESP"),
                              ("dirtyfrag-rxrpc", "Dirty Frag-RxRPC"),
                              ("pintheft",        "pintheft (AF_RDS)"),
                              ("keysign-pwn",     "keysign-pwn (ptrace)")):
            info = bc.get(cls_id, {})
            applicable = info.get("applicable")
            mitigated  = info.get("mitigated")
            layers     = info.get("layers", {})
            active = sorted(k for k, v in layers.items() if v)
            if not applicable:
                reach   = colorize("no (n/a)", C.DIM)
                mit_str = colorize("-", C.DIM)
                lay_str = colorize("-", C.DIM)
            elif mitigated is None:
                reach   = colorize("YES", C.YELLOW)
                mit_str = colorize("filtered", C.DIM)
                lay_str = colorize("(--category active)", C.DIM)
            else:
                reach = colorize("YES", C.YELLOW)
                if mitigated:
                    mit_str = colorize("yes", C.GREEN)
                    lay_str = colorize(", ".join(active) if active else "(none)", C.GREEN)
                else:
                    mit_str = colorize("NO", C.RED)
                    lay_str = colorize("(none)", C.RED)
            print("  {:<16} {:<20} {:<15} {}".format(label, reach, mit_str, lay_str))
        # v2.0.1: auto-detect summary line.
        ad = posture_summary.get("auto_detect", {})
        if ad.get("available"):
            if ad.get("force_full"):
                ad_line = "force-full sentinel active (all mitigations applied)"
            elif ad.get("detected_workloads"):
                workloads = ", ".join(ad["detected_workloads"])
                suppressed = ad.get("suppressed_mitigations", [])
                if suppressed:
                    ad_line = "{} (suppressed: {})".format(
                        workloads, ", ".join(suppressed))
                else:
                    ad_line = workloads
            else:
                ad_line = "clean (no conflicts)"
            print()
            print(colorize("Auto-detect:", C.BOLD), ad_line)
        print()
        # Single-line scan-friendly summary (kept for log-tailers / SIEM grep)
        def _bc_repr(cls_id):
            info = bc.get(cls_id, {})
            if not info.get("applicable"):
                return colorize("n/a", C.DIM)
            mit = info.get("mitigated")
            if mit is None:
                # filter_applied or class-not-evaluated; do not lie either way
                return colorize("filtered", C.DIM)
            if mit:
                return colorize("mitigated", C.GREEN)
            return colorize("vulnerable", C.RED)
        print("Bug-class coverage: cf1={} cf2={} dirtyfrag-esp={} "
              "dirtyfrag-rxrpc={} pintheft={} keysign-pwn={}".format(
                  _bc_repr("cf1"), _bc_repr("cf2"),
                  _bc_repr("dirtyfrag-esp"), _bc_repr("dirtyfrag-rxrpc"),
                  _bc_repr("pintheft"), _bc_repr("keysign-pwn")))

    return determine_exit_code(results)

if __name__ == "__main__":
    sys.exit(main())
