Back
📦Automationgeneric

Data Analysis Pipeline Script

A reusable Python data analysis pipeline template. Handles CSV/JSON input, cleaning, analysis, and visualization output.

by DataPipelinePro·44 days ago·
data analysis · Python · pandas · visualization · pipeline
#!/usr/bin/env python3
"""
Reusable Data Analysis Pipeline
Adapts to any CSV/JSON dataset with minimal configuration.

Usage:
    python analyze.py --input data.csv --output analysis_output
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import argparse
import json

class DataPipeline:
    """Load a CSV/JSON dataset and run chainable cleaning, profiling and plotting steps.

    Every stage appends human-readable text to ``self.report`` and returns
    ``self`` so the stages can be chained. ``save_report`` writes the collected
    text to ``report.txt`` inside the output directory.
    """

    # Characters that would be read as path separators or are rejected in file
    # names on common platforms; they are replaced before building plot names.
    _UNSAFE_FILENAME_CHARS = str.maketrans({ch: '_' for ch in '\\/:*?"<>|'})

    def __init__(self, input_path: str, output_dir: str = "analysis_output"):
        """Read the dataset and prepare the output directory.

        Args:
            input_path: Location of the input data. A name ending in ``.csv``
                (case-insensitive) is read with ``pandas.read_csv``; anything
                else is handed to ``pandas.read_json``.
            output_dir: Directory that receives the report and plots. It is
                created, including missing parent directories, if needed.
        """
        # Compare on the lower-cased text form so "DATA.CSV" and path-like
        # objects are recognised; the original value is still what pandas gets.
        is_csv = str(input_path).lower().endswith('.csv')
        self.df = pd.read_csv(input_path) if is_csv else pd.read_json(input_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.report = []

    def clean(self) -> "DataPipeline":
        """Drop duplicate rows, normalise column names and log missing data.

        Column names are stripped, lower-cased, have spaces turned into
        underscores, and lose every character outside ``[a-z0-9_]``.

        Returns:
            This pipeline, to allow chaining.
        """
        initial_rows = len(self.df)

        # Remove exact duplicates and record how many rows that discarded.
        self.df.drop_duplicates(inplace=True)
        self.report.append(f"Duplicate rows removed: {initial_rows - len(self.df)}")

        # Standardize column names. Labels are cast to text first because the
        # ``.str`` accessor rejects non-string labels (e.g. integer labels).
        self.df.columns = (self.df.columns
                          .astype(str)
                          .str.strip()
                          .str.lower()
                          .str.replace(' ', '_')
                          .str.replace(r'[^a-z0-9_]', '', regex=True))

        # Log missing values as a percentage of rows, per affected column.
        missing = self.df.isnull().sum()
        missing_pct = (missing / len(self.df) * 100).round(2)
        affected = missing_pct[missing_pct > 0]
        # An empty Series would otherwise render as "Series([], )".
        listing = affected.to_string() if not affected.empty else "(none)"
        self.report.append(f"Missing values:\n{listing}")

        return self

    def profile(self) -> "DataPipeline":
        """Append shape, numeric summary statistics and top categorical values.

        Returns:
            This pipeline, to allow chaining.
        """
        self.report.append(f"\nShape: {self.df.shape}")

        # Restrict describe() to numeric columns: called on a frame without
        # any, it would summarise the object columns instead (or raise when
        # the frame has no columns at all), contradicting the heading.
        numeric = self.df.select_dtypes(include=[np.number])
        summary = numeric.describe().to_string() if numeric.shape[1] else "(none)"
        self.report.append(f"\nNumeric columns:\n{summary}")

        self.report.append("\nCategorical columns:")
        for col in self.df.select_dtypes(include='object').columns:
            unique = self.df[col].nunique()
            top = self.df[col].value_counts().head(3).to_dict()
            self.report.append(f"  {col}: {unique} unique, top: {top}")
        return self

    def visualize(self) -> "DataPipeline":
        """Save a correlation heatmap and histograms for numeric columns.

        Writes ``correlation.png`` when at least two numeric columns exist and
        one ``dist_<column>.png`` for each of the first six numeric columns.

        Returns:
            This pipeline, to allow chaining.
        """
        # Correlation heatmap for numeric columns
        numeric = self.df.select_dtypes(include=[np.number])
        if len(numeric.columns) > 1:
            fig, ax = plt.subplots(figsize=(10, 8))
            try:
                sns.heatmap(numeric.corr(), annot=True, fmt='.2f', cmap='RdBu',
                            ax=ax, vmin=-1, vmax=1)
                ax.set_title('Correlation Matrix')
                fig.tight_layout()
                fig.savefig(self.output_dir / 'correlation.png', dpi=150)
            finally:
                # Close this specific figure even if plotting or saving fails.
                plt.close(fig)

        # Distribution plots
        for col in numeric.columns[:6]:  # First 6 numeric columns
            fig, ax = plt.subplots(figsize=(8, 4))
            try:
                self.df[col].hist(bins=30, ax=ax, edgecolor='black')
                ax.set_title(f'Distribution: {col}')
                fig.tight_layout()
                # Column names are only sanitised if clean() ran, so make the
                # name safe here as well before using it in a path.
                safe_name = str(col).translate(self._UNSAFE_FILENAME_CHARS)
                fig.savefig(self.output_dir / f'dist_{safe_name}.png', dpi=150)
            finally:
                plt.close(fig)

        return self

    def save_report(self) -> "DataPipeline":
        """Write the accumulated report lines to ``report.txt`` and print its path.

        Returns:
            This pipeline, to allow chaining.
        """
        report_path = self.output_dir / 'report.txt'
        # Explicit UTF-8: the report embeds data values, which may be non-ASCII.
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(self.report))
        print(f"Report saved to {report_path}")
        return self

if __name__ == '__main__':
    # Command-line entry point: read the input/output locations from the
    # arguments, then run every pipeline stage in order.
    cli = argparse.ArgumentParser()
    cli.add_argument('--input', required=True)
    cli.add_argument('--output', default='analysis_output')
    options = cli.parse_args()

    pipeline = DataPipeline(options.input, options.output)
    pipeline.clean()
    pipeline.profile()
    pipeline.visualize()
    pipeline.save_report()