Production Deployment
Scale validation pipelines from development to production.
Production Architecture
Raw Data Ingestion
↓
Profile & Assess
↓
Validate (Syntax → DNS → SMTP)
↓
Separate Valid/Invalid
↓
Monitor & Alert
↓
Archive & Compliance
Step 1: Staging vs. Production Setup
Development (Ad-hoc)
-- Ad-hoc spot check: DNS-validate the email column on a 1000-row sample.
SELECT
    customer_id,
    email,
    anofox_email_validate(email, 'dns') AS valid
FROM customers
LIMIT 1000;
Production (Scheduled, Monitored, Logged)
-- Production: persist every validation outcome so failures can be
-- replayed, audited, and alerted on.
CREATE TABLE IF NOT EXISTS customer_validation_log (
    validation_id          INTEGER PRIMARY KEY,
    batch_id               VARCHAR,    -- e.g. 'BATCH_20250114'; groups one run
    customer_id            INTEGER,
    email                  VARCHAR,
    email_valid            BOOLEAN,
    validated_at           TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    validation_duration_ms INTEGER     -- measured and supplied by the application
);
-- Run validation with logging.
-- batch_id encodes the run date (e.g. BATCH_20250114) so one day's run is greppable.
INSERT INTO customer_validation_log (batch_id, customer_id, email, email_valid, validation_duration_ms)
SELECT
    -- STRFTIME is the DuckDB date-formatting function; DATE_FORMAT is MySQL-only
    -- and fails in the DuckDB environment this guide targets.
    'BATCH_' || STRFTIME(CURRENT_DATE, '%Y%m%d') AS batch_id,
    customer_id,
    email,
    anofox_email_validate(email, 'dns') AS email_valid,
    0 -- duration would be tracked by the calling application
FROM customers
-- Re-validate anything never validated, or whose last validation is >30 days old.
WHERE validated_at IS NULL
   OR validated_at < CURRENT_DATE - INTERVAL '30 days';
Step 2: Batch Processing
Scheduled Daily Validation
-- Create a scheduled task (via cron or workflow scheduler)
-- Re-validates every customer row modified in the trailing 24 hours and
-- appends one result row per customer to customer_validation_results.
CREATE OR REPLACE PROCEDURE daily_customer_validation()
LANGUAGE SQL
AS $$
BEGIN
-- Validate customers modified in last 24 hours
INSERT INTO customer_validation_results
SELECT
customer_id,
email,
phone,
vat_id,
amount,
anofox_email_validate(email, 'dns') as email_valid,
-- NOTE(review): anofox_phone_format suggests it returns a *formatted string*,
-- yet it is aliased as the boolean-looking phone_valid — confirm whether an
-- anofox_phone_is_valid-style predicate was intended.
anofox_phone_format(phone, 'US') as phone_valid,
anofox_vat_is_valid(vat_id) as vat_valid,
anofox_money_is_positive(amount) as amount_valid,
CURRENT_TIMESTAMP as validated_at
FROM customers
-- Only rows changed in the last day that have not already been validated.
WHERE modified_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND validated_at IS NULL;
END;
$$;
-- Run daily at 2 AM UTC
-- SCHEDULE: 0 2 * * * daily_customer_validation()
Incremental Validation (New Records Only)
-- High-water-mark checkpoint: remembers, per source table, the largest
-- id that has already been validated.
CREATE TABLE IF NOT EXISTS validation_checkpoint (
    table_name        VARCHAR,
    last_validated_id INTEGER,
    last_validated_at TIMESTAMP
);
-- Only validate new records since last checkpoint
CREATE OR REPLACE PROCEDURE incremental_validation()
LANGUAGE SQL
AS $$
DECLARE
    last_id INTEGER;
