import math
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
torch.manual_seed(7)
np.random.seed(7)
device = "cpu"
def train_test_split(X, y, test_size=0.3, shuffle=True, seed=0):
    n = X.shape[0]
    idx = np.arange(n)
    if shuffle:
        rng = np.random.default_rng(seed)
        rng.shuffle(idx)
    n_test = int(round(test_size * n))
    test_idx = idx[:n_test]
    train_idx = idx[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]
def standardize_fit(X):
    mu = X.mean(axis=0, keepdims=True)
    std = X.std(axis=0, keepdims=True) + 1e-8
    return mu, std

def standardize_transform(X, mu, std):
    return (X - mu) / std
def auc_roc(scores, labels):
    # Area under the ROC curve via trapezoidal integration over the ranked scores.
    scores = np.asarray(scores).ravel()
    labels = np.asarray(labels).astype(int).ravel()
    order = np.argsort(-scores, kind="mergesort")
    y = labels[order]
    P = y.sum()
    N = len(y) - P
    if P == 0 or N == 0:
        return float("nan")
    tps = np.cumsum(y)
    fps = np.cumsum(1 - y)
    tpr = tps / P
    fpr = fps / N
    return float(np.trapz(tpr, fpr))
Logistic Regression with SGD on a Gaussian GMM Dataset + PCA
Objective. Generate a synthetic Gaussian Mixture Model (GMM) binary classification dataset where only a low‑dimensional subspace is informative and the remaining dimensions are noisy.
Train logistic regression with SGD in PyTorch with and without PCA, and compare performance.
What students should learn
- How redundant/noisy features degrade a linear classifier trained with SGD.
- How PCA (unsupervised) can improve performance by denoising and reducing dimensionality.
- How to visualize explained variance and decision boundaries in the top PCs.
Setup
Generate a GMM dataset (binary) with noisy dimensions
def make_gmm_binary(n_per_class=1000, d_total=20, d_informative=2, class_sep=2.5, noise_std=1.0, seed=0):
    rng = np.random.default_rng(seed)

    sep = class_sep
    mus0 = np.array([[-sep, 0.0], [0.0, -sep]])
    mus1 = np.array([[sep, 0.0], [0.0, sep]])
    cov_inf = np.array([[1.0, 0.3], [0.3, 1.0]])

    def sample_class(mus, n):
        z = rng.integers(0, len(mus), size=n)
        X2 = np.stack([rng.multivariate_normal(mus[k], cov_inf) for k in z], axis=0)
        return X2

    X0_inf = sample_class(mus0, n_per_class)
    X1_inf = sample_class(mus1, n_per_class)

    d_noise = max(0, d_total - d_informative)
    if d_noise > 0:
        X0_noise = rng.normal(0.0, noise_std, size=(n_per_class, d_noise))
        X1_noise = rng.normal(0.0, noise_std, size=(n_per_class, d_noise))
        X0 = np.concatenate([X0_inf, X0_noise], axis=1)
        X1 = np.concatenate([X1_inf, X1_noise], axis=1)
    else:
        X0 = X0_inf
        X1 = X1_inf

    X = np.vstack([X0, X1]).astype(np.float32)
    y = np.concatenate([np.zeros(n_per_class), np.ones(n_per_class)]).astype(np.int64)

    # Random orthogonal rotation mixes the informative 2-D subspace into all d_total coordinates.
    Q, _ = np.linalg.qr(rng.normal(size=(d_total, d_total)))
    X = (X @ Q).astype(np.float32)

    return X, y
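Because Q comes from a QR factorization of a Gaussian matrix, it is orthogonal, so the rotation only mixes the two informative coordinates into all d_total features without changing distances or class separation. A minimal standalone check (a hedged sketch; Q_chk and rng_chk are illustrative names, not part of the generator):

# Hedged sketch: the Q factor of a random Gaussian matrix is orthogonal,
# so Q_chk @ Q_chk.T is numerically the identity.
rng_chk = np.random.default_rng(0)
Q_chk, _ = np.linalg.qr(rng_chk.normal(size=(20, 20)))
print(np.allclose(Q_chk @ Q_chk.T, np.eye(20)))  # expected: True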
# X, y = make_gmm_binary(n_per_class=1200, d_total=20, d_informative=2, class_sep=2.5, noise_std=1.5, seed=42)
X, y = make_gmm_binary(n_per_class=200, d_total=20, d_informative=2, class_sep=2.5, noise_std=1.0, seed=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, seed=123)

mu, std = standardize_fit(X_train)
Xtr = standardize_transform(X_train, mu, std)
Xte = standardize_transform(X_test, mu, std)
X.shape, Xtr.shape, Xte.shape, y_train.shape, y_test.shape
((400, 20), (280, 20), (120, 20), (280,), (120,))
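As a quick sanity check (a hedged sketch using the arrays above), the training split should have per-feature mean roughly 0 and standard deviation roughly 1, since the statistics were fitted on it; the test split will only be approximately standardized.

# Hedged sketch: verify standardization used the training statistics.
print(np.abs(Xtr.mean(axis=0)).max(), Xtr.std(axis=0).min(), Xtr.std(axis=0).max())
print(np.abs(Xte.mean(axis=0)).max())  # near zero, but not exactly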
PCA (Torch SVD)
def pca_fit_torch(X_np):
    Xc = X_np - X_np.mean(axis=0, keepdims=True)
    Xc_t = torch.from_numpy(Xc)
    U, S, Vh = torch.linalg.svd(Xc_t, full_matrices=False)
    n = X_np.shape[0]
    ev = (S**2) / (n - 1)    # eigenvalues of the sample covariance
    evr = ev / ev.sum()      # explained-variance ratio per component
    components = Vh.numpy()  # rows are principal directions
    return components, evr.numpy()
def pca_transform(X_np, components, k):
    # Centers by the passed array's own mean; the inputs here are already
    # standardized with training statistics, so the means are near zero.
    Xc = X_np - X_np.mean(axis=0, keepdims=True)
    W = components[:k].T
    return (Xc @ W).astype(np.float32)
components, evr = pca_fit_torch(Xtr.copy())
explained = np.cumsum(evr)
k95 = int(np.searchsorted(explained, 0.95) + 1)
k_fixed = 2
print("Explained variance ratio (first 10):", evr[:10])
print("k@95% =", k95)
Explained variance ratio (first 10): [0.14723612 0.12738939 0.06149405 0.05959329 0.05904397 0.05309147
0.05024018 0.04745107 0.04639223 0.04420933]
k@95% = 18
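The random rotation inside make_gmm_binary spreads the informative 2-D subspace and the noise across all 20 coordinates, so the variance spectrum is nearly flat and k@95% lands close to the full dimensionality. A hedged sketch, reusing explained and components from above, prints a few thresholds and confirms the SVD components are orthonormal:

# Hedged sketch: number of PCs needed at several variance thresholds,
# plus an orthonormality check on the component rows.
for thr in (0.5, 0.8, 0.9, 0.95):
    print(f"PCs for {thr:.0%} variance:", int(np.searchsorted(explained, thr) + 1))
print(np.allclose(components @ components.T, np.eye(components.shape[0]), atol=1e-5))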
Plot: cumulative explained variance
plt.figure(figsize=(6, 4))
plt.plot(np.arange(1, len(evr) + 1), np.cumsum(evr), marker="o")
plt.xlabel("number of PCs")
plt.ylabel("cumulative explained variance")
plt.title("PCA cumulative explained variance on training set")
plt.show()
Logistic regression (PyTorch, SGD)
class LogisticReg(nn.Module):
    def __init__(self, in_dim):
        super().__init__()
        self.lin = nn.Linear(in_dim, 1)

    def forward(self, x):
        return self.lin(x).squeeze(-1)
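The module returns raw logits of shape (batch,), which is what nn.BCEWithLogitsLoss expects; probabilities are obtained by applying the sigmoid afterwards. A minimal hedged sketch on dummy inputs (m and xb_demo are illustrative names, unrelated to the dataset):

# Hedged sketch: forward pass yields one logit per example; sigmoid maps to (0, 1).
m = LogisticReg(in_dim=20)
xb_demo = torch.randn(4, 20)       # dummy batch
logits_demo = m(xb_demo)
print(logits_demo.shape)           # torch.Size([4])
print(torch.sigmoid(logits_demo))  # probabilities in (0, 1)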
def train_logreg_sgd(Xtr_np, ytr_np, Xte_np, yte_np, lr=5e-3, epochs=300, batch_size=128, l2=1e-4):
    Xtr_t = torch.from_numpy(Xtr_np.astype(np.float32))
    ytr_t = torch.from_numpy(ytr_np.astype(np.float32))
    Xte_t = torch.from_numpy(Xte_np.astype(np.float32))
    yte_t = torch.from_numpy(yte_np.astype(np.float32))

    model = LogisticReg(in_dim=Xtr_t.shape[1])
    opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=l2)
    loss_fn = nn.BCEWithLogitsLoss()

    ds = TensorDataset(Xtr_t, ytr_t)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True)

    model.train()
    for ep in range(1, epochs + 1):
        running = 0.0
        for xb, yb in dl:
            logits = model(xb)
            loss = loss_fn(logits, yb)
            opt.zero_grad()
            loss.backward()
            opt.step()
            running += loss.item() * xb.size(0)
        if ep % 50 == 0:
            print(f"epoch {ep:4d} | loss: {running / len(ds):.4f}")

    model.eval()
    with torch.no_grad():
        logits_tr = model(Xtr_t).numpy()
        logits_te = model(Xte_t).numpy()

    prob_tr = 1.0 / (1.0 + np.exp(-logits_tr))
    prob_te = 1.0 / (1.0 + np.exp(-logits_te))
    pred_te = (prob_te >= 0.5).astype(int)
    acc_te = (pred_te == yte_np).mean()
    auc_te = auc_roc(prob_te, yte_np)
    return model, acc_te, auc_te, prob_te, logits_te
Train & evaluate: baseline vs PCA
model_raw, acc_raw, auc_raw, prob_raw, logit_raw = train_logreg_sgd(
    Xtr, y_train, Xte, y_test, lr=5e-3, epochs=300, batch_size=128, l2=1e-4
)
print(f"Baseline (no PCA): acc={acc_raw:.3f}, AUC={auc_raw:.3f}")

Xtr_pca95 = pca_transform(Xtr, components, k95)
Xte_pca95 = pca_transform(Xte, components, k95)
model_pca95, acc_pca95, auc_pca95, prob_pca95, _ = train_logreg_sgd(
    Xtr_pca95, y_train, Xte_pca95, y_test, lr=5e-3, epochs=300, batch_size=128, l2=1e-4
)
print(f"PCA (k@95%={k95}): acc={acc_pca95:.3f}, AUC={auc_pca95:.3f}")

Xtr_pca2 = pca_transform(Xtr, components, 2)
Xte_pca2 = pca_transform(Xte, components, 2)
model_pca2, acc_pca2, auc_pca2, prob_pca2, _ = train_logreg_sgd(
    Xtr_pca2, y_train, Xte_pca2, y_test, lr=5e-3, epochs=300, batch_size=128, l2=1e-4
)
print(f"PCA (k=2): acc={acc_pca2:.3f}, AUC={auc_pca2:.3f}")
epoch 50 | loss: 0.1858
epoch 100 | loss: 0.1544
epoch 150 | loss: 0.1423
epoch 200 | loss: 0.1353
epoch 250 | loss: 0.1308
epoch 300 | loss: 0.1278
Baseline (no PCA): acc=0.950, AUC=0.986
epoch 50 | loss: 0.1850
epoch 100 | loss: 0.1537
epoch 150 | loss: 0.1422
epoch 200 | loss: 0.1358
epoch 250 | loss: 0.1317
epoch 300 | loss: 0.1289
PCA (k@95%=18): acc=0.933, AUC=0.986
epoch 50 | loss: 0.1929
epoch 100 | loss: 0.1670
epoch 150 | loss: 0.1584
epoch 200 | loss: 0.1545
epoch 250 | loss: 0.1523
epoch 300 | loss: 0.1506
PCA (k=2): acc=0.933, AUC=0.984
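For a side-by-side view, the three runs can be collected into one short summary (a hedged sketch reusing the metric variables computed above; the row labels are illustrative):

# Hedged sketch: print the test metrics of all three configurations together.
for name, acc, auc in [
    ("raw (20 features)", acc_raw, auc_raw),
    (f"PCA k@95% ({k95} PCs)", acc_pca95, auc_pca95),
    ("PCA k=2", acc_pca2, auc_pca2),
]:
    print(f"{name:<22s} acc={acc:.3f}  AUC={auc:.3f}")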
Plot: decision boundary in 2D PCA space
xx, yy = np.meshgrid(
    np.linspace(Xtr_pca2[:, 0].min() - 1.0, Xtr_pca2[:, 0].max() + 1.0, 200),
    np.linspace(Xtr_pca2[:, 1].min() - 1.0, Xtr_pca2[:, 1].max() + 1.0, 200),
)
grid = np.stack([xx.ravel(), yy.ravel()], axis=1).astype(np.float32)
with torch.no_grad():
    logits_grid = model_pca2.lin(torch.from_numpy(grid)).numpy().reshape(xx.shape)
probs_grid = 1.0 / (1.0 + np.exp(-logits_grid))

plt.figure(figsize=(6, 5))
plt.contourf(xx, yy, probs_grid, levels=20, alpha=0.3)
plt.scatter(Xtr_pca2[:, 0], Xtr_pca2[:, 1], s=10, c=y_train, alpha=0.6)
plt.title("Decision surface (logistic on top-2 PCs)")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.show()
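Since the model is linear in the two PCs, its decision boundary is the line w1*PC1 + w2*PC2 + b = 0, and it can be read directly off the fitted parameters (a hedged sketch using model_pca2; assumes the PC2 weight is nonzero):

# Hedged sketch: express the logistic decision boundary in PC space as a line.
w = model_pca2.lin.weight.detach().numpy().ravel()
b = model_pca2.lin.bias.item()
print(f"boundary: PC2 = {-w[0] / w[1]:.3f} * PC1 + {-b / w[1]:.3f}")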
Plot: ROC curves (test set) — raw vs PCA
def roc_curve(scores, labels):
    scores = np.asarray(scores).ravel()
    labels = np.asarray(labels).astype(int).ravel()
    order = np.argsort(-scores, kind="mergesort")
    y = labels[order]
    P = y.sum()
    N = len(y) - P
    tps = np.cumsum(y)
    fps = np.cumsum(1 - y)
    tpr = tps / (P + 1e-12)
    fpr = fps / (N + 1e-12)
    return fpr, tpr
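As a consistency check (a hedged sketch), integrating the curve returned by roc_curve with the trapezoidal rule should reproduce the AUC computed by auc_roc earlier:

# Hedged sketch: trapezoidal area under roc_curve() should match auc_roc().
fpr_chk, tpr_chk = roc_curve(prob_raw, y_test)
print(np.isclose(np.trapz(tpr_chk, fpr_chk), auc_roc(prob_raw, y_test)))  # expected: True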
fpr_raw, tpr_raw = roc_curve(prob_raw, y_test)
fpr_95, tpr_95 = roc_curve(prob_pca95, y_test)
fpr_2, tpr_2 = roc_curve(prob_pca2, y_test)

plt.figure(figsize=(6, 5))
plt.plot(fpr_raw, tpr_raw, label=f"Raw (AUC={auc_raw:.3f})")
plt.plot(fpr_95, tpr_95, label=f"PCA@95% (AUC={auc_pca95:.3f})")
plt.plot(fpr_2, tpr_2, label=f"PCA k=2 (AUC={auc_pca2:.3f})")
plt.plot([0, 1], [0, 1], linestyle="--")
plt.xlabel("FPR")
plt.ylabel("TPR")
plt.title("ROC (test) — logistic SGD")
plt.legend()
plt.show()