From Sklearn to Production: Deploying ML Models That Actually Work

Nov 23, 2024

My first ML model worked beautifully in my Jupyter notebook.

95% accuracy. Clean code. Beautiful visualizations. I was ready to deploy.

Then I tried to put it in production.

The pickle file was 2GB. Loading took 30 seconds. The first prediction took 5 seconds. Memory usage spiked to 8GB. The API crashed every few hours for mysterious reasons.

Turns out, there's a massive gap between "it works in a notebook" and "it works in production."

I spent the next two weeks learning this gap the hard way. Let me save you that time and show you how to bridge it properly.

The Notebook-to-Production Gap

Here's what everyone gets wrong:

In your notebook:

  • Clean, pre-loaded data

  • Unlimited memory

  • Single user (you)

  • No time pressure

  • You restart when it breaks

  • No monitoring needed

In production:

  • Messy, real-time data

  • Limited resources

  • Thousands of concurrent users

  • Sub-second latency requirements

  • Must run 24/7 without restarts

  • Everything must be monitored

The painful transition:

Week 1: "My model works!" 
Week 2: "Why is deployment so hard?" 😰
Week 3: "Everything is broken" 🔥
Week 4: "Finally working in production" 😅

Let me show you how to skip Weeks 2 and 3.

Step 1: Design for Production from Day One

Don't wait until the end to think about deployment.

Bad Approach (What I Did)

python

# notebook.ipynb

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
import pickle

# Load data (works on my machine!)
df = pd.read_csv('/Users/me/Desktop/data.csv')

# Train model
X = df.drop('target', axis=1)
y = df['target']

model = RandomForestClassifier(
    n_estimators=500,      # Way too many!
    max_depth=None,        # Unlimited depth
    min_samples_split=2,   # Will overfit
    random_state=42
)

model.fit(X, y)

# Save model
pickle.dump(model, open('model.pkl', 'wb'))

print("Done! Accuracy:", model.score(X, y))
# Accuracy: 0.99 (overfitted garbage)

Problems:

  • ❌ Hardcoded file paths

  • ❌ No data validation

  • ❌ Overfitted model (too complex)

  • ❌ No preprocessing pipeline

  • ❌ Pickle is fragile

  • ❌ No versioning

  • ❌ Evaluated on training data (hides overfitting)

Good Approach (What I Should Have Done)

python

# train.py

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import joblib
import mlflow

def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate data"""
    df = pd.read_csv(data_path)
    
    # Validate schema
    required_columns = ['feature1', 'feature2', 'feature3', 'target']
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Validate data types
    assert df['feature1'].dtype == 'float64'
    assert df['target'].dtype == 'int64'
    
    return df

def create_pipeline() -> Pipeline:
    """Create reproducible ML pipeline"""
    return Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(
            n_estimators=100,     # Reasonable size
            max_depth=10,         # Prevent overfitting
            min_samples_split=10, # Regularization
            random_state=42,
            n_jobs=-1
        ))
    ])

def train_model(data_path: str, model_path: str):
    """Train model with proper validation"""
    
    # Load data
    df = load_data(data_path)
    
    X = df.drop('target', axis=1)
    y = df['target']
    
    # Proper train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Create pipeline
    pipeline = create_pipeline()
    
    # Train with MLflow tracking
    with mlflow.start_run():
        
        # Cross-validation for robust estimate
        cv_scores = cross_val_score(
            pipeline, X_train, y_train, cv=5, scoring='accuracy'
        )
        
        print(f"CV Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")
        
        # Train on full training set
        pipeline.fit(X_train, y_train)
        
        # Evaluate on held-out test set
        test_score = pipeline.score(X_test, y_test)
        print(f"Test Accuracy: {test_score:.4f}")
        
        # Log to MLflow
        mlflow.log_param("n_estimators", 100)
        mlflow.log_param("max_depth", 10)
        mlflow.log_metric("cv_accuracy", cv_scores.mean())
        mlflow.log_metric("test_accuracy", test_score)
        
        # Save model
        joblib.dump(pipeline, model_path)
        mlflow.sklearn.log_model(pipeline, "model")
        
        print(f"Model saved to {model_path}")

if __name__ == "__main__":
    train_model(
        data_path="data/train.csv",
        model_path="models/model_v1.pkl"
    )

Improvements:

  • ✅ Proper train/test split

  • ✅ Cross-validation

  • ✅ Data validation

  • ✅ Pipeline for preprocessing

  • ✅ Reasonable hyperparameters

  • ✅ MLflow tracking

  • ✅ Configurable paths
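Once train.py has run, the serving side only needs to load the saved pipeline once and call it. A minimal sketch of that consumer (the `load_pipeline`/`predict_one` helpers and the model path are illustrative, not part of the original script):

```python
# predict_local.py — minimal consumer of the pipeline saved by train.py.
# Helper names and the model path are illustrative.
import joblib
import numpy as np

def load_pipeline(model_path: str):
    """Load the serialized sklearn Pipeline (scaler + classifier together)"""
    return joblib.load(model_path)

def predict_one(pipeline, features) -> int:
    """Predict a single row; the pipeline re-applies the training-time scaling"""
    X = np.asarray(features, dtype=float).reshape(1, -1)
    return int(pipeline.predict(X)[0])

# Usage (assumes train.py has produced models/model_v1.pkl):
# pipeline = load_pipeline("models/model_v1.pkl")
# predict_one(pipeline, [0.5, 1.2, -0.3])
```

Because the scaler travels inside the pipeline, this consumer never has to know how the features were preprocessed.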

Step 2: Optimize Model Size and Speed

Your 2GB model with 30-second load time won't work in production.

The Problem

python

# Huge, slow model
model = RandomForestClassifier(
    n_estimators=1000,  # 1000 trees!
    max_depth=None,     # Unlimited depth
)

model.fit(X_train, y_train)

# Save
joblib.dump(model, 'model.pkl')

# File size: 2.1 GB 🔥
# Load time: 28 seconds 🔥
# Memory usage: 8 GB 🔥
# Prediction time: 450ms 🔥

The Solution: Model Optimization

python

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import joblib
import time
import os

def optimize_model_size(X_train, y_train, X_test, y_test):
    """Find the smallest model that maintains performance"""
    
    # Baseline: Large model
    large_model = RandomForestClassifier(
        n_estimators=500,
        max_depth=None,
        random_state=42
    )
    large_model.fit(X_train, y_train)
    large_accuracy = large_model.score(X_test, y_test)
    
    # Save and check size
    joblib.dump(large_model, 'large_model.pkl')
    large_size = os.path.getsize('large_model.pkl') / (1024 * 1024)  # MB
    
    print(f"Large Model:")
    print(f"  Accuracy: {large_accuracy:.4f}")
    print(f"  Size: {large_size:.1f} MB")
    
    # Optimized: Smaller model
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15],
        'min_samples_split': [10, 20, 50]
    }
    
    grid_search = GridSearchCV(
        RandomForestClassifier(random_state=42),
        param_grid,
        cv=3,
        scoring='accuracy',
        n_jobs=-1
    )
    
    grid_search.fit(X_train, y_train)
    
    optimized_model = grid_search.best_estimator_
    optimized_accuracy = optimized_model.score(X_test, y_test)
    
    joblib.dump(optimized_model, 'optimized_model.pkl')
    optimized_size = os.path.getsize('optimized_model.pkl') / (1024 * 1024)
    
    print(f"\nOptimized Model:")
    print(f"  Accuracy: {optimized_accuracy:.4f}")
    print(f"  Size: {optimized_size:.1f} MB")
    print(f"  Accuracy drop: {large_accuracy - optimized_accuracy:.4f}")
    print(f"  Size reduction: {(1 - optimized_size/large_size)*100:.1f}%")
    
    return optimized_model

# Example results:
# Large Model:
#   Accuracy: 0.9523
#   Size: 2,147 MB
#
# Optimized Model:
#   Accuracy: 0.9501
#   Size: 156 MB
#   Accuracy drop: 0.0022
#   Size reduction: 92.7%

Key optimization techniques:

1. Reduce Number of Estimators

python

# Too many
model = RandomForestClassifier(n_estimators=1000)  # Overkill

# Just right
model = RandomForestClassifier(n_estimators=100)   # Usually enough

2. Limit Tree Depth

python

# Too deep
model = RandomForestClassifier(max_depth=None)  # Unlimited

# Reasonable depth
model = RandomForestClassifier(max_depth=10)    # Prevents overfitting

3. Use Compression

python

# Uncompressed
joblib.dump(model, 'model.pkl')
# Size: 500 MB

# Compressed
joblib.dump(model, 'model.pkl', compress=3)
# Size: 120 MB

4. Consider Simpler Models

python

# Complex: Random Forest
from sklearn.ensemble import RandomForestClassifier
rf_model = RandomForestClassifier(n_estimators=100)
# Size: 200 MB, Latency: 50ms

# Simple: Logistic Regression
from sklearn.linear_model import LogisticRegression
lr_model = LogisticRegression()
# Size: 50 KB, Latency: 2ms

# Only use complex models if you need them!
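The size and latency figures above depend entirely on your data and hyperparameters, so measure them yourself before choosing. A sketch of how to do that on synthetic data (the `model_footprint` helper is illustrative):

```python
# Compare serialized size and prediction latency of two candidate models.
# A measurement sketch, not a benchmark of the figures quoted above.
import io
import time

import joblib
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

def model_footprint(model, X):
    """Return (serialized size in KB, mean single-row predict latency in ms)"""
    buf = io.BytesIO()
    joblib.dump(model, buf)  # serialize in memory to measure size
    size_kb = buf.getbuffer().nbytes / 1024

    row = X[:1]
    start = time.perf_counter()
    for _ in range(100):
        model.predict(row)
    latency_ms = (time.perf_counter() - start) / 100 * 1000
    return size_kb, latency_ms

rng = np.random.RandomState(42)
X = rng.randn(2000, 5)
y = (X[:, 0] + X[:, 1] > 0).astype(int)

for name, model in [
    ("LogisticRegression", LogisticRegression()),
    ("RandomForest", RandomForestClassifier(n_estimators=100, random_state=42)),
]:
    model.fit(X, y)
    size_kb, latency_ms = model_footprint(model, X)
    print(f"{name}: {size_kb:.0f} KB, {latency_ms:.2f} ms/prediction")
```

If the simple model's accuracy is within your tolerance, the size and latency savings usually make it the right call.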

Step 3: Build a Production-Ready API

Don't just slap Flask on your model and call it done.

Bad API (Rookie Mistakes)

python

# bad_api.py

from flask import Flask, request
import pickle

app = Flask(__name__)

# Load model at import (blocks startup)
model = pickle.load(open('model.pkl', 'rb'))

@app.route('/predict', methods=['POST'])
def predict():
    # No validation!
    data = request.json
    
    # No error handling!
    features = [data['f1'], data['f2'], data['f3']]
    
    # No logging!
    prediction = model.predict([features])
    
    # No monitoring!
    return {'prediction': int(prediction[0])}

if __name__ == '__main__':
    app.run(debug=True)  # Debug mode in production?!

Problems:

  • ❌ No input validation

  • ❌ No error handling

  • ❌ No logging

  • ❌ No monitoring

  • ❌ Debug mode enabled

  • ❌ Blocking model load

Good API (Production-Ready)

python

# production_api.py

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
from prometheus_client import Counter, Histogram, generate_latest
from prometheus_fastapi_instrumentator import Instrumentator
import joblib
import logging
import time
from typing import List
import numpy as np

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(
    title="ML Prediction API",
    version="1.0.0",
    description="Production ML model serving"
)

# Prometheus metrics
predictions_counter = Counter(
    'ml_predictions_total',
    'Total predictions made',
    ['model_version', 'prediction_class']
)

prediction_latency = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency'
)

prediction_errors = Counter(
    'ml_prediction_errors_total',
    'Total prediction errors',
    ['error_type']
)

# Add default HTTP metrics (served via the custom /metrics route below)
Instrumentator().instrument(app)

# Input validation with Pydantic
class PredictionRequest(BaseModel):
    feature1: float
    feature2: float
    feature3: float
    feature4: float
    feature5: float
    
    @validator('feature1', 'feature2', 'feature3', 'feature4', 'feature5')
    def validate_features(cls, v):
        if not -100 <= v <= 100:
            raise ValueError('Feature value out of valid range')
        if np.isnan(v) or np.isinf(v):
            raise ValueError('Feature contains NaN or Inf')
        return v

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str
    latency_ms: float

# Global model variable
model = None
MODEL_VERSION = "v1.0.0"

@app.on_event("startup")
async def load_model():
    """Load model at startup (non-blocking)"""
    global model
    try:
        logger.info("Loading model...")
        model = joblib.load('models/model.pkl')
        logger.info(f"Model {MODEL_VERSION} loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        raise e

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {
        "status": "healthy",
        "model_version": MODEL_VERSION
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make prediction with full error handling and monitoring"""
    
    start_time = time.time()
    
    try:
        # Validate model is loaded
        if model is None:
            prediction_errors.labels(error_type='model_not_loaded').inc()
            raise HTTPException(
                status_code=503,
                detail="Model not loaded"
            )
        
        # Extract features
        features = np.array([[
            request.feature1,
            request.feature2,
            request.feature3,
            request.feature4,
            request.feature5
        ]])
        
        # Make prediction
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0]
        
        # Calculate latency
        latency = time.time() - start_time
        
        # Record metrics
        prediction_latency.observe(latency)
        predictions_counter.labels(
            model_version=MODEL_VERSION,
            prediction_class=str(prediction)
        ).inc()
        
        # Log prediction
        logger.info(
            f"Prediction made: class={prediction}, "
            f"prob={probability[prediction]:.4f}, "
            f"latency={latency*1000:.2f}ms"
        )
        
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability[prediction]),
            model_version=MODEL_VERSION,
            latency_ms=round(latency * 1000, 2)
        )
        
    except ValueError as e:
        # Validation error
        prediction_errors.labels(error_type='validation_error').inc()
        logger.warning(f"Validation error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    
    except Exception as e:
        # Unexpected error
        prediction_errors.labels(error_type='unexpected_error').inc()
        logger.error(f"Prediction error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    from starlette.responses import Response
    return Response(content=generate_latest(), media_type="text/plain")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )

Improvements:

  • ✅ Input validation with Pydantic

  • ✅ Comprehensive error handling

  • ✅ Structured logging

  • ✅ Prometheus metrics

  • ✅ Health check endpoint

  • ✅ Model loaded at startup

  • ✅ Type hints everywhere

  • ✅ Proper HTTP status codes

Step 4: Containerize with Docker

Make it run anywhere.

Dockerfile

dockerfile
  
# Multi-stage build for smaller image
FROM python:3.10-slim as builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Final stage
FROM python:3.10-slim

# Copy virtual environment
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Set working directory
WORKDIR /app

# Copy application code
COPY production_api.py .
COPY models/ models/

# Create non-root user
RUN useradd -m -u 1000 appuser && \
    chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Health check (stdlib only, so no extra dependency inside the image)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

# Run application
CMD ["uvicorn", "production_api:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
scikit-learn==1.3.2
joblib==1.3.2
numpy==1.26.2
prometheus-client==0.19.0
prometheus-fastapi-instrumentator==6.1.0

Build and Run

bash


# Build image
docker build -t ml-api:v1.0.0 .

# Run container
docker run -d \
  --name ml-api \
  -p 8000:8000 \
  --memory="1g" \
  --cpus="2" \
  ml-api:v1.0.0

# Test
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "feature1": 0.5,
    "feature2": 1.2,
    "feature3": -0.3,
    "feature4": 0.8,
    "feature5": -1.1
  }'

Step 5: Handle Common Production Issues

Real problems you'll face.

Issue #1: Memory Leaks

Problem:

python

# Memory leak!
predictions = []  # This list grows forever

@app.post("/predict")
def predict(data):
    prediction = model.predict(data)
    predictions.append(prediction)  # Memory leak!
    return prediction

Solution:

python

# Use bounded collections
from collections import deque

# Keep only last 1000 predictions
predictions = deque(maxlen=1000)

@app.post("/predict")
def predict(data):
    prediction = model.predict(data)
    predictions.append(prediction)  # Safe!
    return prediction

Issue #2: Cold Start Latency

Problem:

python

# First prediction is slow
@app.post("/predict")
def predict(data):
    model = joblib.load('model.pkl')  # Loads every time!
    return model.predict(data)

Solution:

python

# Load once at startup
model = None

@app.on_event("startup")
async def load_model():
    global model
    model = joblib.load('model.pkl')
    
    # Warm up model with dummy prediction
    dummy_data = np.zeros((1, 5))
    model.predict(dummy_data)
    logger.info("Model warmed up")

@app.post("/predict")
def predict(data):
    return model.predict(data)  # Fast!

Issue #3: Concurrent Request Handling

Problem:

python

# CPU-bound predictions serialize on Python's GIL, so a single
# threaded worker becomes a bottleneck under concurrent load

Solution:

bash

# Use process-based workers, not threads
# In your deployment:
uvicorn production_api:app --workers 4  # 4 processes

# Or use gunicorn
gunicorn production_api:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000
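One wrinkle with multiple workers: each process holds its own copy of the model. Gunicorn's `preload_app` loads the application in the master before forking, so anything loaded at import time is shared copy-on-write rather than duplicated. A sketch of a `gunicorn.conf.py` under that assumption (all values are starting points to tune):

```python
# gunicorn.conf.py — illustrative settings; tune worker count to your cores
import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count()
worker_class = "uvicorn.workers.UvicornWorker"

# Load the app in the master process before forking, so a model loaded at
# import time is shared copy-on-write instead of duplicated per worker.
# (Note: FastAPI startup events still run once per worker, after the fork.)
preload_app = True

# Recycle workers periodically to contain slow memory growth
max_requests = 10000
max_requests_jitter = 1000
```

Run it with `gunicorn production_api:app -c gunicorn.conf.py`.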

Issue #4: Feature Preprocessing Consistency

Problem:

python

# Training preprocessing
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

# Prediction preprocessing (different scaler!)
scaler = StandardScaler()  # Wrong! Different mean/std
X_new_scaled = scaler.transform(X_new)

Solution:

python

# Use sklearn Pipeline - preprocessing is part of the model
from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])

pipeline.fit(X_train, y_train)

# Save pipeline (includes scaler!)
joblib.dump(pipeline, 'model.pkl')

# Load and predict (scaler included!)
loaded_pipeline = joblib.load('model.pkl')
prediction = loaded_pipeline.predict(X_new)  # Correct scaling!

Issue #5: Model Version Confusion

Problem:

python

# Which model is in production?
joblib.dump(model, 'model.pkl')  # Always same name!

Solution:

python

# Version your models
from datetime import datetime

version = datetime.now().strftime("%Y%m%d_%H%M")
model_path = f'models/model_{version}.pkl'

joblib.dump(pipeline, model_path)

# Log to MLflow
mlflow.log_artifact(model_path)
mlflow.set_tag("version", version)

# In production, point a stable symlink at the latest version
import os

symlink_path = 'models/model_production.pkl'
if os.path.islink(symlink_path):
    os.remove(symlink_path)
os.symlink(model_path, symlink_path)

Step 6: Testing Production Models

Don't skip this!

Unit Tests

python

# tests/test_model.py

import pytest
import numpy as np
import joblib

@pytest.fixture
def model():
    """Load the trained model once per test"""
    return joblib.load('models/model.pkl')

def test_model_loads(model):
    """Test model can be loaded"""
    assert model is not None

def test_model_predicts(model):
    """Test model makes predictions"""
    X_test = np.array([[0.5, 1.2, -0.3, 0.8, -1.1]])
    prediction = model.predict(X_test)
    
    assert prediction.shape == (1,)
    assert prediction[0] in [0, 1]  # Binary classification

def test_model_probability(model):
    """Test model returns probabilities"""
    X_test = np.array([[0.5, 1.2, -0.3, 0.8, -1.1]])
    proba = model.predict_proba(X_test)
    
    assert proba.shape == (1, 2)
    assert np.isclose(proba.sum(), 1.0)
    assert np.all(proba >= 0) and np.all(proba <= 1)

def test_model_handles_edge_cases(model):
    """Test model with edge case inputs"""
    # All zeros
    X_zeros = np.zeros((1, 5))
    assert model.predict(X_zeros).shape == (1,)
    
    # Large values
    X_large = np.ones((1, 5)) * 100
    assert model.predict(X_large).shape == (1,)

def test_model_batch_prediction(model):
    """Test model handles batch predictions"""
    X_batch = np.random.randn(100, 5)
    predictions = model.predict(X_batch)
    
    assert predictions.shape == (100,)

Integration Tests

python

# tests/test_api.py

from fastapi.testclient import TestClient
from production_api import app

client = TestClient(app)

def test_health_check():
    """Test health endpoint"""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_predict_valid_input():
    """Test prediction with valid input"""
    response = client.post("/predict", json={
        "feature1": 0.5,
        "feature2": 1.2,
        "feature3": -0.3,
        "feature4": 0.8,
        "feature5": -1.1
    })
    
    assert response.status_code == 200
    data = response.json()
    
    assert "prediction" in data
    assert "probability" in data
    assert "model_version" in data
    assert data["prediction"] in [0, 1]
    assert 0 <= data["probability"] <= 1

def test_predict_invalid_input():
    """Test prediction with invalid input"""
    # Missing features
    response = client.post("/predict", json={
        "feature1": 0.5
    })
    assert response.status_code == 422
    
    # Out-of-range value (rejected by Pydantic before the endpoint runs,
    # so FastAPI returns 422, not 400)
    response = client.post("/predict", json={
        "feature1": 1000,  # Out of range
        "feature2": 1.2,
        "feature3": -0.3,
        "feature4": 0.8,
        "feature5": -1.1
    })
    assert response.status_code == 422

def test_metrics_endpoint():
    """Test Prometheus metrics"""
    response = client.get("/metrics")
    assert response.status_code == 200
    assert "ml_predictions_total" in response.text

Load Testing

python

# load_test.py

import requests
import time
import concurrent.futures
import numpy as np

def make_prediction():
    """Make a single prediction"""
    data = {
        "feature1": float(np.random.randn()),
        "feature2": float(np.random.randn()),
        "feature3": float(np.random.randn()),
        "feature4": float(np.random.randn()),
        "feature5": float(np.random.randn())
    }
    
    start = time.time()
    response = requests.post(
        "http://localhost:8000/predict",
        json=data
    )
    latency = time.time() - start
    
    return {
        "status": response.status_code,
        "latency": latency
    }

def load_test(num_requests=1000, num_workers=10):
    """Run load test"""
    print(f"Running load test: {num_requests} requests, {num_workers} workers")
    
    results = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(make_prediction) for _ in range(num_requests)]
        
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    
    # Analyze results
    latencies = [r['latency'] for r in results]
    success_count = sum(1 for r in results if r['status'] == 200)
    
    print(f"\nResults:")
    print(f"  Success rate: {success_count/len(results)*100:.2f}%")
    print(f"  Mean latency: {np.mean(latencies)*1000:.2f}ms")
    print(f"  P50 latency: {np.percentile(latencies, 50)*1000:.2f}ms")
    print(f"  P95 latency: {np.percentile(latencies, 95)*1000:.2f}ms")
    print(f"  P99 latency: {np.percentile(latencies, 99)*1000:.2f}ms")
    print(f"  Max latency: {np.max(latencies)*1000:.2f}ms")

if __name__ == "__main__":
    load_test(num_requests=1000, num_workers=10)

Step 7: Deployment Checklist

Before you go to production:

Model Checklist

  • ✅ Model size < 500MB (preferably < 100MB)

  • ✅ Load time < 5 seconds

  • ✅ Prediction latency < 100ms (P95)

  • ✅ Proper train/test split used

  • ✅ Cross-validation performed

  • ✅ Model versioned and logged

  • ✅ Pipeline includes preprocessing

  • ✅ Compression enabled
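The first three items can be checked automatically before every release. A sketch of a budget gate that fails fast when a threshold is blown (the `check_model_budget` helper and thresholds are illustrative, mirroring the checklist above):

```python
# check_budgets.py — verify a serialized model against deployment budgets.
# Thresholds mirror the checklist above; adjust to your own requirements.
import os
import time

import joblib
import numpy as np

def check_model_budget(model_path, n_features,
                       max_size_mb=500, max_load_s=5.0, max_latency_ms=100.0):
    """Return measurements; raise AssertionError if any budget is exceeded"""
    size_mb = os.path.getsize(model_path) / (1024 * 1024)
    assert size_mb <= max_size_mb, f"model too large: {size_mb:.1f} MB"

    start = time.perf_counter()
    model = joblib.load(model_path)
    load_s = time.perf_counter() - start
    assert load_s <= max_load_s, f"load too slow: {load_s:.1f}s"

    # Rough P95 latency over repeated single-row predictions
    row = np.zeros((1, n_features))
    samples = []
    for _ in range(200):
        t0 = time.perf_counter()
        model.predict(row)
        samples.append((time.perf_counter() - t0) * 1000)
    p95_ms = float(np.percentile(samples, 95))
    assert p95_ms <= max_latency_ms, f"P95 latency too high: {p95_ms:.1f} ms"

    return {"size_mb": size_mb, "load_s": load_s, "p95_ms": p95_ms}
```

Wire this into CI so an oversized or slow model never reaches production in the first place.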

API Checklist

  • ✅ Input validation (Pydantic)

  • ✅ Error handling (try/except)

  • ✅ Logging configured

  • ✅ Metrics exposed (Prometheus)

  • ✅ Health check endpoint

  • ✅ Model loaded at startup

  • ✅ No debug mode

  • ✅ Type hints added

Docker Checklist

  • ✅ Multi-stage build

  • ✅ Non-root user

  • ✅ Health check configured

  • ✅ Resource limits set

  • ✅ Image size < 1GB

  • ✅ Security scanning passed

Testing Checklist

  • ✅ Unit tests (model behavior)

  • ✅ Integration tests (API endpoints)

  • ✅ Load testing (handles concurrent requests)

  • ✅ Edge case testing

  • ✅ All tests pass

Monitoring Checklist

  • ✅ Prometheus metrics

  • ✅ Grafana dashboard

  • ✅ Alerts configured

  • ✅ Logging to centralized system

  • ✅ Performance tracking
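For the "alerts configured" item, the metrics the API exports above translate directly into Prometheus alerting rules. An illustrative fragment (thresholds are starting points, not gospel; alert names and severities are assumptions):

```yaml
# alert_rules.yml — illustrative rules built on the metrics exported above
groups:
  - name: ml-api
    rules:
      - alert: HighPredictionErrorRate
        expr: |
          sum(rate(ml_prediction_errors_total[5m]))
            / sum(rate(ml_predictions_total[5m])) > 0.05
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "More than 5% of predictions erroring over 5 minutes"

      - alert: HighPredictionLatency
        expr: |
          histogram_quantile(0.95,
            rate(ml_prediction_latency_seconds_bucket[5m])) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "P95 prediction latency above 100ms"
```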

Common Production Mistakes

Mistake #1: Not Using a Pipeline

Don't:

python

# Train
scaler.fit(X_train)
X_scaled = scaler.transform(X_train)
model.fit(X_scaled, y_train)

# Production (different scaler!)
scaler = StandardScaler()
X_new_scaled = scaler.transform(X_new)  # WRONG!

Do:

python

# Pipeline bundles everything
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])
pipeline.fit(X_train, y_train)

# Production (correct!)
prediction = pipeline.predict(X_new)

Mistake #2: Overfitting in Notebooks

Don't:

python

# Training accuracy only
model.fit(X, y)
print(model.score(X, y))  # 99.9% - overfitted!

Do:

python

# Proper validation
X_train, X_test, y_train, y_test = train_test_split(X, y)
model.fit(X_train, y_train)
print(model.score(X_test, y_test))  # 92% - realistic

Mistake #3: No Error Handling

Don't:

python

@app.post("/predict")
def predict(data):
    return model.predict(data)  # Will crash on bad input

Do:

python

@app.post("/predict")
def predict(request: PredictionRequest):
    try:
        return model.predict(request.features)
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=500)

Mistake #4: Loading Model Every Request

Don't:

python

@app.post("/predict")
def predict(data):
    model = joblib.load('model.pkl')  # Slow!
    return model.predict(data)

Do:

python

model = None

@app.on_event("startup")
def load_model():
    global model
    model = joblib.load('model.pkl')  # Once!

@app.post("/predict")
def predict(data):
    return model.predict(data)  # Fast!

Conclusion

Going from notebook to production isn't just "save the model and deploy."

The complete journey:

  1. Train properly - Validation, pipelines, versioning

  2. Optimize - Model size, speed, efficiency

  3. Build API - Validation, errors, monitoring

  4. Containerize - Docker, health checks, security

  5. Test - Unit, integration, load tests

  6. Monitor - Metrics, logs, alerts

  7. Deploy - With confidence

What you get:

  • Models that actually work in production

  • Fast, reliable predictions

  • Proper error handling

  • Full observability

  • Easy rollbacks

  • Scalable deployment

The gap between notebook and production is real. But it's bridgeable.

Want to see a complete production ML system?

Check out my fraud detection project with the full production setup:

  • GitHub: github.com/Shodexco/fraud-detection-production

Questions? Let's connect:

  • Portfolio: jonathansodeke.framer.website

  • GitHub: github.com/Shodexco

  • LinkedIn: [Your LinkedIn]

Now go deploy your models properly. Production is waiting.

About the Author

Jonathan Sodeke is a Data Engineer and ML Engineer who learned production deployment the hard way (by breaking it in every possible way first). He now builds ML systems that actually work in production, with proper monitoring, testing, and deployment pipelines.

When he's not optimizing model sizes at 2am, he's helping others bridge the notebook-to-production gap.

Portfolio: jonathansodeke.framer.website
GitHub: github.com/Shodexco
LinkedIn: [Your LinkedIn URL]

Sign Up To My Newsletter

Get notified when a new article is posted.


© Jonathan Sodeke 2025

