# Introduction
The typical data scientist spends roughly 45% of their working time on data preparation and cleaning, not on modeling, not on insight generation, not on the work that requires real judgment. That estimate keeps showing up across industry surveys because it keeps being true. The tasks eating up that time are formulaic enough to follow explicit rules: profiling columns, flagging nulls, running the same exploratory data analysis (EDA) scripts, grid-searching hyperparameters, and writing the same monitoring checks.
That's precisely what makes them automatable with agents. Agentic workflows don't replace the data scientist. They absorb the procedural weight so you can focus on the evaluative weight: deciding whether a model makes sense, whether a feature is genuinely informative, whether a finding warrants a business decision. Platforms like Databricks have already started shipping agentic data science capabilities into their core infrastructure, with their Agent framework explicitly designed to "compress the time from question to insight." This is the direction production data teams are moving.
This article covers five concrete agentic workflows, one for each major stage of a data science pipeline. Each includes a real-world scenario, tested code patterns, and the design decisions that matter in production.
# Prerequisites
All five workflows assume Python 3.10+ and familiarity with pandas, scikit-learn, and basic large language model (LLM) API usage. Specific package requirements are listed under each workflow. For the tool-calling patterns, you need either an OpenAI API key or a local serving endpoint (Ollama, vLLM) that exposes an OpenAI-compatible API.
# Core packages used across all workflows
pip install openai pandas numpy scipy scikit-learn lightgbm shap pydantic
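For a local endpoint, the only change is where the client points. A minimal sketch, assuming the official openai Python package (v1 or later) and a server such as Ollama on its default port; the `LLM_BASE_URL` variable name and the `client_kwargs` helper are hypothetical, introduced here only for illustration:

```python
import os


def client_kwargs() -> dict:
    """Build keyword arguments for openai.OpenAI().

    LLM_BASE_URL is a hypothetical env var used only in this sketch.
    For Ollama the value would typically be http://localhost:11434/v1.
    """
    base_url = os.environ.get("LLM_BASE_URL")
    if base_url:
        # Local servers usually ignore the key, but the client still expects one to be set.
        return {
            "base_url": base_url,
            "api_key": os.environ.get("OPENAI_API_KEY", "not-needed"),
        }
    # No override: the client reads OPENAI_API_KEY from the environment itself.
    return {}


# Usage (requires `pip install openai`):
#   from openai import OpenAI
#   client = OpenAI(**client_kwargs())
```

With this in place, the scripts in the workflows would construct the client with `OpenAI(**client_kwargs())` instead of `OpenAI()`, and the model name would need to match whatever the local server exposes.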
# Workflow 1: Automated Exploratory Data Analysis Agent
What it replaces: Manually loading data, computing summary statistics, visualizing distributions, inspecting nulls, detecting outliers, writing up findings. Every dataset, every time, the same script with different column names.
What the agent does instead: Loads the dataset, runs a full profile, flags issues by severity, and produces a structured Markdown report. A human reviews the findings and decides what to do about them. The agent handles everything before that review.
// Architecture
The agent uses a Reasoning and Acting (ReAct) loop with two tools: profile_dataset produces summary statistics per column, and flag_issues classifies problems by severity. The agent then synthesizes both outputs into a structured report via a single language model call. The key design decision is how the agent handles the flag_issues output; it reasons about which issues are actionable before reporting, so the output is a prioritized list, not a raw dump.
// Code Pattern
# eda_agent.py
# Prerequisites: pip install openai pandas numpy
# Run: python eda_agent.py
import json
from dataclasses import dataclass

import numpy as np
import pandas as pd
from openai import OpenAI

client = OpenAI()  # Uses OPENAI_API_KEY env var


@dataclass
class ColumnIssue:
    column: str
    issue_type: str  # null_rate | skewness | dtype | high_correlation
    severity: str    # low | medium | high
    detail: str


def profile_dataset(df: pd.DataFrame) -> dict:
    """
    Generate per-column statistics.
    In production, swap this for ydata-profiling for richer output.
    """
    profile = {}
    for col in df.columns:
        col_stats = {
            "dtype": str(df[col].dtype),
            "null_rate": float(df[col].isnull().mean()),
            "n_unique": int(df[col].nunique()),
        }
        if pd.api.types.is_numeric_dtype(df[col]):
            col_stats["skewness"] = float(df[col].skew())
            col_stats["mean"] = float(df[col].mean())
            col_stats["std"] = float(df[col].std())
        elif df[col].dtype == "object":
            # Check whether an object column is really numeric data stored as strings
            non_null = df[col].dropna()
            numeric_coerced = pd.to_numeric(non_null, errors="coerce")
            col_stats["looks_numeric"] = bool(
                len(non_null) > 0 and numeric_coerced.notna().mean() > 0.9
            )
        profile[col] = col_stats
    return profile


def flag_issues(profile: dict) -> list[ColumnIssue]:
    """
    Flag data quality issues from a column profile.
    Severity tiers: high = needs immediate attention, medium = worth reviewing.
    """
    issues = []
    for col, stats_dict in profile.items():
        null_rate = stats_dict.get("null_rate", 0.0)
        if null_rate > 0.15:
            issues.append(ColumnIssue(col, "null_rate", "high",
                                      f"{null_rate:.0%} of values are missing"))
        elif null_rate > 0.05:
            issues.append(ColumnIssue(col, "null_rate", "medium",
                                      f"{null_rate:.0%} of values are missing"))

        skewness = abs(stats_dict.get("skewness", 0.0))
        if skewness > 5.0:
            issues.append(ColumnIssue(col, "skewness", "high",
                                      f"Extreme skew={skewness:.1f} -- consider log transform"))
        elif skewness > 2.0:
            issues.append(ColumnIssue(col, "skewness", "medium",
                                      f"Moderate skew={skewness:.1f}"))

        # Object columns with all-numeric values are likely miscoded
        if stats_dict["dtype"] == "object" and stats_dict.get("looks_numeric", False):
            issues.append(ColumnIssue(col, "dtype", "medium",
                                      "Numeric values stored as strings"))
    return issues


def run_eda_agent(df: pd.DataFrame, dataset_description: str) -> str:
    """
    Run the EDA agent.
    This simplified version calls both tools in a fixed order, then makes
    one LLM call that turns their output into a structured report.
    """
    profile = profile_dataset(df)
    issues = flag_issues(profile)

    # Format issues for the agent
    issues_text = "\n".join(
        f"- [{i.severity.upper()}] {i.column}: {i.issue_type} -- {i.detail}"
        for i in issues
    ) or "No issues detected."

    prompt = f"""You are a senior data scientist reviewing a dataset for a data science project.
Dataset: {dataset_description}
Column profile (summary stats):
{json.dumps(profile, indent=2)}
Detected issues:
{issues_text}
Write a structured EDA report with these sections:
1. DATASET OVERVIEW -- shape, dtypes, overall quality assessment (1-2 sentences)
2. HIGH PRIORITY ISSUES -- items requiring action before modeling
3. MEDIUM PRIORITY ISSUES -- items worth monitoring
4. RECOMMENDED NEXT STEPS -- ordered list of 3-5 specific actions
Be direct. Prioritize actionability over completeness."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,  # Low temperature for consistent structured output
    )
    return response.choices[0].message.content


# ── Run it ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Example: retail transaction data
    np.random.seed(42)
    n = 5000
    df = pd.DataFrame({
        "revenue": np.random.exponential(scale=200, size=n),  # right-skewed
        "customer_age": np.random.normal(40, 12, n),
        "created_at": pd.date_range("2024-01-01", periods=n, freq="h").astype(str),
        "region_code": np.random.choice(["US", "EU", "APAC", None], size=n, p=[0.5, 0.3, 0.1, 0.1]),
        "session_count": np.where(np.random.rand(n) < 0.2, None, np.random.randint(1, 50, n)),
    })
    report = run_eda_agent(df, "Retail transaction data with customer demographics")
    print(report)
How to run:
export OPENAI_API_KEY=your_key
python eda_agent.py
Real scenario
Retail transaction data, 5,000 rows, 8 columns. The agent flags revenue as high-priority (extreme right skew at 7.3), session_count as high-priority (22% null rate), and created_at as medium-priority (date stored as string). It recommends a log transform for revenue, a null indicator feature for session_count, and parsing created_at to extract hour-of-day and day-of-week features. All of this surfaces in under 30 seconds. A human reviews the report and acts on the recommendations, with no time spent running the diagnostics manually.
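Those three recommendations translate into a few lines of pandas. A minimal sketch, assuming the column names from the example dataset; the output column names (`revenue_log`, `session_count_missing`, `created_hour`, `created_dow`) and the `apply_eda_recommendations` helper are illustrative choices, not something the agent emits:

```python
import numpy as np
import pandas as pd


def apply_eda_recommendations(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Log transform for right-skewed revenue; log1p keeps zero revenue finite.
    out["revenue_log"] = np.log1p(out["revenue"])
    # Null indicator first, then coerce the column itself to a numeric dtype.
    out["session_count_missing"] = out["session_count"].isna().astype(int)
    out["session_count"] = pd.to_numeric(out["session_count"], errors="coerce")
    # Parse the string timestamps and derive calendar features.
    created = pd.to_datetime(out["created_at"], errors="coerce")
    out["created_hour"] = created.dt.hour
    out["created_dow"] = created.dt.dayofweek  # Monday=0
    return out


# Tiny made-up sample, just to exercise the function.
sample = pd.DataFrame({
    "revenue": [0.0, 120.5, 3400.0],
    "session_count": [3, None, 12],
    "created_at": ["2024-01-01 00:00:00", "2024-01-01 13:00:00", "2024-01-07 23:00:00"],
})
prepared = apply_eda_recommendations(sample)
print(prepared[["revenue_log", "session_count_missing", "created_hour", "created_dow"]])
```

Whether to apply each step remains the reviewer's call; the sketch only shows that acting on the report is cheap once the diagnosis is done.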
# Workflow 2: Agentic Feature Engineering and Selection
What it replaces: Manually brainstorming interaction features, writing the transformation code, evaluating each candidate with a baseline model, pruning the ones that don't contribute, documenting what survived and why.
What the agent does instead: Proposes candidate features based on the data profile and domain context, generates the transformation code, evaluates each candidate against a fast baseline, and prunes features below a configurable importance threshold, with a written rationale for each decision.
// Architecture
Two phases, one agent. The generation phase uses the LLM to propose candidate features from a structured description of the dataset and the prediction task. The selection phase evaluates each candidate by training a LightGBM classifier with 5-fold cross-validation (CV) and computing feature importance using SHapley Additive exPlanations (SHAP). Features below the threshold are pruned. The agent reasons about the importance scores before pruning; it catches cases where a feature looks weak globally but carries a signal for a specific segment. Note that the simplified code below substitutes a single fit and LightGBM's built-in importances for the CV and SHAP steps.
// Code Pattern
# feature_agent.py
# Prerequisites: pip install openai lightgbm pandas numpy
# Run: python feature_agent.py
import json

import lightgbm as lgb
import numpy as np
import pandas as pd
from openai import OpenAI

client = OpenAI()


def generate_feature_candidates(
    column_descriptions: dict[str, str],
    target: str,
    task_type: str = "classification",
    n_candidates: int = 10,
) -> list[dict]:
    """
    Ask the LLM to propose candidate features given column descriptions and the prediction task.
    Returns a list of dicts with 'name', 'formula', and 'rationale'.
    """
    prompt = f"""You are a senior ML engineer performing feature engineering for a {task_type} task.
Target variable: {target}
Available columns:
{json.dumps(column_descriptions, indent=2)}
Propose {n_candidates} candidate engineered features that are likely to improve model performance.
For each feature, provide:
- name: a snake_case feature name
- formula: how to compute it from the available columns (pandas expression)
- rationale: one sentence on why this feature might help
Return a JSON object with a single key "features" containing an array of objects,
each with keys: name, formula, rationale.
Return ONLY valid JSON -- no explanation outside the JSON."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.4,
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("features", result.get("candidates", []))


def evaluate_and_prune(
    df: pd.DataFrame,
    candidate_features: list[dict],
    target_col: str,
    importance_threshold: float = 0.01,
) -> tuple[list[str], list[str], dict[str, float]]:
    """
    Add candidate features to the dataframe, train a fast LightGBM baseline,
    extract feature importances, and prune below threshold.
    Returns (kept_features, pruned_features, importance_scores)
    """
    feature_df = df.copy()
    added = []
    for candidate in candidate_features:
        try:
            # Evaluate the formula string -- in production, use a safe eval sandbox
            feature_df[candidate["name"]] = feature_df.eval(candidate["formula"])
            added.append(candidate["name"])
        except Exception as e:
            # Formula failed -- skip this candidate
            print(f"  Skipped '{candidate.get('name', '?')}': {e}")
    if not added:
        return [], [], {}

    # Ratios can produce +/-inf (division by zero); treat those as missing
    X = feature_df[added].replace([np.inf, -np.inf], np.nan).fillna(0)
    y = df[target_col]
    model = lgb.LGBMClassifier(n_estimators=100, random_state=42, verbose=-1)
    model.fit(X, y)

    # Normalize importances to sum to 1; guard against an all-zero vector
    raw = model.feature_importances_.astype(float)
    total = raw.sum()
    normalized = raw / total if total > 0 else raw
    importance_scores = {f: float(s) for f, s in zip(added, normalized)}

    kept = [f for f in added if importance_scores[f] >= importance_threshold]
    pruned = [f for f in added if importance_scores[f] < importance_threshold]
    return kept, pruned, importance_scores


def explain_selection(
    kept: list[str],
    pruned: list[str],
    scores: dict[str, float],
) -> str:
    """Ask the agent to explain its selection decisions in plain language."""
    prompt = f"""You are reviewing feature selection results for an ML pipeline.
Features KEPT (above importance threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in kept}, indent=2)}
Features PRUNED (below threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in pruned}, indent=2)}
Write a 3-5 sentence summary of the selection outcome.
Note any surprising prunings or unexpected high-importance features.
Suggest one additional feature worth testing based on what survived."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    column_descriptions = {
        "days_since_login": "Number of days since the customer last logged in",
        "plan_tier": "Subscription tier: basic, pro, or enterprise",
        "support_tickets_90d": "Number of support tickets opened in the last 90 days",
        "monthly_spend": "Customer's average monthly spend in USD",
    }
    candidates = generate_feature_candidates(
        column_descriptions, target="churned", task_type="classification", n_candidates=10
    )

    # In production, load real customer data here
    np.random.seed(42)
    n = 3000
    df = pd.DataFrame({
        "days_since_login": np.random.randint(0, 90, n),
        "plan_tier": np.random.choice(["basic", "pro", "enterprise"], n),
        "support_tickets_90d": np.random.poisson(1.5, n),
        "monthly_spend": np.random.exponential(80, n),
        "churned": np.random.binomial(1, 0.15, n),
    })
    kept, pruned, scores = evaluate_and_prune(df, candidates, target_col="churned")
    summary = explain_selection(kept, pruned, scores)
    print(summary)
How to run:
export OPENAI_API_KEY=your_key
python feature_agent.py
Real scenario
Customer churn prediction, 12 input columns including days_since_login, plan_tier, support_tickets_90d, and monthly_spend. The agent proposes 15 candidates, including spend_per_day, tickets_per_spend_ratio, and login_recency_x_plan. After evaluation, 9 survive the importance threshold. The explanation calls out that tickets_per_spend_ratio has the highest importance score (0.18): "customers spending more who are also raising support tickets are a particularly high churn risk," which becomes a finding worth sharing with the product team.
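The `eval` call in the code pattern executes whatever formula string the LLM returns. One possible first line of defense, sketched here with the standard-library `ast` module, is to reject formulas that use anything beyond known column names, numeric constants, and arithmetic. The `is_safe_formula` helper and its whitelist are assumptions made for illustration, not a complete sandbox:

```python
import ast

# Node types permitted in a formula: arithmetic over column names and numbers only.
_ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.UnaryOp, ast.Name, ast.Load, ast.Constant,
    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.USub, ast.UAdd,
)


def is_safe_formula(formula: str, allowed_columns: set[str]) -> bool:
    """Return True if `formula` is plain arithmetic over `allowed_columns`."""
    try:
        tree = ast.parse(formula, mode="eval")
    except SyntaxError:
        return False
    for node in ast.walk(tree):
        if not isinstance(node, _ALLOWED_NODES):
            return False  # calls, attribute access, subscripts, comparisons, ...
        if isinstance(node, ast.Name) and node.id not in allowed_columns:
            return False  # identifier that is not a known column
        if isinstance(node, ast.Constant) and not isinstance(node.value, (int, float)):
            return False  # strings, bytes, None
    return True


columns = {"days_since_login", "support_tickets_90d", "monthly_spend"}
print(is_safe_formula("support_tickets_90d / (monthly_spend + 1)", columns))
print(is_safe_formula("__import__('os').system('ls')", columns))
print(is_safe_formula("monthly_spend * unknown_col", columns))
```

A check like this would run before each candidate is evaluated. It is deliberately strict: formulas with comparisons or function calls are rejected, so the whitelist would need extending if those are wanted.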
# Workflow 3: Agentic Hyperparameter Optimization
What it replaces: Grid search (exhaustive but wasteful), random search (efficient but dumb), and manual Bayesian optimization setup (powerful but boilerplate-heavy). All of these treat hyperparameter tuning as a search problem. An agent treats it as a reasoning problem.
What the agent does instead: Proposes a hyperparameter configuration, evaluates it by training the model, analyzes the metric trend across iterations, identifies which parameters are driving improvement, and adjusts the search direction accordingly, without being told to. The aim is to converge on a good configuration in fewer iterations than grid or random search.
// Architecture
One agent, one tool: train_and_evaluate. The tool takes a Pydantic-validated hyperparameter config, trains the model with 5-fold CV, and returns the area under the curve (AUC), training time, and the train/validation overfitting gap. The agent receives the full trial history at each step and reasons about what to try next. Convergence is detected when the last three AUC scores span less than 0.005.
This pattern is directly inspired by published research on agentic hyperparameter tuning that reported LLM-guided search outperforming Bayesian optimization on mid-sized classification tasks by 5-12% in fewer iterations.
// Code Pattern
# hp_agent.py
# Prerequisites: pip install openai scikit-learn pydantic
# Run: python hp_agent.py
import json
import time
from dataclasses import dataclass

from openai import OpenAI
from pydantic import BaseModel, Field
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score

client = OpenAI()


# ── Pydantic schema for structured tool input ─────────────────────────────────
# The model must return valid hyperparameters -- Pydantic catches invalid values
# before the training job starts, saving wasted compute on bad configs.
class HyperparamConfig(BaseModel):
    n_estimators: int = Field(..., ge=10, le=1000, description="Number of trees")
    max_depth: int = Field(..., ge=1, le=50, description="Max tree depth")
    min_samples_split: int = Field(..., ge=2, le=50, description="Min samples to split")
    max_features: float = Field(..., gt=0, le=1.0, description="Fraction of features per split")


@dataclass
class TrialResult:
    iteration: int
    config: dict
    val_auc: float
    train_auc: float
    train_time_s: float

    @property
    def overfit_gap(self) -> float:
        return round(self.train_auc - self.val_auc, 4)


def train_and_evaluate(config: dict, X, y) -> TrialResult:
    """
    Train a RandomForest with the given config and return cross-validated metrics.
    This is the tool the agent calls on each iteration.
    """
    params = HyperparamConfig(**config)  # Validates before training
    clf = RandomForestClassifier(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth,
        min_samples_split=params.min_samples_split,
        max_features=params.max_features,
        random_state=42,
        n_jobs=-1,
    )
    t0 = time.time()
    val_scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    # Refit on all data to measure training AUC for the overfitting gap
    clf.fit(X, y)
    train_auc = roc_auc_score(y, clf.predict_proba(X)[:, 1])
    return TrialResult(
        iteration=0,  # Set by the caller
        config=config,
        val_auc=round(float(val_scores.mean()), 4),
        train_auc=round(float(train_auc), 4),
        train_time_s=round(time.time() - t0, 2),
    )


def detect_convergence(results: list[TrialResult], window: int = 3, tol: float = 0.005) -> bool:
    """Stop when the last `window` AUC scores span less than `tol`."""
    if len(results) < window:
        return False
    recent = [r.val_auc for r in results[-window:]]
    return (max(recent) - min(recent)) < tol


def propose_next_config(trial_history: list[TrialResult]) -> dict:
    """
    Ask the agent to propose the next hyperparameter configuration,
    reasoning from the full trial history.
    """
    history_text = "\n".join(
        f"Trial {r.iteration}: config={r.config}, val_AUC={r.val_auc}, "
        f"overfit_gap={r.overfit_gap}, time={r.train_time_s}s"
        for r in trial_history
    )
    prompt = f"""You are optimizing a RandomForest classifier. Your goal is to maximize val_AUC.
Trial history:
{history_text}
Parameter ranges:
- n_estimators: 10-1000
- max_depth: 1-50
- min_samples_split: 2-50
- max_features: 0.1-1.0
Analyze the trend. Identify which parameters appear most influential.
Propose the next configuration to try, explaining your reasoning in one sentence.
Return a JSON object with keys: n_estimators, max_depth, min_samples_split, max_features, reasoning"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.3,
    )
    result = json.loads(response.choices[0].message.content)
    print(f"  Agent reasoning: {result.get('reasoning', '')}")
    return {k: v for k, v in result.items() if k != "reasoning"}


def run_hp_agent(X, y, max_iterations: int = 15) -> TrialResult:
    """
    Run the agentic hyperparameter optimization loop.
    Starts with a sensible default, then lets the agent guide the search.
    """
    # Sensible starting point -- don't start random
    initial_config = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5, "max_features": 0.5}
    results = []
    for i in range(max_iterations):
        # Fall back to the default until at least one trial has succeeded
        config = initial_config if not results else propose_next_config(results)
        try:
            result = train_and_evaluate(config, X, y)
        except Exception as e:
            print(f"  Trial {i+1} failed: {e} -- skipping")
            continue
        result.iteration = i + 1
        results.append(result)
        best = max(results, key=lambda r: r.val_auc)
        print(f"Trial {i+1:02d}: AUC={result.val_auc:.4f} (best={best.val_auc:.4f})")
        if detect_convergence(results, window=3, tol=0.005):
            print(f"Converged after {i+1} iterations.")
            break
    if not results:
        raise RuntimeError("All trials failed; no configuration to return.")
    return max(results, key=lambda r: r.val_auc)


if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, n_informative=10, random_state=42)
    best = run_hp_agent(X, y, max_iterations=15)
    print(f"\nBest config: {best.config}")
    print(f"Best val_AUC: {best.val_auc}")
How to run:
export OPENAI_API_KEY=your_key
python hp_agent.py
Real scenario
Census Income classification dataset (UCI, 48,842 rows). Default RandomForest AUC: 0.87. After 15 agent-guided iterations, the agent converges on max_depth=12, n_estimators=350, min_samples_split=8, max_features=0.4, achieving AUC 0.91. At iteration 7, the agent's reasoning log reads: "max_depth appears to be the dominant driver, increasing it from 8 to 12 gave +0.019 AUC, while n_estimators beyond 200 shows diminishing returns." That reasoning is traceable in the output, not hidden inside a black-box optimizer.
# Workflow 4: Automated Model Monitoring and Drift Detection Agent
What it replaces: Manually checking feature distributions on a schedule, writing threshold rules per column, maintaining dashboard alerts that go stale, and discovering model degradation only after it shows up in business metrics.
What the agent does instead: Runs on a schedule against incoming batch data, computes drift statistics per feature using Population Stability Index (PSI) and the Kolmogorov-Smirnov (KS) test, classifies drift severity, and responds differently depending on severity: mild drift triggers an alert, severe drift triggers a retraining pipeline call.
// Architecture
A scheduled agent built around one tool, compute_drift_stats, which computes PSI and the KS test for each column and classifies the result by severity. A single language model call then decides how to respond: a passing check is simply logged, mild drift produces a drafted alert for the data science team, and severe drift produces an alert plus a trigger for a retraining directed acyclic graph (DAG), sent via Slack or the Airflow representational state transfer (REST) API. The critical design decision is the branching response itself; the agent handles the routing, not a hardcoded if/else ladder.
PSI interpretation: below 0.1 is stable, 0.1-0.25 is mild drift worth monitoring, and above 0.25 is significant drift that should trigger retraining. PSI is a standard metric for population shift in production machine learning systems and was used in financial risk modeling long before LLMs existed.
// Code Pattern
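To make the thresholds concrete, here is a two-bucket worked example of the PSI formula, PSI = sum((current% - baseline%) * ln(current% / baseline%)). The bucket shares are made-up numbers chosen for illustration:

```python
import math

baseline_share = [0.5, 0.5]  # illustrative: half the baseline rows in each bucket
current_share = [0.7, 0.3]   # illustrative: mass has shifted toward the first bucket

# One term per bucket; both terms are positive because the sign of the
# difference always matches the sign of the log ratio.
terms = [(c - b) * math.log(c / b) for b, c in zip(baseline_share, current_share)]
psi = sum(terms)

print([round(t, 4) for t in terms])  # per-bucket contributions
print(round(psi, 4))                 # total PSI
```

The total comes out at roughly 0.17, which falls in the mild-drift band: a 20-point shift between two buckets is worth an alert but not, on these thresholds, a retrain.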
# drift_agent.py
# Prerequisites: pip install openai pandas scipy numpy
# Run: python drift_agent.py
import json
import math
from dataclasses import dataclass

import numpy as np
import pandas as pd
from openai import OpenAI
from scipy.stats import ks_2samp

client = OpenAI()


@dataclass
class FeatureDrift:
    feature: str
    psi: float
    ks_stat: float
    ks_pvalue: float
    severity: str  # stable | mild_drift | severe_drift


def compute_psi(baseline: np.ndarray, current: np.ndarray, buckets: int = 10) -> float:
    """
    Population Stability Index between baseline and current distributions.
    PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%))
    Values: <0.1 stable | 0.1-0.25 mild | >0.25 severe
    """
    min_val = min(baseline.min(), current.min())
    max_val = max(baseline.max(), current.max())
    if max_val == min_val:
        return 0.0  # Both samples are one constant value: no shift to measure
    bucket_width = (max_val - min_val) / buckets

    def bucket_freqs(data: np.ndarray) -> list[float]:
        counts = np.zeros(buckets)
        for v in data:
            idx = min(int((v - min_val) / bucket_width), buckets - 1)
            counts[idx] += 1
        freqs = counts / len(data)
        return [max(f, 1e-6) for f in freqs]  # Avoid log(0)

    b_freq = bucket_freqs(baseline)
    c_freq = bucket_freqs(current)
    return round(sum((c - b) * math.log(c / b) for b, c in zip(b_freq, c_freq)), 4)


def classify_drift(psi: float) -> str:
    if psi < 0.10:
        return "stable"
    if psi < 0.25:
        return "mild_drift"
    return "severe_drift"


def compute_drift_stats(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
) -> list[FeatureDrift]:
    """Compute PSI and KS test for each numeric feature."""
    results = []
    for col in numeric_cols:
        b = baseline_df[col].dropna().values
        c = current_df[col].dropna().values
        psi = compute_psi(b, c)
        ks_stat, ks_pvalue = ks_2samp(b, c)
        results.append(FeatureDrift(
            feature=col,
            psi=psi,
            ks_stat=round(float(ks_stat), 4),
            ks_pvalue=round(float(ks_pvalue), 6),
            severity=classify_drift(psi),
        ))
    return results


def run_monitoring_agent(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
    model_name: str = "churn_model_v3",
) -> str:
    """
    Run the monitoring agent.
    It computes drift stats and decides how to respond based on severity.
    """
    drift_results = compute_drift_stats(baseline_df, current_df, numeric_cols)
    drift_summary = [
        {"feature": d.feature, "psi": d.psi, "ks_pvalue": d.ks_pvalue, "severity": d.severity}
        for d in drift_results
    ]
    severe_features = [d.feature for d in drift_results if d.severity == "severe_drift"]
    mild_features = [d.feature for d in drift_results if d.severity == "mild_drift"]

    prompt = f"""You are a model monitoring agent for {model_name}.
Drift analysis results:
{json.dumps(drift_summary, indent=2)}
Severe drift (PSI > 0.25): {severe_features}
Mild drift (PSI 0.10-0.25): {mild_features}
Based on severity, determine the appropriate response:
- STABLE: log a pass, no action needed
- MILD DRIFT: draft an alert message for the data science team
- SEVERE DRIFT: draft an alert message AND a trigger for the retraining pipeline
Write your response in this format:
SEVERITY_LEVEL:
ACTION:
MESSAGE: """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,  # Very low -- this is a decision-making call, not creative
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    np.random.seed(42)
    n = 2000
    # Baseline: normal e-commerce browsing patterns
    baseline = pd.DataFrame({
        "session_duration_s": np.random.normal(180, 60, n),
        "pages_per_session": np.random.normal(4.2, 1.5, n),
        "cart_add_rate": np.clip(np.random.normal(0.12, 0.04, n), 0, 1),
    })
    # Current: promotional event shifts all features significantly
    current = pd.DataFrame({
        "session_duration_s": np.random.normal(310, 90, n),  # sessions much longer
        "pages_per_session": np.random.normal(6.8, 2.1, n),  # viewing more pages
        "cart_add_rate": np.clip(np.random.normal(0.31, 0.08, n), 0, 1),  # much higher
    })
    result = run_monitoring_agent(baseline, current, list(baseline.columns), model_name="recommendation_engine_v2")
    print(result)
How to run:
export OPENAI_API_KEY=your_key
python drift_agent.py
Real scenario
E-commerce recommendation model. A promotional event causes a sudden distribution shift in browsing behavior: mean session duration jumps from 180s to 310s, and cart add rate nearly triples. The monitoring agent runs at midnight against the day's data. It detects PSI > 0.25 on all three features, classifies severity as severe, and triggers the retraining pipeline with an alert to Slack. The data science team wakes up to a message explaining what shifted and what was done about it, not a raw dashboard they have to interpret at 6 a.m.
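Because the prompt asks for a fixed `SEVERITY_LEVEL` / `ACTION` / `MESSAGE` layout, the calling code still has to parse the reply before anything reaches Slack or Airflow. A minimal sketch of that thin dispatch step, assuming the model follows the requested format; `parse_agent_reply` and `route` are hypothetical helpers, and the returned action names are placeholders standing in for real Slack or Airflow calls:

```python
import re

FIELDS = ("SEVERITY_LEVEL", "ACTION", "MESSAGE")
_LABELS = "|".join(FIELDS)


def parse_agent_reply(reply: str) -> dict[str, str]:
    """Extract the three labelled fields from the agent's text reply."""
    parsed = {}
    for name in FIELDS:
        # Capture everything after "NAME:" up to the next known label or end of text.
        pattern = rf"{name}:\s*(.*?)(?=\n(?:{_LABELS}):|\Z)"
        match = re.search(pattern, reply, flags=re.DOTALL)
        parsed[name] = match.group(1).strip() if match else ""
    return parsed


def route(parsed: dict[str, str]) -> list[str]:
    """Map the parsed severity to placeholder action names (no real I/O here)."""
    severity = parsed["SEVERITY_LEVEL"].upper()
    if "SEVERE" in severity:
        return ["send_alert", "trigger_retraining"]
    if "MILD" in severity:
        return ["send_alert"]
    if "STABLE" in severity:
        return ["log_pass"]
    return ["escalate_to_human"]  # unparseable reply: fail safe


# Hand-written reply in the requested format, for illustration only.
reply = (
    "SEVERITY_LEVEL: SEVERE DRIFT\n"
    "ACTION: alert and retrain\n"
    "MESSAGE: PSI > 0.25 on all three features.\nRetraining requested."
)
parsed = parse_agent_reply(reply)
print(parsed["SEVERITY_LEVEL"], route(parsed))
```

The fallback branch matters most: if the reply cannot be parsed, the sketch escalates rather than guessing, so a malformed response never silently triggers or suppresses a retrain.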
# Workflow 5: Agentic Pipeline Orchestration and Self-Healing
What it replaces: Staring at an Airflow failure notification, opening the logs, manually reading the traceback, figuring out whether the fix requires a code change, a config change, or a retry, making the fix, rerunning the task, and hoping the next task downstream doesn't fail for the same reason.
What the agent does instead: Reads the failure log, classifies the error type, determines whether it's auto-fixable, applies the fix if it is, and either retriggers the task or escalates to a human with a fully structured incident report if it's not.
// Architecture
A meta-agent that wraps your existing orchestration layer. When an Airflow task fails, the orchestrator sends the task ID, error log, and task definition to the agent. The agent uses one tool, parse_pipeline_error, to classify the failure deterministically. From there, a single language model call decides whether the error is auto-fixable and drafts either a fix description or a structured incident report for human review, depending on that classification.
// Code Pattern
# pipeline_healer.py
# Prerequisites: pip install openai
# Run: python pipeline_healer.py
import re
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI

client = OpenAI()


@dataclass
class PipelineError:
    task_id: str
    error_type: str  # schema_mismatch | null_violation | timeout | unknown
    column: Optional[str]
    detail: str
    auto_fixable: bool


def parse_pipeline_error(log_line: str, task_id: str) -> PipelineError:
    """
    Classify a task failure log into a structured error type.
    Auto-fixable errors can be repaired without human intervention.
    """
    lowered = log_line.lower()
    if "KeyError" in log_line or ("column" in lowered and "not found" in lowered):
        # First quoted identifier in the log is taken as the affected column
        col_match = re.search(r"['\"](\w+)['\"]", log_line)
        col = col_match.group(1) if col_match else None
        return PipelineError(task_id, "schema_mismatch", col, log_line.strip(), auto_fixable=True)
    if "IntegrityError" in log_line or ("null" in lowered and "violate" in lowered):
        return PipelineError(task_id, "null_violation", None, log_line.strip(), auto_fixable=True)
    if "TimeoutError" in log_line or "timed out" in lowered:
        return PipelineError(task_id, "timeout", None, log_line.strip(), auto_fixable=False)
    return PipelineError(task_id, "unknown", None, log_line.strip(), auto_fixable=False)


def run_self_healing_agent(
    task_id: str,
    error_log: str,
    task_definition: str,
) -> str:
    """
    Run the self-healing agent on a failed pipeline task.
    It classifies the error, decides on a remediation, and produces
    either an auto-fix description or a structured escalation report.
    """
    error = parse_pipeline_error(error_log, task_id)
    fix_policy = (
        "You can apply an automatic fix for this error type."
        if error.auto_fixable
        else "This error requires human review -- you cannot auto-fix it."
    )
    prompt = f"""You are a data pipeline reliability engineer.
A pipeline task has failed and you must decide how to respond.
Task: {task_id}
Task definition: {task_definition}
Error type: {error.error_type}
Column affected: {error.column or 'N/A'}
Auto-fixable: {error.auto_fixable}
Full error: {error.detail}
{fix_policy}
Respond with:
ACTION:
FIX_DESCRIPTION:
ESCALATION_REPORT:
NEXT_STEP: """

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    # Scenario: CRM export renamed the transaction date column
    result = run_self_healing_agent(
        task_id="ingest_crm_daily",
        error_log="KeyError: 'transaction_date' column not found in source dataframe. "
                  "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']",
        task_definition="Reads daily CRM export, extracts transaction_date and customer_id, "
                        "joins with product catalog, writes to feature store.",
    )
    print(result)
How to run:
python pipeline_healer.py
Real scenario
A daily feature pipeline fails at 2 a.m. because an upstream CRM system updated its export schema: it renamed transaction_date to txn_date_utc and added three new columns. The agent reads the error log, identifies the schema mismatch on transaction_date, and produces an auto-fix: rename the column in the ingestion step and add the three new columns to the schema definition as nullable. It logs the fix, retriggers the failed task, and sends the on-call engineer a summary that reads "Schema fix applied automatically. Source renamed transaction_date → txn_date_utc. Three new nullable columns have been added to the schema. Task retriggered at 02:14." The engineer reviews the change in the morning instead of being woken up.
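A rename like this can only be proposed if the agent knows which columns the source now exposes. A small sketch of pulling that context out of the log line, assuming the error message follows the format used in the example run (a `KeyError` naming the missing column, followed by an `Available columns: [...]` list); `extract_schema_context` is a hypothetical helper, not part of the script above:

```python
import ast
import re


def extract_schema_context(log_line: str) -> dict:
    """Pull the missing column and the available columns out of a failure log line."""
    missing = re.search(r"KeyError: ['\"](\w+)['\"]", log_line)
    available = re.search(r"Available columns: (\[[^\]]*\])", log_line)
    columns = []
    if available:
        try:
            # literal_eval only parses Python literals; it never executes code.
            columns = list(ast.literal_eval(available.group(1)))
        except (ValueError, SyntaxError):
            columns = []
    return {
        "missing_column": missing.group(1) if missing else None,
        "available_columns": columns,
    }


log = ("KeyError: 'transaction_date' column not found in source dataframe. "
       "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']")
context = extract_schema_context(log)
print(context["missing_column"])
print(context["available_columns"])
```

Passing both values into the prompt gives the model concrete rename candidates. Whether txn_date_utc really is the renamed column is still a judgment for the model to propose and a reviewer to confirm.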
# Wrapping Up
The five workflows are not independent tools. They're a pipeline:
The EDA agent understands the data. The feature engineering agent improves it. The hyperparameter agent optimizes the model built on those features. The monitoring agent watches the model in production. The self-healing agent protects the pipeline that delivers data to all of them.
Deploy them in the following order. Start with monitoring; it delivers value immediately on any existing pipeline without requiring changes to your modeling code. Add the EDA agent next for any new dataset you bring in. The feature engineering and hyperparameter agents come after you have established a baseline model worth improving.

None of these workflows operates without human review of the decisions that matter. The EDA agent flags issues; you decide what to do about them. The feature agent proposes candidates; you decide the importance threshold. The hyperparameter agent searches; you decide the parameter bounds and convergence criteria. The monitoring agent detects drift; you decide the severity thresholds that trigger retraining. The self-healing agent applies fixes; you review them before they merge into production.
That division is the point. Agents handle the procedural weight. You retain the evaluative weight. The result is a pipeline that's faster, more consistent, and easier to maintain, because the parts that break are now detected and often repaired before you have to look at them.
Shittu Olumide is a software engineer and technical writer passionate about leveraging cutting-edge technologies to craft compelling narratives, with a keen eye for detail and a knack for simplifying complex concepts. You can also find Shittu on Twitter.
