My first ML model worked beautifully in my Jupyter notebook.
95% accuracy. Clean code. Beautiful visualizations. I was ready to deploy.
Then I tried to put it in production.
The pickle file was 2GB. Loading took 30 seconds. The first prediction took 5 seconds. Memory usage spiked to 8GB. The API crashed every few hours for mysterious reasons.
Turns out, there's a massive gap between "it works in a notebook" and "it works in production."
I spent the next two weeks learning this gap the hard way. Let me save you that time and show you how to bridge it properly.
The Notebook-to-Production Gap
Here's what everyone gets wrong:
In your notebook:
Clean, pre-loaded data
Unlimited memory
Single user (you)
No time pressure
You restart when it breaks
No monitoring needed
In production:
Messy, real-time data
Limited resources
Thousands of concurrent users
Sub-second latency requirements
Must run 24/7 without restarts
Everything must be monitored
The painful transition:
Week 1: "My model works!" ✅
Week 2: "Why is deployment so hard?" 😰
Week 3: "Everything is broken" 🔥
Week 4: "Finally working in production" 😅
Let me show you how to skip Weeks 2 and 3.
Step 1: Design for Production from Day One
Don't wait until the end to think about deployment.
Bad Approach (What I Did)
```python
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
import pickle

df = pd.read_csv('/Users/me/Desktop/data.csv')
X = df.drop('target', axis=1)
y = df['target']

model = RandomForestClassifier(
    n_estimators=500,
    max_depth=None,
    min_samples_split=2,
    random_state=42
)
model.fit(X, y)

pickle.dump(model, open('model.pkl', 'wb'))
print("Done! Accuracy:", model.score(X, y))
```
Problems:
❌ Hardcoded file paths
❌ No data validation
❌ Overfitted model (too complex)
❌ No preprocessing pipeline
❌ Pickle is fragile
❌ No versioning
❌ Accuracy measured on the training data (hides overfitting)
Good Approach (What I Should Have Done)
```python
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import joblib
import mlflow
from pathlib import Path

def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate data"""
    df = pd.read_csv(data_path)
    required_columns = ['feature1', 'feature2', 'feature3', 'target']
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    assert df['feature1'].dtype == 'float64'
    assert df['target'].dtype == 'int64'
    return df

def create_pipeline() -> Pipeline:
    """Create reproducible ML pipeline"""
    return Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            min_samples_split=10,
            random_state=42,
            n_jobs=-1
        ))
    ])

def train_model(data_path: str, model_path: str):
    """Train model with proper validation"""
    df = load_data(data_path)
    X = df.drop('target', axis=1)
    y = df['target']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    pipeline = create_pipeline()

    with mlflow.start_run():
        cv_scores = cross_val_score(
            pipeline, X_train, y_train, cv=5, scoring='accuracy'
        )
        print(f"CV Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

        pipeline.fit(X_train, y_train)
        test_score = pipeline.score(X_test, y_test)
        print(f"Test Accuracy: {test_score:.4f}")

        mlflow.log_param("n_estimators", 100)
        mlflow.log_param("max_depth", 10)
        mlflow.log_metric("cv_accuracy", cv_scores.mean())
        mlflow.log_metric("test_accuracy", test_score)

        Path(model_path).parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(pipeline, model_path)
        mlflow.sklearn.log_model(pipeline, "model")
        print(f"Model saved to {model_path}")

if __name__ == "__main__":
    train_model(
        data_path="data/train.csv",
        model_path="models/model_v1.pkl"
    )
```
Improvements:
✅ Proper train/test split
✅ Cross-validation
✅ Data validation
✅ Pipeline for preprocessing
✅ Reasonable hyperparameters
✅ MLflow tracking
✅ Configurable paths
Step 2: Optimize Model Size and Speed
Your 2GB model with 30-second load time won't work in production.
The Problem
```python
# 1000 unbounded trees → a multi-gigabyte artifact
model = RandomForestClassifier(
    n_estimators=1000,
    max_depth=None,
)
model.fit(X_train, y_train)
joblib.dump(model, 'model.pkl')  # 2GB file, 30-second load
```
The Solution: Model Optimization
```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import joblib
import time
import os

def optimize_model_size(X_train, y_train, X_test, y_test):
    """Find the smallest model that maintains performance"""
    # Baseline: a large, unconstrained forest
    large_model = RandomForestClassifier(
        n_estimators=500,
        max_depth=None,
        random_state=42
    )
    large_model.fit(X_train, y_train)
    large_accuracy = large_model.score(X_test, y_test)
    joblib.dump(large_model, 'large_model.pkl')
    large_size = os.path.getsize('large_model.pkl') / (1024 * 1024)

    print(f"Large Model:")
    print(f"  Accuracy: {large_accuracy:.4f}")
    print(f"  Size: {large_size:.1f} MB")

    # Search for a smaller model with comparable accuracy
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15],
        'min_samples_split': [10, 20, 50]
    }
    grid_search = GridSearchCV(
        RandomForestClassifier(random_state=42),
        param_grid,
        cv=3,
        scoring='accuracy',
        n_jobs=-1
    )
    grid_search.fit(X_train, y_train)

    optimized_model = grid_search.best_estimator_
    optimized_accuracy = optimized_model.score(X_test, y_test)
    joblib.dump(optimized_model, 'optimized_model.pkl')
    optimized_size = os.path.getsize('optimized_model.pkl') / (1024 * 1024)

    print(f"\nOptimized Model:")
    print(f"  Accuracy: {optimized_accuracy:.4f}")
    print(f"  Size: {optimized_size:.1f} MB")
    print(f"  Accuracy drop: {large_accuracy - optimized_accuracy:.4f}")
    print(f"  Size reduction: {(1 - optimized_size/large_size)*100:.1f}%")

    return optimized_model
```
Key optimization techniques:
1. Reduce Number of Estimators
```python
# Before: 1000 trees
model = RandomForestClassifier(n_estimators=1000)

# After: 100 trees — often nearly the same accuracy at a tenth of the size
model = RandomForestClassifier(n_estimators=100)
```
2. Limit Tree Depth
```python
# Before: unlimited depth
model = RandomForestClassifier(max_depth=None)

# After: capped depth — smaller trees, less overfitting
model = RandomForestClassifier(max_depth=10)
```
3. Use Compression
```python
# Uncompressed
joblib.dump(model, 'model.pkl')

# Compressed (compress takes 0-9; 3 is a good speed/size trade-off)
joblib.dump(model, 'model.pkl', compress=3)
```
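If you're curious what compression buys, here's a stdlib-only illustration (gzip over pickle on a synthetic stand-in artifact — joblib's `compress` option works on the same principle): model artifacts tend to contain a lot of repetitive structure, which compresses well.

```python
import gzip
import pickle

# Synthetic stand-in for a model artifact, with repetitive structure
artifact = [[0.0, 1.0, 0.5] * 1000 for _ in range(200)]

raw = pickle.dumps(artifact)
compressed = gzip.compress(raw, compresslevel=3)

print(f"raw: {len(raw)/1024:.0f} KB, gzip: {len(compressed)/1024:.0f} KB")
```

The exact ratio depends on the model, but the disk and network savings are usually worth the small extra load time.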
4. Consider Simpler Models
```python
# Complex: random forest
from sklearn.ensemble import RandomForestClassifier
rf_model = RandomForestClassifier(n_estimators=100)

# Simple: logistic regression — often good enough, at a fraction of the size
from sklearn.linear_model import LogisticRegression
lr_model = LogisticRegression()
```
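Why are linear models so much smaller? A forest stores parameters for every node of every tree; a linear model stores a single weight vector. A stdlib back-of-envelope sketch (plain pickle on stand-in lists, not real sklearn estimators — purely to illustrate the scaling):

```python
import pickle
import random

random.seed(0)
n_features = 50

# Stand-in "forest": 200 trees, each with ~1000 stored parameters
forest = [[random.random() for _ in range(1000)] for _ in range(200)]

# Stand-in "linear model": one weight per feature
linear = [random.random() for _ in range(n_features)]

forest_kb = len(pickle.dumps(forest)) / 1024
linear_kb = len(pickle.dumps(linear)) / 1024
print(f"forest: {forest_kb:.0f} KB, linear: {linear_kb:.2f} KB")
```

The gap here is several orders of magnitude, which mirrors what you see with real serialized estimators.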
Step 3: Build a Production-Ready API
Don't just slap Flask on your model and call it done.
Bad API (Rookie Mistakes)
```python
from flask import Flask, request
import pickle

app = Flask(__name__)
model = pickle.load(open('model.pkl', 'rb'))

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    features = [data['f1'], data['f2'], data['f3']]
    prediction = model.predict([features])
    return {'prediction': int(prediction[0])}

if __name__ == '__main__':
    app.run(debug=True)
```
Problems:
❌ No input validation
❌ No error handling
❌ No logging
❌ No monitoring
❌ Debug mode enabled
❌ Blocking model load
Good API (Production-Ready)
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
from prometheus_client import Counter, Histogram, generate_latest
from prometheus_fastapi_instrumentator import Instrumentator
import joblib
import logging
import time
import numpy as np

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="ML Prediction API",
    version="1.0.0",
    description="Production ML model serving"
)

predictions_counter = Counter(
    'ml_predictions_total',
    'Total predictions made',
    ['model_version', 'prediction_class']
)
prediction_latency = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency'
)
prediction_errors = Counter(
    'ml_prediction_errors_total',
    'Total prediction errors',
    ['error_type']
)

# Collect HTTP metrics; they are served by the /metrics endpoint below
Instrumentator().instrument(app)

class PredictionRequest(BaseModel):
    feature1: float
    feature2: float
    feature3: float
    feature4: float
    feature5: float

    @validator('feature1', 'feature2', 'feature3', 'feature4', 'feature5')
    def validate_features(cls, v):
        # Check NaN/Inf first so the error message is accurate
        if np.isnan(v) or np.isinf(v):
            raise ValueError('Feature contains NaN or Inf')
        if not -100 <= v <= 100:
            raise ValueError('Feature value out of valid range')
        return v

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str
    latency_ms: float

model = None
MODEL_VERSION = "v1.0.0"

@app.on_event("startup")
async def load_model():
    """Load model at startup, before the first request arrives"""
    global model
    try:
        logger.info("Loading model...")
        model = joblib.load('models/model.pkl')
        logger.info(f"Model {MODEL_VERSION} loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        raise

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {
        "status": "healthy",
        "model_version": MODEL_VERSION
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make prediction with full error handling and monitoring"""
    start_time = time.time()
    try:
        if model is None:
            prediction_errors.labels(error_type='model_not_loaded').inc()
            raise HTTPException(
                status_code=503,
                detail="Model not loaded"
            )

        features = np.array([[
            request.feature1,
            request.feature2,
            request.feature3,
            request.feature4,
            request.feature5
        ]])

        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0]

        latency = time.time() - start_time
        prediction_latency.observe(latency)
        predictions_counter.labels(
            model_version=MODEL_VERSION,
            prediction_class=str(prediction)
        ).inc()

        logger.info(
            f"Prediction made: class={prediction}, "
            f"prob={probability[prediction]:.4f}, "
            f"latency={latency*1000:.2f}ms"
        )

        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability[prediction]),
            model_version=MODEL_VERSION,
            latency_ms=round(latency * 1000, 2)
        )
    except HTTPException:
        # Re-raise as-is so a 503 doesn't get swallowed into a 500 below
        raise
    except ValueError as e:
        prediction_errors.labels(error_type='validation_error').inc()
        logger.warning(f"Validation error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        prediction_errors.labels(error_type='unexpected_error').inc()
        logger.error(f"Prediction error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    from starlette.responses import Response
    return Response(content=generate_latest(), media_type="text/plain")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
```
Improvements:
✅ Input validation with Pydantic
✅ Comprehensive error handling
✅ Structured logging
✅ Prometheus metrics
✅ Health check endpoint
✅ Model loaded at startup
✅ Type hints everywhere
✅ Proper HTTP status codes
Step 4: Containerize with Docker
Make it run anywhere.
Dockerfile
```dockerfile
# Multi-stage build for a smaller image
FROM python:3.10-slim AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies into an isolated prefix
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Final stage: runtime image without the build tools
FROM python:3.10-slim
COPY --from=builder /install /usr/local
WORKDIR /app
COPY . .
CMD ["uvicorn", "production_api:app", "--host", "0.0.0.0", "--port", "8000"]
```
Build and Run
```bash
# Build the image
docker build -t ml-api:v1.0.0 .

# Run with resource limits
docker run -d \
  --name ml-api \
  -p 8000:8000 \
  --memory="1g" \
  --cpus="2" \
  ml-api:v1.0.0

# Smoke test
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "feature1": 0.5,
    "feature2": 1.2,
    "feature3": -0.3,
    "feature4": 0.8,
    "feature5": -1.1
  }'
```
Step 5: Handle Common Production Issues
Real problems you'll face.
Issue #1: Memory Leaks
Problem:
```python
# Unbounded global list grows forever — memory climbs until the API crashes
predictions = []

@app.post("/predict")
def predict(data):
    prediction = model.predict(data)
    predictions.append(prediction)
    return prediction
```
Solution:
```python
from collections import deque

# deque with maxlen keeps only the most recent 1000 entries
predictions = deque(maxlen=1000)

@app.post("/predict")
def predict(data):
    prediction = model.predict(data)
    predictions.append(prediction)
    return prediction
```
Issue #2: Cold Start Latency
Problem:
```python
# Loading the model inside the handler adds seconds to every request
@app.post("/predict")
def predict(data):
    model = joblib.load('model.pkl')
    return model.predict(data)
```
Solution:
```python
import numpy as np

model = None

@app.on_event("startup")
async def load_model():
    global model
    model = joblib.load('model.pkl')
    # Warm up: the first predict call often triggers lazy initialization
    dummy_data = np.zeros((1, 5))
    model.predict(dummy_data)
    logger.info("Model warmed up")

@app.post("/predict")
def predict(data):
    return model.predict(data)
```
Issue #3: Concurrent Request Handling
Problem: a single worker process serves one request at a time; under concurrent load, requests queue up and latency climbs.
Solution: run multiple worker processes.
```bash
# Option 1: uvicorn with multiple workers
uvicorn main:app --workers 4

# Option 2: gunicorn managing uvicorn workers
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000
```
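Worker processes are the main fix, but there's a complementary trick inside a single async worker: offload the blocking predict call to a thread pool so the event loop stays responsive. A minimal stdlib sketch — `slow_predict` is a stand-in for `model.predict` (note the real benefit requires the call to release the GIL, which NumPy-backed models do):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def slow_predict(x):
    # Stand-in for a CPU-bound model.predict call
    return sum(i * x for i in range(50_000))

async def handle_request(x):
    # Run the blocking call off the event loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, slow_predict, x)

async def main():
    # Eight "concurrent requests" — none of them blocks the loop
    return await asyncio.gather(*(handle_request(i) for i in range(8)))

results = asyncio.run(main())
print(results)
```

In FastAPI you get this for free by declaring the endpoint with plain `def` (it runs in a threadpool); the explicit executor version shows what's happening underneath.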
Issue #4: Feature Preprocessing Consistency
Problem:
```python
# Training time: scaler fitted on the training data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Inference time: a brand-new, unfitted scaler — wrong statistics
# (this actually raises NotFittedError; silently re-fitting would be worse)
scaler = StandardScaler()
X_new_scaled = scaler.transform(X_new)
```
Solution:
```python
from sklearn.pipeline import Pipeline

# Scaler and model are fitted, saved, and loaded as one unit
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])
pipeline.fit(X_train, y_train)
joblib.dump(pipeline, 'model.pkl')

# At inference time, the pipeline applies the exact same preprocessing
loaded_pipeline = joblib.load('model.pkl')
prediction = loaded_pipeline.predict(X_new)
```
Issue #5: Model Version Confusion
Problem:
```python
# Which data? Which hyperparameters? Which commit? Nobody knows.
joblib.dump(model, 'model.pkl')
```
Solution:
```python
from datetime import datetime
import os

# Timestamped artifact name
version = datetime.now().strftime("%Y%m%d_%H%M")
model_path = f'models/model_{version}.pkl'
joblib.dump(pipeline, model_path)

# Track it in MLflow
mlflow.log_artifact(model_path)
mlflow.set_tag("version", version)

# Point a stable name at the current production model
link_path = 'models/model_production.pkl'
if os.path.islink(link_path):
    os.unlink(link_path)
os.symlink(os.path.abspath(model_path), link_path)
```
Step 6: Testing Production Models
Don't skip this!
Unit Tests
```python
import pytest
import numpy as np
import joblib

@pytest.fixture
def model():
    return joblib.load('models/model.pkl')

def test_model_loads(model):
    """Test model can be loaded"""
    assert model is not None

def test_model_predicts(model):
    """Test model makes predictions"""
    X_test = np.array([[0.5, 1.2, -0.3, 0.8, -1.1]])
    prediction = model.predict(X_test)
    assert prediction.shape == (1,)
    assert prediction[0] in [0, 1]

def test_model_probability(model):
    """Test model returns probabilities"""
    X_test = np.array([[0.5, 1.2, -0.3, 0.8, -1.1]])
    proba = model.predict_proba(X_test)
    assert proba.shape == (1, 2)
    assert np.isclose(proba.sum(), 1.0)
    assert np.all(proba >= 0) and np.all(proba <= 1)

def test_model_handles_edge_cases(model):
    """Test model with edge case inputs"""
    X_zeros = np.zeros((1, 5))
    pred_zeros = model.predict(X_zeros)
    assert pred_zeros.shape == (1,)

    X_large = np.ones((1, 5)) * 100
    pred_large = model.predict(X_large)
    assert pred_large.shape == (1,)

def test_model_batch_prediction(model):
    """Test model handles batch predictions"""
    X_batch = np.random.randn(100, 5)
    predictions = model.predict(X_batch)
    assert predictions.shape == (100,)
```
Integration Tests
```python
import pytest
from fastapi.testclient import TestClient
from production_api import app

@pytest.fixture
def client():
    # The context manager runs startup events, so the model gets loaded
    with TestClient(app) as c:
        yield c

def test_health_check(client):
    """Test health endpoint"""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_predict_valid_input(client):
    """Test prediction with valid input"""
    response = client.post("/predict", json={
        "feature1": 0.5,
        "feature2": 1.2,
        "feature3": -0.3,
        "feature4": 0.8,
        "feature5": -1.1
    })
    assert response.status_code == 200
    data = response.json()
    assert "prediction" in data
    assert "probability" in data
    assert "model_version" in data
    assert data["prediction"] in [0, 1]
    assert 0 <= data["probability"] <= 1

def test_predict_invalid_input(client):
    """Test prediction with invalid input"""
    # Missing features: rejected by Pydantic
    response = client.post("/predict", json={
        "feature1": 0.5
    })
    assert response.status_code == 422

    # Out-of-range feature: also rejected during Pydantic validation
    response = client.post("/predict", json={
        "feature1": 1000,
        "feature2": 1.2,
        "feature3": -0.3,
        "feature4": 0.8,
        "feature5": -1.1
    })
    assert response.status_code == 422

def test_metrics_endpoint(client):
    """Test Prometheus metrics"""
    response = client.get("/metrics")
    assert response.status_code == 200
    assert "ml_predictions_total" in response.text
```
Load Testing
```python
import requests
import time
import concurrent.futures
import numpy as np

def make_prediction():
    """Make a single prediction"""
    data = {
        "feature1": float(np.random.randn()),
        "feature2": float(np.random.randn()),
        "feature3": float(np.random.randn()),
        "feature4": float(np.random.randn()),
        "feature5": float(np.random.randn())
    }
    start = time.time()
    response = requests.post(
        "http://localhost:8000/predict",
        json=data
    )
    latency = time.time() - start
    return {
        "status": response.status_code,
        "latency": latency
    }

def load_test(num_requests=1000, num_workers=10):
    """Run load test"""
    print(f"Running load test: {num_requests} requests, {num_workers} workers")
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(make_prediction) for _ in range(num_requests)]
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())

    latencies = [r['latency'] for r in results]
    success_count = sum(1 for r in results if r['status'] == 200)

    print(f"\nResults:")
    print(f"  Success rate: {success_count/len(results)*100:.2f}%")
    print(f"  Mean latency: {np.mean(latencies)*1000:.2f}ms")
    print(f"  P50 latency: {np.percentile(latencies, 50)*1000:.2f}ms")
    print(f"  P95 latency: {np.percentile(latencies, 95)*1000:.2f}ms")
    print(f"  P99 latency: {np.percentile(latencies, 99)*1000:.2f}ms")
    print(f"  Max latency: {np.max(latencies)*1000:.2f}ms")

if __name__ == "__main__":
    load_test(num_requests=1000, num_workers=10)
```
Step 7: Deployment Checklist
Before you go to production:
Model Checklist
✅ Model size < 500MB (preferably < 100MB)
✅ Load time < 5 seconds
✅ Prediction latency < 100ms (P95)
✅ Proper train/test split used
✅ Cross-validation performed
✅ Model versioned and logged
✅ Pipeline includes preprocessing
✅ Compression enabled
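The first few items on that list are easy to check automatically in CI. A minimal, stdlib-only sketch (it works on any pickled artifact; in practice you'd swap in joblib and point it at your real model path — `demo_model.pkl` below is just a stand-in):

```python
import os
import pickle
import time

def check_artifact(path, max_size_mb=500, max_load_s=5.0):
    """Verify size and load-time budgets for a serialized model artifact."""
    size_mb = os.path.getsize(path) / (1024 * 1024)

    start = time.time()
    with open(path, 'rb') as f:
        pickle.load(f)
    load_s = time.time() - start

    return {
        "size_mb": round(size_mb, 2),
        "load_s": round(load_s, 3),
        "size_ok": size_mb < max_size_mb,
        "load_ok": load_s < max_load_s,
    }

# Demo with a tiny stand-in artifact
with open('demo_model.pkl', 'wb') as f:
    pickle.dump({"weights": list(range(1000))}, f)

report = check_artifact('demo_model.pkl')
print(report)
```

Run it as a pre-deploy gate: if `size_ok` or `load_ok` is false, the deploy fails before it ever reaches production.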
API Checklist
✅ Input validation (Pydantic)
✅ Error handling (try/except)
✅ Logging configured
✅ Metrics exposed (Prometheus)
✅ Health check endpoint
✅ Model loaded at startup
✅ No debug mode
✅ Type hints added
Docker Checklist
✅ Multi-stage build
✅ Non-root user
✅ Health check configured
✅ Resource limits set
✅ Image size < 1GB
✅ Security scanning passed
Testing Checklist
✅ Unit tests (model behavior)
✅ Integration tests (API endpoints)
✅ Load testing (handles concurrent requests)
✅ Edge case testing
✅ All tests pass
Monitoring Checklist
✅ Prometheus metrics
✅ Grafana dashboard
✅ Alerts configured
✅ Logging to centralized system
✅ Performance tracking
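"Alerts configured" deserves a concrete definition. A common rule: page when P95 latency breaches the SLO. Here's a stdlib sketch of the check an alerting rule encodes — in practice this lives in Prometheus/Grafana rather than application code, and the 100ms threshold is illustrative:

```python
import statistics

def p95(samples):
    """95th percentile via the 'inclusive' quantile method."""
    return statistics.quantiles(samples, n=100, method='inclusive')[94]

def should_alert(latencies_ms, slo_p95_ms=100.0):
    """Fire when observed P95 latency exceeds the SLO."""
    return p95(latencies_ms) > slo_p95_ms

# Healthy service: every request well under budget
healthy = [20, 25, 30, 22, 28, 35, 40, 21, 26, 31] * 10

# Degraded service: a chunk of slow requests pushes P95 over budget
degraded = healthy + [500] * 30

print(should_alert(healthy), should_alert(degraded))
```

Alerting on P95 rather than the mean matters: a handful of very slow requests can hide behind a healthy-looking average.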
Common Production Mistakes
Mistake #1: Not Using a Pipeline
❌ Don't:
```python
# Training: scaler fitted here...
scaler.fit(X_train)
X_scaled = scaler.transform(X_train)
model.fit(X_scaled, y_train)

# Inference: ...but a new, unfitted scaler is used here
scaler = StandardScaler()
X_new_scaled = scaler.transform(X_new)
```
✅ Do:
```python
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])
pipeline.fit(X_train, y_train)

# Same preprocessing at train and inference time
prediction = pipeline.predict(X_new)
```
Mistake #2: Overfitting in Notebooks
❌ Don't: report accuracy measured on the same data the model was trained on.
✅ Do: hold out a test set and use cross-validation, as in Step 1.
Mistake #3: No Error Handling
❌ Don't:
```python
@app.post("/predict")
def predict(data):
    return model.predict(data)  # any bad input → unhandled 500
```
✅ Do:
```python
@app.post("/predict")
def predict(request: PredictionRequest):
    try:
        return model.predict(request.features)
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=500)
```
Mistake #4: Loading Model Every Request
❌ Don't:
```python
@app.post("/predict")
def predict(data):
    model = joblib.load('model.pkl')  # disk I/O on every single request
    return model.predict(data)
```
✅ Do:
```python
model = None

@app.on_event("startup")
def load_model():
    global model
    model = joblib.load('model.pkl')

@app.post("/predict")
def predict(data):
    return model.predict(data)
```
Conclusion
Going from notebook to production isn't just "save the model and deploy."
The complete journey:
Train properly - Validation, pipelines, versioning
Optimize - Model size, speed, efficiency
Build API - Validation, errors, monitoring
Containerize - Docker, health checks, security
Test - Unit, integration, load tests
Monitor - Metrics, logs, alerts
Deploy - With confidence
What you get:
✅ Models that actually work in production
✅ Fast, reliable predictions
✅ Proper error handling
✅ Full observability
✅ Easy rollbacks
✅ Scalable deployment
The gap between notebook and production is real. But it's bridgeable.
Want to see a complete production ML system?
Check out my fraud detection project with the full production setup:
GitHub: github.com/Shodexco/fraud-detection-production
Questions? Let's connect:
Portfolio: jonathansodeke.framer.website
GitHub: github.com/Shodexco
LinkedIn: [Your LinkedIn]
Now go deploy your models properly. Production is waiting.
About the Author
Jonathan Sodeke is a Data Engineer and ML Engineer who learned production deployment the hard way (by breaking it in every possible way first). He now builds ML systems that actually work in production, with proper monitoring, testing, and deployment pipelines.
When he's not optimizing model sizes at 2am, he's helping others bridge the notebook-to-production gap.
Portfolio: jonathansodeke.framer.website
GitHub: github.com/Shodexco
LinkedIn: [Your LinkedIn URL]