BEGIN
    -- COALESCE around the *scalar subquery*: if the checkpoint row does not
    -- exist yet, the subquery yields NULL and we start from 0.  (The original
    -- put COALESCE inside the SELECT, which only helps when the row exists
    -- but the column is NULL — a missing row left last_id unset.)
    last_id := COALESCE(
        (SELECT last_validated_id
         FROM validation_checkpoint
         WHERE table_name = 'customers'),
        0);

    -- Validate only rows above the high-water mark.
    INSERT INTO customer_validation_results
    SELECT
        customer_id,
        email,
        anofox_email_validate(email, 'dns') AS email_valid,
        CURRENT_TIMESTAMP AS validated_at
    FROM customers
    WHERE customer_id > last_id;

    -- Guard with COALESCE so an empty customers table cannot NULL out the
    -- checkpoint and force a full re-validation on the next run.
    UPDATE validation_checkpoint
    SET
        last_validated_id = COALESCE((SELECT MAX(customer_id) FROM customers), last_id),
        last_validated_at = CURRENT_TIMESTAMP
    WHERE table_name = 'customers';
END;
$$;
Step 3: Error Handling & Recovery
Graceful Degradation
-- If validation fails (network error), log and continue
CREATE OR REPLACE PROCEDURE validate_with_retry()
LANGUAGE plpgsql  -- exception handling needs plpgsql; TRY/CATCH is not SQL syntax
AS $$
BEGIN
    -- Probe: one DNS-backed validation; raises on network error / timeout.
    PERFORM anofox_email_validate(email, 'dns') FROM customers LIMIT 1;
EXCEPTION WHEN OTHERS THEN
    -- Network error or timeout: record it, then degrade gracefully.
    INSERT INTO validation_errors (error_message, error_time)
    VALUES ('DNS validation failed', CURRENT_TIMESTAMP);

    -- Fall back to regex-only (syntax) validation.
    -- NOT EXISTS instead of NOT IN: NOT IN returns ZERO rows whenever the
    -- subquery contains a single NULL email (three-valued logic trap).
    INSERT INTO customer_validation_results
    SELECT
        c.customer_id,
        c.email,
        anofox_email_validate(c.email, 'regex') AS email_valid, -- fallback
        CURRENT_TIMESTAMP
    FROM customers c
    WHERE NOT EXISTS (
        SELECT 1 FROM customer_validation_results r
        WHERE r.email = c.email
    );
END;
$$;
Idempotent Operations
-- Safe to re-run: delete-then-insert makes the last hour's results
-- converge to the same state no matter how many times it executes.
CREATE OR REPLACE PROCEDURE validate_idempotent()
LANGUAGE SQL
AS $$
BEGIN
    -- Drop any existing results for customers touched in the last hour...
    DELETE FROM customer_validation_results r
    WHERE EXISTS (
        SELECT 1
        FROM customers c
        WHERE c.customer_id = r.customer_id
          AND c.modified_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
    );
    -- ...then re-validate exactly that set.
    INSERT INTO customer_validation_results
    SELECT
        customer_id,
        email,
        anofox_email_validate(email, 'dns') AS email_valid,
        CURRENT_TIMESTAMP
    FROM customers
    WHERE modified_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour';
END;
$$;
Step 4: Monitoring & Alerting
Validation Metrics Dashboard
-- Real-time validation metrics, bucketed by hour.
CREATE VIEW validation_metrics AS
SELECT
    DATE_TRUNC('hour', validated_at) AS hour,
    COUNT(*) AS records_validated,
    COUNT(*) FILTER (WHERE email_valid) AS emails_valid,
    COUNT(*) FILTER (WHERE NOT email_valid) AS emails_invalid,
    -- COUNT(*) per group is always >= 1, so the division is safe here.
    ROUND(100.0 * COUNT(*) FILTER (WHERE email_valid) / COUNT(*), 1) AS email_valid_rate_pct,
    COUNT(*) FILTER (WHERE phone_valid) AS phones_valid,
    COUNT(*) FILTER (WHERE NOT phone_valid) AS phones_invalid,
    ROUND(100.0 * COUNT(*) FILTER (WHERE phone_valid) / COUNT(*), 1) AS phone_valid_rate_pct,
    AVG(validation_duration_ms) AS avg_duration_ms
