IQA-Metric-Benchmark/scripts/threshold_analysis.py
#!/usr/bin/env python3
"""
Threshold analysis for DeQA scores vs. human labels (High/Low).
Inputs (defaults for facture task):
- results/facture.txt # lines like: "4.2 - filename.jpg"
- data/facture/labels.csv # columns: filename,label with label in {High,Low}
Outputs:
- results/facture_thresholds_summary.json   # best thresholds for accuracy/precision/recall/F1
- results/facture_metric_curves.png         # metrics vs. threshold
- results/facture_score_distributions.png   # score histograms by label
- results/facture_decisions.csv             # per-image decisions at each operating point
plus count-scaled distribution, sorted-score, precision-recall, ROC-like and
confusion-matrix plots, and a one-sheet Excel export (written by main()).
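
Example invocation (a sketch; the flags and defaults match the argparse setup in main(),
and the path assumes the script is run from the repository root):
    python scripts/threshold_analysis.py --scores results/facture.txt --labels data/facture/labels.csv --outdir results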
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


def read_deqa_results_txt(path: Path) -> pd.DataFrame:
    """Read TXT results of the form "<score> - <filename>" into a DataFrame."""
    rows: List[Dict[str, str | float]] = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Expect pattern: "<score> - <filename>"
            try:
                score_part, fname = line.split(" - ", 1)
                score = float(score_part)
                rows.append({"filename": fname, "score": score})
            except Exception:
                # Skip malformed lines silently
                continue
    df = pd.DataFrame(rows)
    if not df.empty:
        df["filename"] = df["filename"].astype(str)
        df["stem"] = df["filename"].apply(lambda x: Path(x).stem.lower())
    return df


def read_labels_csv(path: Path) -> pd.DataFrame:
    """Read labels CSV with columns: filename,label (High/Low)."""
    df = pd.read_csv(path)
    # Normalize
    df["filename"] = df["filename"].astype(str)
    df["label"] = df["label"].astype(str).str.strip().str.capitalize()
    # Map High->1, Low->0
    label_map = {"High": 1, "Low": 0}
    df["y_true"] = df["label"].map(label_map)
    df["stem"] = df["filename"].apply(lambda x: Path(x).stem.lower())
    return df[["filename", "label", "y_true", "stem"]]


def confusion_from_threshold(scores: np.ndarray, y_true: np.ndarray, thr: float) -> Tuple[int, int, int, int]:
    pred = (scores >= thr).astype(int)
    tp = int(np.sum((pred == 1) & (y_true == 1)))
    fp = int(np.sum((pred == 1) & (y_true == 0)))
    fn = int(np.sum((pred == 0) & (y_true == 1)))
    tn = int(np.sum((pred == 0) & (y_true == 0)))
    return tp, fp, fn, tn


def metric_from_confusion(tp: int, fp: int, fn: int, tn: int, metric: str) -> float:
    if metric == "accuracy":
        denom = tp + fp + fn + tn
        return (tp + tn) / denom if denom > 0 else 0.0
    if metric == "precision":
        denom = tp + fp
        return tp / denom if denom > 0 else 0.0
    if metric == "recall":
        denom = tp + fn
        return tp / denom if denom > 0 else 0.0
    if metric == "f1":
        p_denom = tp + fp
        r_denom = tp + fn
        precision = tp / p_denom if p_denom > 0 else 0.0
        recall = tp / r_denom if r_denom > 0 else 0.0
        denom = precision + recall
        return (2 * precision * recall / denom) if denom > 0 else 0.0
    raise ValueError(f"Unsupported metric: {metric}")
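
# For reference, the ratios above follow the usual definitions:
#   accuracy  = (TP + TN) / (TP + FP + FN + TN)
#   precision = TP / (TP + FP)
#   recall    = TP / (TP + FN)
#   f1        = 2 * precision * recall / (precision + recall)
# with each value defined as 0.0 whenever its denominator is zero, as in the code above.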


def pick_threshold(scores: np.ndarray, y_true: np.ndarray, metric: str = "f1") -> Tuple[float, float, Dict[str, int]]:
    thr_candidates = np.unique(scores)
    best_thr: float | None = None
    best_val: float = -1.0
    best_conf: Tuple[int, int, int, int] | None = None
    for t in thr_candidates:
        tp, fp, fn, tn = confusion_from_threshold(scores, y_true, t)
        val = metric_from_confusion(tp, fp, fn, tn, metric)
        # Tie-breaker: prefer higher threshold if metric ties (safer for downstream)
        if (val > best_val) or (np.isclose(val, best_val) and (best_thr is None or t > best_thr)):
            best_val = val
            best_thr = t
            best_conf = (tp, fp, fn, tn)
    assert best_thr is not None and best_conf is not None
    tp, fp, fn, tn = best_conf
    return float(best_thr), float(best_val), {"TP": tp, "FP": fp, "FN": fn, "TN": tn}
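
# Illustrative example (not part of the pipeline): with scores = np.array([3.0, 3.5, 4.2, 4.8])
# and y_true = np.array([0, 0, 1, 1]), the candidates are the unique scores; at thr = 4.2 every
# image is classified correctly, so pick_threshold(scores, y_true, "f1") would return
# (4.2, 1.0, {"TP": 2, "FP": 0, "FN": 0, "TN": 2}). Metric ties resolve to the larger
# threshold, per the tie-breaker above.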


def compute_metric_curves(scores: np.ndarray, y_true: np.ndarray) -> pd.DataFrame:
    data: List[Dict[str, float]] = []
    for t in np.unique(scores):
        tp, fp, fn, tn = confusion_from_threshold(scores, y_true, t)
        row = {
            "threshold": float(t),
            "accuracy": metric_from_confusion(tp, fp, fn, tn, "accuracy"),
            "precision": metric_from_confusion(tp, fp, fn, tn, "precision"),
            "recall": metric_from_confusion(tp, fp, fn, tn, "recall"),
            "f1": metric_from_confusion(tp, fp, fn, tn, "f1"),
            "TP": tp,
            "FP": fp,
            "FN": fn,
            "TN": tn,
        }
        data.append(row)
    return pd.DataFrame(data).sort_values("threshold").reset_index(drop=True)


def _robust_bandwidth(x: np.ndarray) -> float:
    """Silverman-like robust bandwidth for Gaussian KDE."""
    x = np.asarray(x, dtype=float)
    n = len(x)
    if n <= 1:
        return 0.1 if n == 1 else 0.2
    std = np.std(x, ddof=1)
    iqr = np.subtract(*np.percentile(x, [75, 25]))
    sigma = min(std, iqr / 1.34) if iqr > 0 else std
    return 0.9 * sigma * n ** (-1 / 5)
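
# Note: _robust_bandwidth mirrors Silverman's rule of thumb,
#   bw = 0.9 * min(sigma, IQR / 1.34) * n ** (-1/5),
# using the smaller (more robust) of the sample standard deviation and the scaled IQR.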


def _kde_gaussian(x: np.ndarray, grid: np.ndarray, bw: float | None = None) -> np.ndarray:
    """Univariate Gaussian KDE evaluated on grid."""
    x = np.asarray(x, dtype=float)
    grid = np.asarray(grid, dtype=float)
    if bw is None or bw <= 0:
        bw = _robust_bandwidth(x)
    if bw <= 0:
        bw = max(1e-3, 0.1 * (np.max(x) - np.min(x) + 1e-6))
    z = (grid[None, :] - x[:, None]) / bw
    dens = np.exp(-0.5 * z * z) / (np.sqrt(2 * np.pi))
    dens = dens.mean(axis=0) / bw
    return dens
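
# For reference, the estimator above is the standard Gaussian KDE,
#   f_hat(g) = (1 / (n * bw)) * sum_i exp(-0.5 * ((g - x_i) / bw) ** 2) / sqrt(2 * pi),
# evaluated at every grid point g; the mean over samples supplies the 1/n factor.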


def find_density_intersections(x_high: np.ndarray, x_low: np.ndarray) -> list[float]:
    """Find x where KDE_high == KDE_low via linear interpolation on a fine grid."""
    x_all = np.concatenate([x_high, x_low]).astype(float)
    lo, hi = float(np.min(x_all)), float(np.max(x_all))
    grid = np.linspace(lo, hi, 1024)
    fH = _kde_gaussian(x_high, grid)
    fL = _kde_gaussian(x_low, grid)
    diff = fH - fL
    s = np.sign(diff)
    sign_change = np.where(np.diff(s) != 0)[0]
    xs: list[float] = []
    for i in sign_change:
        x1, x2 = grid[i], grid[i + 1]
        y1, y2 = diff[i], diff[i + 1]
        if (y2 - y1) != 0:
            xr = x1 - y1 * (x2 - x1) / (y2 - y1)
            if lo <= xr <= hi:
                xs.append(float(xr))
    return xs
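
# Each sign change of diff = fH - fL is refined with the linear-interpolation root of the
# segment between (x1, y1) and (x2, y2):  x_r = x1 - y1 * (x2 - x1) / (y2 - y1), i.e. the
# point where the interpolated difference crosses zero.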


def pick_density_threshold(df: pd.DataFrame) -> float | None:
    """Pick 'prior-balanced' threshold at intersection near midpoint of class means."""
    xH = df.loc[df["label"] == "High", "score"].astype(float).to_numpy()
    xL = df.loc[df["label"] == "Low", "score"].astype(float).to_numpy()
    if len(xH) < 2 or len(xL) < 2:
        return None
    inters = find_density_intersections(xH, xL)
    if not inters:
        return None
    mH, mL = float(np.mean(xH)), float(np.mean(xL))
    mid = 0.5 * (mH + mL)
    thr = min(inters, key=lambda t: abs(t - mid))
    return float(thr)
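
# Rationale (brief sketch): at an intersection t the class-conditional densities are equal,
# KDE_high(t) == KDE_low(t), which is the minimum-error decision boundary when both classes
# are given equal prior weight; taking the crossing nearest the midpoint of the class means
# discards spurious crossings out in the distribution tails.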


def plot_distributions(
    df: pd.DataFrame,
    out_path: Path,
    threshold: float | None = None,
    acc_at_thr: float | None = None,
    f1_at_thr: float | None = None,
    density_thr: float | None = None,
    density_acc: float | None = None,
    density_f1: float | None = None,
) -> None:
    # Clean, white background without gray grid
    sns.set_style("white")
    plt.figure(figsize=(10, 6))
    # Side-by-side bars (dodge) with wider bars
    palette = {"High": "tab:blue", "Low": "tab:orange"}
    sns.histplot(
        data=df,
        x="score",
        hue="label",
        bins=None,
        binwidth=0.18,
        kde=False,
        stat="density",
        common_norm=False,
        multiple="dodge",
        palette=palette,
        element="bars",
        shrink=0.85,
        alpha=0.8,
        edgecolor="white",
        linewidth=0.5,
    )
    # KDE lines for High, Low, and All samples (three lines)
    try:
        high_scores = df.loc[df["label"] == "High", "score"].astype(float)
        low_scores = df.loc[df["label"] == "Low", "score"].astype(float)
        all_scores = df["score"].astype(float)
        if len(high_scores) > 1:
            sns.kdeplot(high_scores, color="tab:blue", linewidth=2.0, label="High density")
        if len(low_scores) > 1:
            sns.kdeplot(low_scores, color="tab:orange", linewidth=2.0, label="Low density")
        if len(all_scores) > 1:
            sns.kdeplot(all_scores, color="black", linewidth=2.2, linestyle="-", label="All density")
    except Exception:
        pass
    # Threshold vertical line with styled annotation (F1-opt)
    if threshold is not None:
        ax = plt.gca()
        ax.axvline(threshold, color="red", linestyle=(0, (6, 4)), linewidth=2.0)
        acc_str = f"{acc_at_thr:.3f}" if acc_at_thr is not None else "NA"
        f1_str = f"{f1_at_thr:.3f}" if f1_at_thr is not None else "NA"
        label_text = f"threshold(F1)={threshold:.3f} Accuracy={acc_str} F1={f1_str}"
        ymax = ax.get_ylim()[1]
        ax.text(
            threshold + 0.02,
            ymax * 0.97,
            label_text,
            color="red",
            ha="left",
            va="top",
            fontsize=10,
            bbox=dict(boxstyle="round,pad=0.3", facecolor="#ffecec", edgecolor="#ff9a9a", alpha=0.85),
        )
    # Density-intersection threshold (purple)
    if density_thr is not None:
        ax = plt.gca()
        ax.axvline(density_thr, color="purple", linestyle="--", linewidth=2.0)
        ymax = ax.get_ylim()[1]
        dens_acc_str = f"{density_acc:.3f}" if density_acc is not None else "NA"
        dens_f1_str = f"{density_f1:.3f}" if density_f1 is not None else "NA"
        ax.text(
            density_thr + 0.02,
            ymax * 0.90,
            f"threshold(density)={density_thr:.3f} Accuracy={dens_acc_str} F1={dens_f1_str}",
            color="purple",
            ha="left",
            va="top",
            fontsize=10,
            bbox=dict(boxstyle="round,pad=0.3", facecolor="#efe6ff", edgecolor="#b497ff", alpha=0.85),
        )
    # Add stats box in bottom-right: counts and mean/std per class and overall
    try:
        high_scores = df.loc[df["label"] == "High", "score"].astype(float)
        low_scores = df.loc[df["label"] == "Low", "score"].astype(float)
        n_high = int(high_scores.shape[0])
        n_low = int(low_scores.shape[0])
        mean_high = float(high_scores.mean()) if n_high > 0 else float("nan")
        std_high = float(high_scores.std(ddof=1)) if n_high > 1 else float("nan")
        mean_low = float(low_scores.mean()) if n_low > 0 else float("nan")
        std_low = float(low_scores.std(ddof=1)) if n_low > 1 else float("nan")
        all_scores = df["score"].astype(float)
        mean_all = float(all_scores.mean()) if all_scores.shape[0] > 0 else float("nan")
        std_all = float(all_scores.std(ddof=1)) if all_scores.shape[0] > 1 else float("nan")
        stats_text = (
            f"High: n={n_high}, \u03BC={mean_high:.3f}, \u03C3={std_high:.3f}\n"
            f"Low: n={n_low}, \u03BC={mean_low:.3f}, \u03C3={std_low:.3f}\n"
            f"All: n={n_high+n_low}, \u03BC={mean_all:.3f}, \u03C3={std_all:.3f}"
        )
        ax = plt.gca()
        ax.text(
            0.99, 0.02, stats_text,
            transform=ax.transAxes,
            ha="right", va="bottom",
            fontsize=9,
            bbox=dict(boxstyle="round,pad=0.5", facecolor="white", edgecolor="gray", alpha=0.95),
        )
    except Exception:
        pass
    plt.title("DeQA score distributions by label")
    plt.xlabel("DeQA score")
    plt.ylabel("Density")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def plot_distributions_count(
    df: pd.DataFrame,
    out_path: Path,
    threshold: float | None = None,
    acc_at_thr: float | None = None,
    f1_at_thr: float | None = None,
    density_thr: float | None = None,
    density_acc: float | None = None,
    density_f1: float | None = None,
) -> None:
    sns.set_style("white")
    plt.figure(figsize=(10, 6))
    palette = {"High": "tab:blue", "Low": "tab:orange"}
    used_binwidth = 0.18
    ax = plt.gca()
    sns.histplot(
        data=df,
        x="score",
        hue="label",
        bins=None,
        binwidth=used_binwidth,
        kde=False,
        stat="count",
        common_norm=False,
        multiple="dodge",
        palette=palette,
        element="bars",
        shrink=0.85,
        alpha=0.8,
        edgecolor="white",
        linewidth=0.5,
        ax=ax,
    )
    # KDE lines for High, Low, and All, scaled to counts
    try:
        high_scores = df.loc[df["label"] == "High", "score"].astype(float)
        low_scores = df.loc[df["label"] == "Low", "score"].astype(float)
        all_scores = df["score"].astype(float)
        if len(high_scores) > 1:
            sns.kdeplot(high_scores, color="tab:blue", linewidth=2.0, label="High KDE (count)", ax=ax)
            line = ax.lines[-1]
            x, y = line.get_data()
            line.set_data(x, y * len(high_scores) * used_binwidth)
        if len(low_scores) > 1:
            sns.kdeplot(low_scores, color="tab:orange", linewidth=2.0, label="Low KDE (count)", ax=ax)
            line = ax.lines[-1]
            x, y = line.get_data()
            line.set_data(x, y * len(low_scores) * used_binwidth)
        if len(all_scores) > 1:
            sns.kdeplot(all_scores, color="black", linewidth=2.2, linestyle="-", label="All KDE (count)", ax=ax)
            line = ax.lines[-1]
            x, y = line.get_data()
            line.set_data(x, y * len(all_scores) * used_binwidth)
    except Exception:
        pass
    if threshold is not None:
        ax.axvline(threshold, color="red", linestyle=(0, (6, 4)), linewidth=2.0)
        acc_str = f"{acc_at_thr:.3f}" if acc_at_thr is not None else "NA"
        f1_str = f"{f1_at_thr:.3f}" if f1_at_thr is not None else "NA"
        label_text = f"threshold(F1)={threshold:.3f} Accuracy={acc_str} F1={f1_str}"
        ymax = ax.get_ylim()[1]
        ax.text(
            threshold + 0.02,
            ymax * 0.97,
            label_text,
            color="red",
            ha="left",
            va="top",
            fontsize=10,
            bbox=dict(boxstyle="round,pad=0.3", facecolor="#ffecec", edgecolor="#ff9a9a", alpha=0.85),
        )
    if density_thr is not None:
        ax.axvline(density_thr, color="purple", linestyle="--", linewidth=2.0)
        ymax = ax.get_ylim()[1]
        dens_acc_str = f"{density_acc:.3f}" if density_acc is not None else "NA"
        dens_f1_str = f"{density_f1:.3f}" if density_f1 is not None else "NA"
        ax.text(
            density_thr + 0.02,
            ymax * 0.90,
            f"threshold(density)={density_thr:.3f} Accuracy={dens_acc_str} F1={dens_f1_str}",
            color="purple",
            ha="left",
            va="top",
            fontsize=10,
            bbox=dict(boxstyle="round,pad=0.3", facecolor="#efe6ff", edgecolor="#b497ff", alpha=0.85),
        )
    # Stats box
    try:
        high_scores = df.loc[df["label"] == "High", "score"].astype(float)
        low_scores = df.loc[df["label"] == "Low", "score"].astype(float)
        n_high = int(high_scores.shape[0])
        n_low = int(low_scores.shape[0])
        mean_high = float(high_scores.mean()) if n_high > 0 else float("nan")
        std_high = float(high_scores.std(ddof=1)) if n_high > 1 else float("nan")
        mean_low = float(low_scores.mean()) if n_low > 0 else float("nan")
        std_low = float(low_scores.std(ddof=1)) if n_low > 1 else float("nan")
        all_scores = df["score"].astype(float)
        mean_all = float(all_scores.mean()) if all_scores.shape[0] > 0 else float("nan")
        std_all = float(all_scores.std(ddof=1)) if all_scores.shape[0] > 1 else float("nan")
        stats_text = (
            f"High: n={n_high}, \u03BC={mean_high:.3f}, \u03C3={std_high:.3f}\n"
            f"Low: n={n_low}, \u03BC={mean_low:.3f}, \u03C3={std_low:.3f}\n"
            f"All: n={n_high+n_low}, \u03BC={mean_all:.3f}, \u03C3={std_all:.3f}"
        )
        ax.text(
            0.99, 0.02, stats_text,
            transform=ax.transAxes,
            ha="right", va="bottom",
            fontsize=9,
            bbox=dict(boxstyle="round,pad=0.5", facecolor="white", edgecolor="gray", alpha=0.95),
        )
    except Exception:
        pass
    plt.title("DeQA score distributions by label (counts)")
    plt.xlabel("DeQA score")
    plt.ylabel("Count")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
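
# Note on the KDE scaling above: a density curve integrates to 1, so multiplying its values
# by n * binwidth converts it to an approximate expected count per histogram bin, which keeps
# the overlaid curves on the same vertical scale as the count bars.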


def plot_metric_curves(curve_df: pd.DataFrame, out_path: Path) -> None:
    plt.figure(figsize=(8, 5))
    for metric in ["accuracy", "precision", "recall", "f1"]:
        plt.plot(curve_df["threshold"], curve_df[metric], label=metric)
    plt.xlabel("Threshold (score >= t => HIGH)")
    plt.ylabel("Metric value")
    plt.ylim(0.0, 1.05)
    plt.title("Metrics vs threshold")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def plot_sorted_scores_with_threshold(df: pd.DataFrame, thr: float, out_path: Path) -> None:
    tmp = df.sort_values("score").reset_index(drop=True)
    x = np.arange(len(tmp))
    y = tmp["score"].to_numpy()
    plt.figure(figsize=(9, 4))
    plt.scatter(x, y, s=6, alpha=0.6)
    plt.axhline(thr, color="red", linestyle="--", label=f"threshold={thr:.3f}")
    plt.xlabel("Images sorted by score")
    plt.ylabel("DeQA score")
    plt.title("Sorted scores with operating threshold")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def plot_pr_curve(curves: pd.DataFrame, out_path: Path) -> None:
    plt.figure(figsize=(6, 5))
    plt.plot(curves["recall"], curves["precision"], marker="o", ms=3, lw=1)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall across thresholds")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def plot_roc_like(curves: pd.DataFrame, out_path: Path) -> None:
    # TPR = recall, FPR = FP / (FP + TN)
    denom = (curves["FP"] + curves["TN"]).replace(0, np.nan)
    fpr = curves["FP"] / denom
    tpr = curves["recall"]
    plt.figure(figsize=(6, 5))
    plt.plot(fpr.fillna(0), tpr, marker="o", ms=3, lw=1)
    plt.xlabel("False Positive Rate (FPR)")
    plt.ylabel("True Positive Rate (TPR)")
    plt.title("ROC-like curve across thresholds")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def plot_confusion_heatmap(tp: int, fp: int, fn: int, tn: int, out_path: Path) -> None:
    # Rows follow the y-tick labels (True High, True Low) and columns the x-tick labels
    # (Pred High, Pred Low), so the matrix must be [[TP, FN], [FP, TN]].
    cm = np.array([[tp, fn], [fp, tn]])
    plt.figure(figsize=(4, 4))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
                xticklabels=["Pred High", "Pred Low"], yticklabels=["True High", "True Low"])
    plt.title("Confusion matrix at operating threshold")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="Threshold analysis for DeQA scores vs labels")
    parser.add_argument("--scores", type=str, default="results/facture.txt", help="Path to deqa scores txt")
    parser.add_argument("--labels", type=str, default="data/facture/labels.csv", help="Path to labels csv")
    parser.add_argument("--outdir", type=str, default="results", help="Directory to write outputs")
    parser.add_argument("--sample-per-class", type=int, default=0,
                        help="If >0, randomly sample N High and N Low for a quick benchmark")
    parser.add_argument("--seed", type=int, default=42, help="Random seed for sampling")
    args = parser.parse_args()
    scores_path = Path(args.scores)
    labels_path = Path(args.labels)
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    # Load
    df_scores = read_deqa_results_txt(scores_path)
    df_labels = read_labels_csv(labels_path)
    # Join on lowercase stem to tolerate extension differences
    df = df_scores.merge(df_labels, on="stem", how="inner", suffixes=("_score", "_label"))
    # Prefer label-side filename when available
    df["filename"] = df["filename_label"].where(df["filename_label"].notna(), df["filename_score"])
    df.drop(columns=[c for c in ["filename_label", "filename_score"] if c in df.columns], inplace=True)
    if df.empty:
        raise RuntimeError("No overlap between scores and labels. Check filenames.")
    # Optional sampling per class
    if args.sample_per_class and args.sample_per_class > 0:
        rng = np.random.default_rng(args.seed)
        high_df = df[df["y_true"] == 1]
        low_df = df[df["y_true"] == 0]
        n_high = min(args.sample_per_class, len(high_df))
        n_low = min(args.sample_per_class, len(low_df))
        high_sample = high_df.sample(n=n_high, random_state=args.seed)
        low_sample = low_df.sample(n=n_low, random_state=args.seed)
        df = pd.concat([high_sample, low_sample], ignore_index=True)
        df = df.sample(frac=1.0, random_state=args.seed).reset_index(drop=True)
    scores = df["score"].to_numpy(dtype=float)
    y_true = df["y_true"].to_numpy(dtype=int)
    # Compute best thresholds
    thr_f1, best_f1, conf_f1 = pick_threshold(scores, y_true, metric="f1")
    thr_acc, best_acc, conf_acc = pick_threshold(scores, y_true, metric="accuracy")
    thr_prec, best_prec, conf_prec = pick_threshold(scores, y_true, metric="precision")
    thr_rec, best_rec, conf_rec = pick_threshold(scores, y_true, metric="recall")
    # New: density-intersection threshold
    density_thr = pick_density_threshold(df)
    if density_thr is not None:
        tp_d, fp_d, fn_d, tn_d = confusion_from_threshold(scores, y_true, density_thr)
        acc_at_density = metric_from_confusion(tp_d, fp_d, fn_d, tn_d, "accuracy")
        f1_at_density = metric_from_confusion(tp_d, fp_d, fn_d, tn_d, "f1")
    else:
        tp_d = fp_d = fn_d = tn_d = None
        acc_at_density = None
        f1_at_density = None
    summary = {
        "positive_definition": "HIGH when score >= threshold",
        "best_thresholds": {
            "f1": {"threshold": thr_f1, "value": best_f1, "confusion": conf_f1},
            "accuracy": {"threshold": thr_acc, "value": best_acc, "confusion": conf_acc},
            "precision": {"threshold": thr_prec, "value": best_prec, "confusion": conf_prec},
            "recall": {"threshold": thr_rec, "value": best_rec, "confusion": conf_rec},
            "density_intersection": {
                "threshold": density_thr,
                "acc": acc_at_density,
                "f1": f1_at_density,
                "confusion": {"TP": tp_d, "FP": fp_d, "FN": fn_d, "TN": tn_d} if density_thr is not None else None,
                "notes": "Intersection of KDE(High) and KDE(Low), equal prior decision boundary",
            },
        },
        "counts": {
            "total": int(len(df)),
            "positives": int(df["y_true"].sum()),
            "negatives": int(len(df) - int(df["y_true"].sum())),
        },
    }
    # Metric curves and figures
    curves = compute_metric_curves(scores, y_true)
    # Accuracy and F1 at selected threshold for annotation
    tp_f1, fp_f1, fn_f1, tn_f1 = confusion_from_threshold(scores, y_true, thr_f1)
    acc_at_thr = metric_from_confusion(tp_f1, fp_f1, fn_f1, tn_f1, "accuracy")
    f1_at_thr = metric_from_confusion(tp_f1, fp_f1, fn_f1, tn_f1, "f1")
    plot_distributions(
        df,
        outdir / "facture_score_distributions.png",
        threshold=thr_f1,
        acc_at_thr=acc_at_thr,
        f1_at_thr=f1_at_thr,
        density_thr=density_thr,
        density_acc=acc_at_density,
        density_f1=f1_at_density,
    )
    # New: counts version
    plot_distributions_count(
        df,
        outdir / "facture_score_distributions_count.png",
        threshold=thr_f1,
        acc_at_thr=acc_at_thr,
        f1_at_thr=f1_at_thr,
        density_thr=density_thr,
        density_acc=acc_at_density,
        density_f1=f1_at_density,
    )
    plot_metric_curves(curves, outdir / "facture_metric_curves.png")
    # Extra plots
    plot_sorted_scores_with_threshold(df, thr_f1, outdir / "facture_sorted_scores_with_thr.png")
    plot_pr_curve(curves, outdir / "facture_precision_recall_curve.png")
    plot_roc_like(curves, outdir / "facture_roc_like_curve.png")
    plot_confusion_heatmap(conf_f1["TP"], conf_f1["FP"], conf_f1["FN"], conf_f1["TN"], outdir / "facture_confusion_matrix.png")

    # Decisions CSV (for three operating points + F1)
    def decide(thr: float) -> np.ndarray:
        return (scores >= thr).astype(int)

    df_out = df.copy()
    df_out["decision_f1"] = decide(thr_f1)
    df_out["decision_acc"] = decide(thr_acc)
    df_out["decision_prec"] = decide(thr_prec)
    df_out["decision_rec"] = decide(thr_rec)
    # Map 1/0 to textual action
    to_action = {1: "implement", 0: "reject"}
    for col in ["decision_f1", "decision_acc", "decision_prec", "decision_rec"]:
        df_out[col] = df_out[col].map(to_action)
    df_out.rename(columns={"score": "deqa_score"}, inplace=True)
    df_out = df_out[["filename", "deqa_score", "label", "decision_f1", "decision_acc", "decision_prec", "decision_rec"]]
    df_out.to_csv(outdir / "facture_decisions.csv", index=False)
    # Save summary JSON
    with open(outdir / "facture_thresholds_summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    # Save a single Excel file with one sheet containing all rows and decisions (F1 operating point)
    try:
        excel_path = outdir / "facture_deqa_images.xlsx"
        one_sheet_df = df_out.copy()
        # Keep core columns only
        keep_cols = ["filename", "deqa_score", "label", "decision_f1"]
        one_sheet_df = one_sheet_df[keep_cols]
        one_sheet_df.rename(columns={"decision_f1": "decision"}, inplace=True)
        with pd.ExcelWriter(excel_path, engine="openpyxl") as writer:
            one_sheet_df.to_excel(writer, sheet_name="DeQA_Images", index=False)
    except Exception as e:
        print(f"Warning: Failed to write Excel file: {e}")
    # Also print a concise console summary
    print("Best thresholds (score >= thr => HIGH):")
    for k in ["f1", "accuracy", "precision", "recall"]:
        info = summary["best_thresholds"][k]
        print(f"- {k}: thr={info['threshold']:.3f}, value={info['value']:.3f}, conf={info['confusion']}")
    if density_thr is not None:
        print(f"- density_threshold: {density_thr:.3f}")


if __name__ == "__main__":
    main()