FROM customer_validation_results
-- Group/order by name, not ordinal position, so reordering the SELECT
-- list can never silently change the view's semantics.
GROUP BY DATE_TRUNC('hour', validated_at)
ORDER BY hour DESC;
SELECT * FROM validation_metrics LIMIT 24; -- most recent 24 hourly buckets
Alerts
-- Alert if validation pass rate drops or errors spike.
CREATE VIEW validation_alerts AS
SELECT
    CASE
        -- NULLIF guards the division: with zero rows in the window COUNT(*)
        -- is 0 and the original expression divided by zero.
        -- COALESCE(..., 100) treats "no data in the last hour" as OK —
        -- otherwise NULL < 80 evaluates to UNKNOWN and the alert is
        -- silently suppressed.
        WHEN COALESCE(
                 (SELECT ROUND(100.0 * COUNT(*) FILTER (WHERE email_valid) / NULLIF(COUNT(*), 0), 1)
                  FROM customer_validation_results
                  WHERE validated_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'),
                 100) < 80
        THEN 'ALERT: Email validation pass rate < 80%'
        ELSE 'OK'
    END AS email_alert,
    CASE
        WHEN (SELECT COUNT(*) FROM validation_errors
              WHERE error_time > CURRENT_TIMESTAMP - INTERVAL '1 hour') > 10
        THEN 'ALERT: More than 10 validation errors in last hour'
        ELSE 'OK'
    END AS error_alert;
Step 5: Integration with ETL/ELT Pipelines
Pre-Ingestion Validation
-- Validate before data enters main warehouse
CREATE OR REPLACE PROCEDURE ingest_customers_validated()
LANGUAGE SQL
AS $$
BEGIN
    -- Stage 1: land the external feed in a session-local staging table.
    CREATE TEMP TABLE staging_customers AS
    SELECT * FROM external_customer_feed;

    -- Stage 2: attach validation verdicts alongside the raw columns.
    ALTER TABLE staging_customers ADD COLUMN email_valid BOOLEAN;
    ALTER TABLE staging_customers ADD COLUMN vat_valid BOOLEAN;
    UPDATE staging_customers
    SET
        email_valid = anofox_email_validate(email, 'dns'),
        vat_valid = anofox_vat_is_valid(vat_id);

    -- Stage 3: load valid records.
    -- EXCLUDE strips the two staging-only verdict columns (DuckDB syntax);
    -- after the ALTERs above a bare SELECT * no longer matches the
    -- customers column list and the INSERT would fail.
    INSERT INTO customers
    SELECT * EXCLUDE (email_valid, vat_valid)
    FROM staging_customers
    WHERE email_valid AND vat_valid;

    -- Stage 4: archive everything that was NOT loaded in stage 3.
    -- COALESCE(..., FALSE) counts a NULL verdict as invalid; with the
    -- original "NOT a OR NOT b" predicate, rows with NULL verdicts landed
    -- in neither table and were silently dropped.
    -- Assumes customers_rejected mirrors the customers column list.
    INSERT INTO customers_rejected
    SELECT * EXCLUDE (email_valid, vat_valid)
    FROM staging_customers
    WHERE NOT COALESCE(email_valid AND vat_valid, FALSE);
END;
$$;
Post-Ingestion Enrichment
-- Copy the latest validation verdicts onto the customer records themselves.
CREATE OR REPLACE PROCEDURE enrich_customers_with_validation()
LANGUAGE SQL
AS $$
BEGIN
    -- UPDATE ... FROM joins each customer to its validation result row.
    UPDATE customers cust
    SET
        email_verified = res.email_valid,
        phone_verified = res.phone_valid,
        vat_verified = res.vat_valid,
        verification_date = res.validated_at
    FROM customer_validation_results res
    WHERE cust.customer_id = res.customer_id;
END;
$$;
Step 6: Performance Optimization
Parallel Validation
-- Hash-partition a large validation run into 4 batches.
-- NOTE: within a single statement the engine parallelizes on its own; the
-- MOD split only pays off when each batch is submitted by a separate worker.
CREATE OR REPLACE PROCEDURE validate_parallel()
LANGUAGE SQL
AS $$
BEGIN
    -- Every UNION ALL branch must project the SAME columns: the original
    -- selected 4 columns in batch 1 but `SELECT *` in batches 2-4, which
    -- cannot be unioned (column count/type mismatch).
    INSERT INTO customer_validation_results
    SELECT customer_id, email, anofox_email_validate(email, 'regex') AS email_valid, CURRENT_TIMESTAMP
    FROM customers WHERE MOD(customer_id, 4) = 0 -- Batch 1
    UNION ALL
    SELECT customer_id, email, anofox_email_validate(email, 'regex'), CURRENT_TIMESTAMP
    FROM customers WHERE MOD(customer_id, 4) = 1 -- Batch 2
    UNION ALL
    SELECT customer_id, email, anofox_email_validate(email, 'regex'), CURRENT_TIMESTAMP
    FROM customers WHERE MOD(customer_id, 4) = 2 -- Batch 3
    UNION ALL
    SELECT customer_id, email, anofox_email_validate(email, 'regex'), CURRENT_TIMESTAMP
    FROM customers WHERE MOD(customer_id, 4) = 3; -- Batch 4
END;
$$;
Caching Validation Results
-- Cache table: one row per email, reused until the row's own TTL expires.
CREATE TABLE validation_cache (
    email       VARCHAR PRIMARY KEY,
    email_valid BOOLEAN,
    cached_at   TIMESTAMP,
    ttl_hours   INTEGER DEFAULT 24  -- per-row time-to-live, in hours
);
-- Check cache first, validate only on miss
CREATE OR REPLACE FUNCTION get_email_validation(email_addr VARCHAR)
RETURNS BOOLEAN AS $$
DECLARE
    result BOOLEAN;  -- the original used `result` without declaring it
BEGIN
    -- Cache hit: an entry exists and is younger than its TTL.
    SELECT email_valid INTO result
    FROM validation_cache
    WHERE email = email_addr
      AND cached_at > CURRENT_TIMESTAMP - INTERVAL '1 hour' * ttl_hours;

    IF result IS NOT NULL THEN
        RETURN result; -- cached result (FALSE is a valid cached value too)
    ELSE
        -- Cache miss: validate and store; the upsert keeps concurrent
        -- callers race-safe.
        result := anofox_email_validate(email_addr, 'dns');
        INSERT INTO validation_cache (email, email_valid, cached_at)
        VALUES (email_addr, result, CURRENT_TIMESTAMP)
        ON CONFLICT (email) DO UPDATE SET
            -- the conflict branch must reference EXCLUDED (the proposed
            -- row), not the local variable, which is out of scope there
            email_valid = EXCLUDED.email_valid,
            cached_at = EXCLUDED.cached_at;
        RETURN result;
    END IF;
END;
$$ LANGUAGE plpgsql;  -- DECLARE / IF / := require plpgsql, not LANGUAGE SQL
Step 7: Compliance & Audit Trail
Immutable Validation Log
-- Append-only log for audit compliance.
-- NOTE: true immutability is enforced with privileges (REVOKE UPDATE/DELETE)
-- or a trigger — a uniqueness constraint cannot prevent row updates, so the
-- original's extra CONSTRAINT audit_immutable UNIQUE (log_id) gave no
-- protection and merely duplicated the PRIMARY KEY (which already implies
-- UNIQUE + NOT NULL).
CREATE TABLE IF NOT EXISTS validation_audit_log (
    log_id SERIAL PRIMARY KEY,
    batch_id VARCHAR,
    customer_id INTEGER,
    field_name VARCHAR,         -- which column was validated, e.g. 'email'
    validation_method VARCHAR,  -- 'regex' | 'dns' | 'smtp'
    validation_result BOOLEAN,
    validated_by VARCHAR,       -- operator or 'system'
    validated_at TIMESTAMP
);
-- Record one audit row per field-level validation.
INSERT INTO validation_audit_log (
    batch_id,
    customer_id,
    field_name,
    validation_method,
    validation_result,
    validated_by,
    validated_at
)
VALUES ('BATCH_20250114', 1, 'email', 'dns', true, 'system', CURRENT_TIMESTAMP);
Compliance Report
-- Monthly compliance certification
CREATE VIEW compliance_certification AS
SELECT
    DATE_TRUNC('month', validated_at) AS period,
    COUNT(*) AS records_validated,
    -- validation_result is already a boolean; "= TRUE" was redundant
    COUNT(*) FILTER (WHERE validation_result) AS records_passed,
    ROUND(100.0 * COUNT(*) FILTER (WHERE validation_result) / COUNT(*), 2) AS pass_rate_pct,
    COUNT(DISTINCT batch_id) AS batches_processed,
    COUNT(DISTINCT validated_by) AS operators
FROM validation_audit_log
-- Name the grouping expression instead of the positional "GROUP BY 1".
GROUP BY DATE_TRUNC('month', validated_at)
ORDER BY period DESC;
Step 8: CI/CD Integration
Validation in Deployment Pipeline
# Example CI/CD pipeline (GitHub Actions); indentation reconstructed —
# the original listing had its YAML structure flattened.
name: Validate Data Quality
on: [pull_request]
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4  # v2 runs on a deprecated Node runtime
      - name: Load Tabular Extension
        run: duckdb -c "INSTALL anofox_tabular; LOAD anofox_tabular;"
      - name: Run Validation Tests
        run: |
          # DuckDB's error() aborts the query (non-zero exit, failing the
          # step); the original FAIL()/PASS() are not real functions.
          duckdb -c "
          SELECT
            CASE
              WHEN (SELECT COUNT(*) FROM validation_test_data
                    WHERE anofox_email_validate(email, 'regex') = FALSE) > 0
              THEN error('Email validation failed')
              ELSE 'Email validation OK'
            END;
          "
      - name: Report Results
        if: always()
        run: |
          duckdb -c "SELECT * FROM validation_metrics LIMIT 1;" > validation_results.txt
          cat validation_results.txt
Deployment Checklist
- Validation logic tested on sample data
- Monitoring dashboard created and configured
- Alerts configured (error rate, pass rate thresholds)
- Error handling and fallback strategies implemented
- Audit logging enabled
- Performance tested (expected duration, throughput)
- Capacity planning done (data volume growth)
- Documentation updated
- Team trained on monitoring and troubleshooting
- Runbook created for common issues
- Scheduled tasks configured (cron, workflow scheduler)
- Backup and recovery procedures tested
- Compliance requirements documented
Troubleshooting in Production
Issue: Slow Validation
-- Diagnose where time is spent, per validation method.
SELECT
    CASE validation_method
        WHEN 'regex' THEN '< 1ms (syntax)'
        WHEN 'dns'   THEN '~100ms (domain check)'
        WHEN 'smtp'  THEN '~500ms (mailbox check)'
        ELSE 'unknown method'  -- the original CASE fell through to NULL silently
    END AS method,
    COUNT(*) AS count,
    AVG(validation_duration_ms) AS avg_duration_ms
FROM validation_audit_log
-- Group by the raw column (named, not ordinal "GROUP BY 1").
GROUP BY validation_method
ORDER BY avg_duration_ms DESC;
-- Solution: use a faster method (regex instead of DNS; sample SMTP instead of checking every row)
Issue: High Error Rate
-- Break down the last hour's failures by message, most frequent first.
SELECT
    error_message,
    COUNT(*) AS error_count,
    MAX(error_time) AS latest_error
FROM validation_errors
WHERE error_time > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY error_message
ORDER BY error_count DESC;
-- Check system health: a trivial aggregate proves the database is reachable
-- and the table is readable.  The original mixed the non-aggregate
-- pg_sleep(1) with COUNT(*) and no GROUP BY (invalid SQL), and pg_sleep is
-- Postgres-specific in any case.
SELECT COUNT(*) AS ok_count
FROM customers;
Issue: Missing Records
-- Count records that never received a validation verdict.
SELECT COUNT(*) AS unvalidated
FROM customers
WHERE validated_at IS NULL;
-- Catch them up by replaying the incremental job.
CALL incremental_validation();
Next Steps
- Validation Strategies — Optimize your approach
- Data Profiling — Monitor data health
- Basic Workflow — Review fundamentals