#Python

Pythonデータ分析完全実践ガイド|pandas・NumPy・Matplotlib実装からPolars最適化・統計分析まで、実務レベル技術を徹底解説【2025年最新】

Pythonデータ分析の実務実装を完全解説。pandas・NumPy・Matplotlib基礎から、Polarsによる高速化、統計分析・機械学習実装、可視化テクニック、パフォーマンス最適化まで実践的技術を網羅。

9 August, 2025


「Pythonでのデータ分析を基礎から実務レベルまで体系的にマスターしたい」
「pandas・NumPyの効率的な使い方から最新のPolarsまで実装技術を学びたい」
「統計分析・可視化・機械学習の実践的コードを習得したい」

Pythonデータ分析は、データサイエンス分野において最も重要な基盤技術であり、2025年現在ではpandas 2.0の大幅な性能向上、Polarsの台頭による超高速データ処理、Jupyter環境の進化など、ツールとライブラリが急速に進歩しています。実務では単なるライブラリの使い方ではなく、大量データの効率的な処理、メモリ最適化、可読性の高いコード実装が求められます。

特に注目すべきは、Polarsライブラリの急速な普及です。従来のpandasと比較して、処理内容によっては10〜100倍の高速化が報告されており、大規模データセットの分析で大きな優位性を示しています。また、pandas 2.0ではApache Arrowバックエンドの導入により、メモリ効率と処理速度が大幅に改善され、実務での使い勝手が向上しました。
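参考までに、pandas 2.x のArrowバックエンドは次のように有効化できます(ファイル名や列は説明用の仮の例で、pyarrowのインストールが前提です)。

import pandas as pd

# pandas 2.x: PyArrowバックエンドでCSVを読み込む例(dtype_backend引数を指定)
df_arrow = pd.read_csv("sales.csv", dtype_backend="pyarrow")  # "sales.csv"は説明用の仮のファイル名
print(df_arrow.dtypes)  # 各列が int64[pyarrow] などのArrowDtypeになる

# 既存のNumPyバックエンドのDataFrameをArrowバックエンドへ変換する例
df_numpy = pd.DataFrame({"amount": [100, 200, 300]})
df_converted = df_numpy.convert_dtypes(dtype_backend="pyarrow")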

本記事では、実際のビジネスデータを想定した実装例を通じて、Pythonデータ分析の実務技術を完全習得できる内容を提供します。


1. Python環境構築|2025年最新のベストプラクティス

1.1 開発環境の選択と最適化

Jupyter Lab vs VSCode vs Google Colab の実用比較

2025年現在、データ分析における開発環境は用途に応じて使い分けることが重要です。探索的な分析にはJupyter Lab、モジュール化やテストを伴う開発にはVSCode、環境構築不要で共有したい場合にはGoogle Colabが向いています。以下に、いずれの環境でも共通して使える分析環境セットアップの実装例を示します:

# 環境構築の実装例
import sys
import platform
import pandas as pd
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Union, List, Dict, Optional
import warnings
warnings.filterwarnings('ignore')

def setup_analysis_environment() -> Dict[str, str]:
    """
    データ分析環境の情報を取得し、最適な設定を適用する
    
    Returns:
        Dict[str, str]: 環境情報辞書
    """
    env_info = {
        'python_version': platform.python_version(),
        'system': platform.system(),
        'pandas_version': pd.__version__,
        'numpy_version': np.__version__,
        'polars_version': pl.__version__ if 'pl' in globals() else 'Not installed'
    }
    
    # matplotlib の日本語フォント設定
    plt.rcParams['font.family'] = ['DejaVu Sans', 'Hiragino Sans', 'Yu Gothic', 'Meiryo']
    plt.rcParams['axes.unicode_minus'] = False
    
    # pandas の表示設定最適化
    pd.set_option('display.max_columns', 20)
    pd.set_option('display.max_rows', 100)
    pd.set_option('display.width', None)
    pd.set_option('display.max_colwidth', 50)
    
    return env_info

# 環境情報の確認
env = setup_analysis_environment()
print("データ分析環境構築完了:")
for key, value in env.items():
    print(f"  {key}: {value}")

推奨パッケージ管理(conda vs pip)

# conda環境の作成(推奨)
conda create -n data_analysis_2025 python=3.11
conda activate data_analysis_2025

# 必須ライブラリの一括インストール
conda install pandas=2.1.* numpy=1.24.* matplotlib=3.7.* seaborn=0.12.*
conda install scikit-learn=1.3.* jupyter=1.0.* jupyterlab=4.0.*
pip install polars[all]==0.20.*  # Polarsは現在pipが推奨

# パフォーマンス向上ライブラリ
conda install numba=0.58.* bottleneck=1.3.*

1.2 実務レベルのプロジェクト構造

data_analysis_project/
├── notebooks/           # Jupyter notebooks
│   ├── 01_data_exploration.ipynb
│   ├── 02_data_cleaning.ipynb
│   └── 03_analysis_modeling.ipynb
├── src/                # Python modules
│   ├── __init__.py
│   ├── data_loader.py
│   ├── preprocessor.py
│   └── analyzer.py
├── data/
│   ├── raw/            # 生データ
│   ├── processed/      # 前処理済みデータ
│   └── external/       # 外部データ
├── outputs/
│   ├── figures/        # 図表出力
│   └── reports/        # 分析レポート
├── tests/              # テストコード
├── requirements.txt    # 依存関係
└── README.md          # プロジェクト説明
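
ノートブックからsrc/配下のモジュールを利用する場合は、例えば次のようにプロジェクトルートをimportパスに追加します(モジュール名は上記構成に合わせた説明用の例です)。

# notebooks/01_data_exploration.ipynb の冒頭で実行する想定の例
import sys
from pathlib import Path

# プロジェクトルート(notebooks/ の一つ上の階層)をimportパスに追加
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# src配下のモジュールを読み込む(data_loader.pyに後述のDataLoaderを置く想定)
from src.data_loader import DataLoader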

2. pandas実装技術|効率的なデータ操作と処理最適化

2.1 データ読み込みと基本操作

import pandas as pd
import numpy as np
from pathlib import Path
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataLoader:
    """プロダクション環境対応のデータローダー"""
    
    def __init__(self, chunk_size: int = 50000, memory_threshold: float = 0.5):
        self.chunk_size = chunk_size
        self.memory_threshold = memory_threshold
    
    def load_csv_optimized(self, file_path: str, **kwargs) -> pd.DataFrame:
        """メモリ効率を考慮したCSV読み込み"""
        if not Path(file_path).exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        file_size_gb = Path(file_path).stat().st_size / (1024**3)
        logger.info(f"Loading file: {file_path} ({file_size_gb:.2f} GB)")
        
        if file_size_gb > self.memory_threshold:
            return self._load_large_file(file_path, **kwargs)
        else:
            df = pd.read_csv(file_path, **kwargs)
            return self._optimize_dtypes(df)
    
    def _load_large_file(self, file_path: str, **kwargs) -> pd.DataFrame:
        """大きなファイルのチャンク読み込み"""
        chunks = []
        total_rows = 0
        
        try:
            for i, chunk in enumerate(pd.read_csv(file_path, chunksize=self.chunk_size, **kwargs)):
                chunk = self._optimize_dtypes(chunk)
                chunks.append(chunk)
                total_rows += len(chunk)
                
                if i % 10 == 0:
                    logger.info(f"Processed {i+1} chunks, {total_rows:,} rows")
            
            logger.info(f"Concatenating {len(chunks)} chunks...")
            return pd.concat(chunks, ignore_index=True)
            
        except Exception as e:
            logger.error(f"Error during chunk processing: {e}")
            raise
    
    def _optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """データ型の自動最適化"""
        original_size = df.memory_usage(deep=True).sum()
        
        # 整数型の最適化
        int_cols = df.select_dtypes(include=['int64']).columns
        for col in int_cols:
            col_min, col_max = df[col].min(), df[col].max()
            
            if col_min >= 0:
                if col_max <= 255:
                    df[col] = df[col].astype('uint8')
                elif col_max <= 65535:
                    df[col] = df[col].astype('uint16')
                elif col_max <= 4294967295:
                    df[col] = df[col].astype('uint32')
            else:
                if col_min >= -128 and col_max <= 127:
                    df[col] = df[col].astype('int8')
                elif col_min >= -32768 and col_max <= 32767:
                    df[col] = df[col].astype('int16')
                elif col_min >= -2147483648 and col_max <= 2147483647:
                    df[col] = df[col].astype('int32')
        
        # 浮動小数点型の最適化
        float_cols = df.select_dtypes(include=['float64']).columns
        for col in float_cols:
            df[col] = pd.to_numeric(df[col], downcast='float')
        
        # カテゴリカル変数の最適化
        object_cols = df.select_dtypes(include=['object']).columns
        for col in object_cols:
            if df[col].dtype == 'object':
                unique_ratio = df[col].nunique() / len(df)
                if unique_ratio < 0.5:
                    df[col] = df[col].astype('category')
        
        optimized_size = df.memory_usage(deep=True).sum()
        reduction_pct = (1 - optimized_size / original_size) * 100
        
        logger.info(f"Memory optimization: {reduction_pct:.1f}% reduction "
                   f"({original_size/1024/1024:.1f}MB → {optimized_size/1024/1024:.1f}MB)")
        
        return df

# 実際のデータでテスト
np.random.seed(42)
sample_size = 1_000_000

# リアルなECデータの生成
sample_data = pd.DataFrame({
    'order_id': range(1, sample_size + 1),
    'customer_id': np.random.randint(1, 50000, sample_size),
    'product_id': np.random.randint(1, 5000, sample_size),
    'order_date': pd.date_range('2023-01-01', periods=sample_size, freq='30S'),
    'quantity': np.random.poisson(2, sample_size) + 1,
    'unit_price': np.random.lognormal(3, 0.5, sample_size).round(2),
    'category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home', 'Sports'], 
                                sample_size, p=[0.3, 0.25, 0.15, 0.2, 0.1]),
    'region': np.random.choice(['US-East', 'US-West', 'EU', 'APAC'], 
                              sample_size, p=[0.35, 0.25, 0.25, 0.15]),
    'discount_rate': np.random.beta(1, 9, sample_size)
})

sample_data['total_amount'] = sample_data['quantity'] * sample_data['unit_price'] * (1 - sample_data['discount_rate'])
sample_data.to_csv('ecommerce_data.csv', index=False)

# データローダーのテスト
loader = DataLoader()
df = loader.load_csv_optimized('ecommerce_data.csv')

print(f"データ形状: {df.shape}")
print(f"メモリ使用量: {df.memory_usage(deep=True).sum() / 1024 / 1024:.1f}MB")
print(f"\nデータ型:")
print(df.dtypes)
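
なお、load_csv_optimizedは**kwargsをそのままpd.read_csvへ渡すため、日付列を最初からdatetime型で読み込みたい場合は次のように指定できます(後続処理でのpd.to_datetime変換を減らせます。例として別変数に読み込んでいます)。

# kwargsはpd.read_csvにそのまま渡される
loader = DataLoader()
df_parsed = loader.load_csv_optimized(
    'ecommerce_data.csv',
    parse_dates=['order_date'],  # 文字列ではなくdatetime64として読み込む
)
print(df_parsed['order_date'].dtype)  # datetime64[ns]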

2.2 高度なデータ操作・集計技術

from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

class AdvancedDataProcessor:
    """実務レベルのデータ処理エンジン"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self._validate_dataframe()
    
    def _validate_dataframe(self):
        """データフレームの妥当性チェック"""
        if self.df.empty:
            raise ValueError("DataFrame is empty")
        
        required_cols = ['order_date', 'customer_id', 'total_amount']
        missing_cols = [col for col in required_cols if col not in self.df.columns]
        if missing_cols:
            logger.warning(f"Missing recommended columns: {missing_cols}")
    
    def customer_segmentation_analysis(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """RFM分析による顧客セグメンテーション(RFMテーブルとセグメント統計を返す)"""
        analysis_date = self.df['order_date'].max()
        
        rfm = (self.df.groupby('customer_id')
               .agg({
                   'order_date': lambda x: (analysis_date - x.max()).days,
                   'order_id': 'count',
                   'total_amount': 'sum'
               })
               .rename(columns={
                   'order_date': 'recency',
                   'order_id': 'frequency', 
                   'total_amount': 'monetary'
               }))
        
        # RFMスコア計算(5点満点)
        rfm['r_score'] = pd.qcut(rfm['recency'].rank(method='first'), 5, labels=range(5, 0, -1))
        rfm['f_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=range(1, 6))
        rfm['m_score'] = pd.qcut(rfm['monetary'].rank(method='first'), 5, labels=range(1, 6))
        
        # セグメント分類
        def classify_segment(row):
            r, f, m = int(row['r_score']), int(row['f_score']), int(row['m_score'])
            
            if r >= 4 and f >= 4 and m >= 4:
                return 'Champions'
            elif r >= 4 and f >= 3:
                return 'Loyal Customers'
            elif r >= 4 and f <= 2:
                return 'Potential Loyalists'
            elif r >= 3 and f >= 3 and m >= 3:
                return 'New Customers'
            elif r <= 2 and f >= 3:
                return 'At Risk'
            elif r <= 2 and f <= 2 and m >= 3:
                return 'Cannot Lose Them'
            elif r <= 2 and f <= 2 and m <= 2:
                return 'Hibernating'
            else:
                return 'Others'
        
        rfm['segment'] = rfm.apply(classify_segment, axis=1)
        
        # セグメント統計
        segment_stats = (rfm.groupby('segment')
                        .agg({
                            'recency': ['mean', 'count'],
                            'frequency': 'mean',
                            'monetary': 'mean'
                        })
                        .round(2))
        
        segment_stats.columns = ['avg_recency', 'customer_count', 'avg_frequency', 'avg_monetary']
        segment_stats['segment_value'] = (segment_stats['avg_frequency'] * 
                                        segment_stats['avg_monetary'] / 
                                        segment_stats['avg_recency']).round(2)
        
        return rfm.reset_index(), segment_stats.reset_index()
    
    def cohort_analysis(self) -> pd.DataFrame:
        """コホート分析による顧客リテンション分析"""
        cohort_data = self.df.copy()
        cohort_data['order_date'] = pd.to_datetime(cohort_data['order_date'])
        cohort_data['order_month'] = cohort_data['order_date'].dt.to_period('M')
        
        # 初回購入月の特定
        cohort_data['cohort_month'] = (cohort_data.groupby('customer_id')['order_date']
                                     .transform('min')
                                     .dt.to_period('M'))
        
        # 期間番号の計算(コホート月からの経過月数)
        from operator import attrgetter
        
        cohort_data['period_number'] = (
            (cohort_data['order_month'] - cohort_data['cohort_month'])
            .apply(attrgetter('n'))
        )
        
        # コホートテーブルの作成
        cohort_table = (cohort_data.groupby(['cohort_month', 'period_number'])['customer_id']
                       .nunique()
                       .unstack(level=1)
                       .fillna(0))
        
        # リテンション率の計算
        cohort_sizes = cohort_data.groupby('cohort_month')['customer_id'].nunique()
        retention_table = cohort_table.divide(cohort_sizes, axis=0).round(4)
        
        return retention_table
    
    def advanced_time_series_features(self) -> pd.DataFrame:
        """高度な時系列特徴量生成"""
        ts_df = self.df.copy()
        ts_df['order_date'] = pd.to_datetime(ts_df['order_date'])
        
        # 時間特徴量
        ts_df['year'] = ts_df['order_date'].dt.year
        ts_df['month'] = ts_df['order_date'].dt.month
        ts_df['day'] = ts_df['order_date'].dt.day
        ts_df['weekday'] = ts_df['order_date'].dt.dayofweek
        ts_df['hour'] = ts_df['order_date'].dt.hour
        ts_df['is_weekend'] = (ts_df['weekday'] >= 5).astype(int)
        ts_df['is_month_start'] = ts_df['order_date'].dt.is_month_start.astype(int)
        ts_df['is_month_end'] = ts_df['order_date'].dt.is_month_end.astype(int)
        
        # 季節性特徴量
        ts_df['season'] = ts_df['month'].map({
            12: 'Winter', 1: 'Winter', 2: 'Winter',
            3: 'Spring', 4: 'Spring', 5: 'Spring',
            6: 'Summer', 7: 'Summer', 8: 'Summer',
            9: 'Fall', 10: 'Fall', 11: 'Fall'
        })
        
        # ビジネス特徴量
        def get_business_quarter(month):
            return f'Q{(month-1)//3 + 1}'
        
        ts_df['quarter'] = ts_df['month'].apply(get_business_quarter)
        
        # 時間帯分類
        def classify_time_of_day(hour):
            if 5 <= hour < 12:
                return 'Morning'
            elif 12 <= hour < 17:
                return 'Afternoon'
            elif 17 <= hour < 21:
                return 'Evening'
            else:
                return 'Night'
        
        ts_df['time_of_day'] = ts_df['hour'].apply(classify_time_of_day)
        
        return ts_df
    
    def calculate_business_metrics(self) -> Dict[str, float]:
        """重要なビジネス指標の計算"""
        metrics = {}
        
        # 基本指標
        metrics['total_revenue'] = self.df['total_amount'].sum()
        metrics['total_orders'] = len(self.df)
        metrics['unique_customers'] = self.df['customer_id'].nunique()
        metrics['avg_order_value'] = self.df['total_amount'].mean()
        
        # 顧客あたり指標
        customer_metrics = (self.df.groupby('customer_id')
                          .agg({
                              'total_amount': ['sum', 'count'],
                              'order_date': ['min', 'max']
                          }))
        
        customer_metrics.columns = ['total_spent', 'order_count', 'first_order', 'last_order']
        
        metrics['avg_customer_ltv'] = customer_metrics['total_spent'].mean()
        metrics['avg_orders_per_customer'] = customer_metrics['order_count'].mean()
        
        # リピート率
        repeat_customers = (customer_metrics['order_count'] > 1).sum()
        metrics['repeat_rate'] = repeat_customers / len(customer_metrics)
        
        # 売上上位20%の顧客による売上割合(パレート分析)
        customer_revenues = customer_metrics['total_spent'].sort_values(ascending=False)
        top_20_pct_count = int(len(customer_revenues) * 0.2)
        top_20_pct_revenue = customer_revenues.iloc[:top_20_pct_count].sum()
        metrics['pareto_ratio'] = top_20_pct_revenue / metrics['total_revenue']
        
        # 月次成長率
        monthly_revenue = (self.df.groupby(pd.to_datetime(self.df['order_date']).dt.to_period('M'))
                         ['total_amount'].sum().sort_index())
        
        if len(monthly_revenue) > 1:
            metrics['monthly_growth_rate'] = ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[-2]) - 1) * 100
        else:
            metrics['monthly_growth_rate'] = 0
        
        return metrics

# 実際のデータ処理実行
processor = AdvancedDataProcessor(df)

# 顧客セグメンテーション
rfm_data, segment_stats = processor.customer_segmentation_analysis()
print("顧客セグメンテーション結果:")
print(segment_stats.sort_values('segment_value', ascending=False))

# コホート分析
retention_table = processor.cohort_analysis()
print(f"\nコホート分析完了: {retention_table.shape[0]} 個のコホートを分析")
print("1ヶ月後リテンション率:")
print(retention_table[1].describe())

# 時系列特徴量生成
ts_features = processor.advanced_time_series_features()
print(f"\n時系列特徴量: {len(ts_features.columns)} 個の特徴量を生成")

# ビジネス指標
business_metrics = processor.calculate_business_metrics()
print("\n主要ビジネス指標:")
for metric, value in business_metrics.items():
    if isinstance(value, float):
        print(f"  {metric}: {value:,.2f}")
    else:
        print(f"  {metric}: {value}")

2.3 パフォーマンス最適化技術

import time
import psutil
import gc
from contextlib import contextmanager
from typing import Callable, Any
import multiprocessing as mp
from functools import partial
import concurrent.futures

class ProductionPerformanceOptimizer:
    """本格的パフォーマンス最適化クラス"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.process = psutil.Process()
        
    @contextmanager
    def performance_monitor(self, operation_name: str):
        """パフォーマンス監視コンテキストマネージャ"""
        start_time = time.time()
        start_memory = self.process.memory_info().rss / 1024 / 1024
        cpu_start = self.process.cpu_percent()
        
        logger.info(f"開始: {operation_name}")
        
        try:
            yield
        finally:
            end_time = time.time()
            end_memory = self.process.memory_info().rss / 1024 / 1024
            cpu_end = self.process.cpu_percent()
            
            execution_time = end_time - start_time
            memory_diff = end_memory - start_memory
            
            logger.info(f"完了: {operation_name}")
            logger.info(f"  実行時間: {execution_time:.3f}秒")
            logger.info(f"  メモリ使用量: {memory_diff:+.1f}MB")
            logger.info(f"  CPU使用率: {cpu_end:.1f}%")
    
    def vectorized_vs_iterative_benchmark(self) -> Dict[str, Dict[str, float]]:
        """ベクトル化 vs 反復処理のベンチマーク"""
        results = {}
        sample_size = min(100000, len(self.df))
        test_df = self.df.head(sample_size).copy()
        
        # 1. 条件付き計算の比較
        with self.performance_monitor("条件付き計算比較"):
            
            # 反復処理(避けるべき方法)
            start = time.time()
            iterative_result = []
            for _, row in test_df.iterrows():
                if row['total_amount'] > test_df['total_amount'].median():
                    iterative_result.append(row['total_amount'] * 0.1)
                else:
                    iterative_result.append(0)
            iterative_time = time.time() - start
            
            # ベクトル化処理(推奨方法)
            start = time.time()
            median_value = test_df['total_amount'].median()
            vectorized_result = np.where(
                test_df['total_amount'] > median_value,
                test_df['total_amount'] * 0.1,
                0
            )
            vectorized_time = time.time() - start
            
            results['conditional_calculation'] = {
                'iterative': iterative_time,
                'vectorized': vectorized_time,
                'speedup': iterative_time / vectorized_time if vectorized_time > 0 else 0
            }
        
        # 2. 集計処理の比較
        with self.performance_monitor("集計処理比較"):
            
            # apply使用(低効率)
            start = time.time()
            apply_result = test_df.groupby('category')['total_amount'].apply(
                lambda x: x.sum() / x.count()
            )
            apply_time = time.time() - start
            
            # 直接的な集計(高効率)
            start = time.time()
            direct_result = test_df.groupby('category')['total_amount'].mean()
            direct_time = time.time() - start
            
            results['aggregation'] = {
                'apply_method': apply_time,
                'direct_method': direct_time,
                'speedup': apply_time / direct_time if direct_time > 0 else 0
            }
        
        return results
    
    def memory_optimization_techniques(self) -> pd.DataFrame:
        """メモリ最適化技術の実装"""
        with self.performance_monitor("メモリ最適化"):
            # 元のメモリ使用量
            original_memory = self.df.memory_usage(deep=True).sum() / 1024 / 1024
            
            # 最適化されたコピーを作成
            optimized_df = self.df.copy()
            
            # 1. カテゴリカル変数の最適化
            categorical_candidates = ['category', 'region']
            for col in categorical_candidates:
                if col in optimized_df.columns:
                    if optimized_df[col].dtype == 'object':
                        unique_ratio = optimized_df[col].nunique() / len(optimized_df)
                        if unique_ratio < 0.1:  # ユニーク値が10%未満
                            optimized_df[col] = optimized_df[col].astype('category')
            
            # 2. 数値型の最適化
            for col in optimized_df.select_dtypes(include=['int64', 'float64']).columns:
                if optimized_df[col].dtype == 'int64':
                    optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='integer')
                elif optimized_df[col].dtype == 'float64':
                    optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
            
            # 3. 不要な列の削除(例:重複情報)
            cols_to_check = ['order_id']  # 一意識別子は分析に不要な場合が多い
            for col in cols_to_check:
                if col in optimized_df.columns:
                    if optimized_df[col].nunique() == len(optimized_df):  # 全て一意の場合
                        optimized_df = optimized_df.drop(columns=[col])
                        logger.info(f"削除された列: {col} (一意値のため分析不要)")
            
            # メモリ削減効果の計算
            optimized_memory = optimized_df.memory_usage(deep=True).sum() / 1024 / 1024
            reduction_pct = (1 - optimized_memory / original_memory) * 100
            
            logger.info(f"メモリ最適化結果:")
            logger.info(f"  元のサイズ: {original_memory:.1f}MB")
            logger.info(f"  最適化後: {optimized_memory:.1f}MB")
            logger.info(f"  削減率: {reduction_pct:.1f}%")
            
        return optimized_df
    
    @staticmethod
    def _process_customer_group(customer_data: pd.DataFrame) -> Dict:
        """個別顧客グループの処理
        
        ProcessPoolExecutorに渡す関数はpickle可能である必要があるため、
        メソッド内のローカル関数ではなくstaticmethodとして定義する。
        """
        customer_id = customer_data['customer_id'].iloc[0]
        
        # 複雑な計算の例
        total_spent = customer_data['total_amount'].sum()
        avg_order = customer_data['total_amount'].mean()
        order_frequency = len(customer_data)
        
        # 時系列特徴量
        if 'order_date' in customer_data.columns:
            customer_data = customer_data.copy()  # グループのスライスを直接書き換えない
            customer_data['order_date'] = pd.to_datetime(customer_data['order_date'])
            date_range = (customer_data['order_date'].max() - customer_data['order_date'].min()).days
            
            # 購買パターン分析
            hourly_pattern = customer_data['order_date'].dt.hour.mode().iloc[0] if len(customer_data) > 0 else 12
            weekend_ratio = (customer_data['order_date'].dt.dayofweek >= 5).mean()
        else:
            date_range = 0
            hourly_pattern = 12
            weekend_ratio = 0.0
        
        return {
            'customer_id': customer_id,
            'total_spent': total_spent,
            'avg_order_value': avg_order,
            'order_frequency': order_frequency,
            'customer_lifetime_days': date_range,
            'preferred_hour': hourly_pattern,
            'weekend_shopper_ratio': weekend_ratio,
            'value_score': (total_spent * order_frequency) / max(date_range, 1)
        }
    
    def parallel_processing_demo(self, n_processes: int = None) -> pd.DataFrame:
        """並列処理による高速化デモ"""
        if n_processes is None:
            n_processes = min(mp.cpu_count() - 1, 4)
        
        with self.performance_monitor("並列処理実行"):
            # データをグループ化
            customer_groups = [group for _, group in self.df.groupby('customer_id')]
            
            # 並列処理実行(staticmethodはpickle可能なためプロセス間に渡せる)
            with concurrent.futures.ProcessPoolExecutor(max_workers=n_processes) as executor:
                results = list(executor.map(
                    ProductionPerformanceOptimizer._process_customer_group,
                    customer_groups
                ))
            
            result_df = pd.DataFrame(results)
            
            logger.info(f"並列処理完了: {len(results)} 顧客を {n_processes} プロセスで処理")
            
        return result_df
    
    def efficient_join_strategies(self, lookup_data: pd.DataFrame) -> pd.DataFrame:
        """効率的な結合戦略"""
        with self.performance_monitor("効率的結合"):
            
            # 1. インデックス設定による高速化
            if 'customer_id' in self.df.columns and 'customer_id' in lookup_data.columns:
                # インデックス設定
                df_indexed = self.df.set_index('customer_id')
                lookup_indexed = lookup_data.set_index('customer_id')
                
                # 結合実行
                joined = df_indexed.join(lookup_indexed, how='inner', rsuffix='_lookup')
                joined = joined.reset_index()
                
                logger.info(f"インデックス結合完了: {joined.shape}")
                
                # 2. 結合後の重複削除と最適化
                if joined.duplicated().any():
                    original_size = len(joined)
                    joined = joined.drop_duplicates()
                    logger.info(f"重複削除: {original_size - len(joined)} 行を削除")
                
                return joined
            else:
                logger.warning("customer_id列が見つかりません")
                return self.df

# 実際の最適化実行
optimizer = ProductionPerformanceOptimizer(df)

# ベンチマーク実行
benchmark_results = optimizer.vectorized_vs_iterative_benchmark()
print("パフォーマンスベンチマーク結果:")
for operation, metrics in benchmark_results.items():
    print(f"\n{operation}:")
    for method, time_val in metrics.items():
        if method != 'speedup':
            print(f"  {method}: {time_val:.4f}秒")
        else:
            print(f"  高速化倍率: {time_val:.1f}倍")

# メモリ最適化
optimized_df = optimizer.memory_optimization_techniques()

# 並列処理(小さなデータセットでテスト)
customer_analysis = optimizer.parallel_processing_demo(n_processes=2)
print(f"\n並列処理結果: {customer_analysis.shape}")
print("上位5顧客の価値スコア:")
print(customer_analysis.nlargest(5, 'value_score')[['customer_id', 'value_score', 'total_spent']])

# メモリクリーンアップ
gc.collect()


3. Polars実装|企業レベル高速データ処理

3.1 プロダクション環境でのPolars導入戦略

import polars as pl
import time
import psutil
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from contextlib import contextmanager

@dataclass
class BenchmarkResult:
    """ベンチマーク結果の構造化データ"""
    operation: str
    pandas_time: float
    polars_time: float
    speedup_factor: float
    memory_reduction_pct: float
    data_size: int

class EnterprisePolarsAnalyzer:
    """エンタープライズレベルのPolars分析エンジン"""
    
    def __init__(self, config_path: Optional[str] = None):
        self.config = self._load_config(config_path)
        self.benchmarks: List[BenchmarkResult] = []
        self.process = psutil.Process()
    
    def _load_config(self, config_path: Optional[str]) -> Dict:
        """設定ファイルの読み込み"""
        default_config = {
            "chunk_size": 100000,
            "max_memory_gb": 8,
            "parallel_workers": min(psutil.cpu_count() - 1, 8),
            "lazy_evaluation": True,
            "streaming": True
        }
        
        if config_path and Path(config_path).exists():
            with open(config_path) as f:
                user_config = json.load(f)
                default_config.update(user_config)
        
        return default_config
    
    @contextmanager
    def resource_monitor(self, operation_name: str):
        """リソース使用量の詳細監視"""
        start_time = time.time()
        start_memory = self.process.memory_info().rss / 1024 / 1024
        start_cpu = psutil.cpu_percent(interval=None)
        
        try:
            yield
        finally:
            end_time = time.time()
            end_memory = self.process.memory_info().rss / 1024 / 1024
            end_cpu = psutil.cpu_percent(interval=None)
            
            execution_time = end_time - start_time
            peak_memory = end_memory
            avg_cpu = (start_cpu + end_cpu) / 2
            
            logger.info(f"{operation_name} 完了:")
            logger.info(f"  実行時間: {execution_time:.3f}秒")
            logger.info(f"  ピークメモリ: {peak_memory:.1f}MB")
            logger.info(f"  平均CPU使用率: {avg_cpu:.1f}%")
    
    def comprehensive_benchmark(self, df_pandas: pd.DataFrame) -> List[BenchmarkResult]:
        """包括的なパフォーマンスベンチマーク"""
        # データをPolarsに変換
        df_polars = pl.from_pandas(df_pandas)
        data_size = len(df_pandas)
        
        benchmarks = []
        
        # 1. フィルタリング + 集計のベンチマーク
        with self.resource_monitor("複合フィルタリング-pandas"):
            start = time.time()
            pandas_result = (df_pandas
                           .query('total_amount > @df_pandas.total_amount.quantile(0.7)')
                           .groupby('category')
                           .agg({
                               'total_amount': ['sum', 'mean', 'count'],
                               'quantity': 'sum',
                               'customer_id': 'nunique'
                           }))
            pandas_time = time.time() - start
        
        with self.resource_monitor("複合フィルタリング-polars"):
            start = time.time()
            quantile_75 = df_polars.select(pl.col('total_amount').quantile(0.7)).item()
            polars_result = (df_polars
                           .filter(pl.col('total_amount') > quantile_75)
                           .groupby('category')
                           .agg([
                               pl.col('total_amount').sum().alias('total_sum'),
                               pl.col('total_amount').mean().alias('total_mean'),
                               pl.col('total_amount').count().alias('total_count'),
                               pl.col('quantity').sum().alias('quantity_sum'),
                               pl.col('customer_id').n_unique().alias('unique_customers')
                           ]))
            polars_time = time.time() - start
        
        benchmarks.append(BenchmarkResult(
            operation="複合フィルタリング・集計",
            pandas_time=pandas_time,
            polars_time=polars_time,
            speedup_factor=pandas_time / polars_time,
            memory_reduction_pct=self._calculate_memory_reduction(df_pandas, df_polars),
            data_size=data_size
        ))
        
        # 2. ウィンドウ関数のベンチマーク
        with self.resource_monitor("ウィンドウ関数-pandas"):
            start = time.time()
            df_pandas_copy = df_pandas.copy()
            df_pandas_copy['running_total'] = (df_pandas_copy
                                             .sort_values(['customer_id', 'order_date'])
                                             .groupby('customer_id')['total_amount']
                                             .cumsum())
            df_pandas_copy['customer_rank'] = (df_pandas_copy
                                             .groupby('customer_id')['total_amount']
                                             .rank(ascending=False))
            pandas_time = time.time() - start
        
        with self.resource_monitor("ウィンドウ関数-polars"):
            start = time.time()
            polars_result = (df_polars
                           .sort(['customer_id', 'order_date'])
                           .with_columns([
                               pl.col('total_amount').cumsum().over('customer_id').alias('running_total'),
                               pl.col('total_amount').rank(descending=True).over('customer_id').alias('customer_rank')
                           ]))
            polars_time = time.time() - start
        
        benchmarks.append(BenchmarkResult(
            operation="ウィンドウ関数",
            pandas_time=pandas_time,
            polars_time=polars_time,
            speedup_factor=pandas_time / polars_time,
            memory_reduction_pct=0,  # 同じデータサイズなので省略
            data_size=data_size
        ))
        
        return benchmarks
    
    def _calculate_memory_reduction(self, df_pandas: pd.DataFrame, df_polars: pl.DataFrame) -> float:
        """メモリ使用量削減率の計算"""
        pandas_memory = df_pandas.memory_usage(deep=True).sum()
        # PolarsはDataFrame.estimated_size()で実メモリ使用量(バイト)を取得できる
        polars_memory = df_polars.estimated_size()
        return (1 - polars_memory / pandas_memory) * 100 if pandas_memory > 0 else 0
    
    def lazy_evaluation_pipeline(self, file_path: str) -> pl.DataFrame:
        """遅延評価を活用した効率的なデータパイプライン"""
        
        # 遅延評価でクエリを構築
        lazy_query = (pl.scan_csv(file_path)  # ファイルスキャン(遅延)
                     .filter(pl.col('total_amount') > 0)
                     .with_columns([
                         # 特徴量エンジニアリング
                         (pl.col('total_amount') / pl.col('quantity')).alias('unit_price'),
                         pl.col('order_date').str.strptime(pl.Datetime, '%Y-%m-%d %H:%M:%S').alias('parsed_date'),
                         (pl.col('total_amount') > pl.col('total_amount').quantile(0.8)).alias('high_value_order')
                     ])
                     .with_columns([
                         pl.col('parsed_date').dt.weekday().alias('weekday'),
                         pl.col('parsed_date').dt.month().alias('month'),
                         pl.col('parsed_date').dt.quarter().alias('quarter')
                     ])
                     .groupby(['category', 'region', 'quarter'])
                     .agg([
                         pl.col('total_amount').sum().alias('total_revenue'),
                         pl.col('total_amount').mean().alias('avg_order_value'),
                         pl.col('customer_id').n_unique().alias('unique_customers'),
                         pl.col('high_value_order').sum().alias('high_value_count'),
                         pl.col('unit_price').std().alias('price_volatility')
                     ])
                     .sort(['total_revenue'], descending=True))
        
        # この時点まで実際のデータ処理は行われない
        logger.info("遅延評価クエリ構築完了 - まだデータは処理されていません")
        
        # collect()で実際の計算を実行
        with self.resource_monitor("遅延評価実行"):
            result = lazy_query.collect()
        
        return result
    
    def streaming_aggregation(self, file_path: str, chunk_size: int = 100000) -> pl.DataFrame:
        """ストリーミング処理による大規模データ集計"""
        aggregated_results = []
        
        # read_csv_batchedはイテレータではなくBatchedCsvReaderを返すため、
        # next_batches()でチャンクを順次取り出して処理する
        reader = pl.read_csv_batched(file_path, batch_size=chunk_size)
        while True:
            batches = reader.next_batches(1)
            if not batches:
                break
            for chunk_df in batches:
                chunk_agg = (chunk_df
                            .groupby('category')
                            .agg([
                                pl.col('total_amount').sum().alias('chunk_sum'),
                                pl.col('total_amount').count().alias('chunk_count'),
                                pl.col('customer_id').n_unique().alias('chunk_unique_customers')
                            ]))
                aggregated_results.append(chunk_agg)
        
        # チャンク結果を最終集計
        if aggregated_results:
            final_result = (pl.concat(aggregated_results)
                          .groupby('category')
                          .agg([
                              pl.col('chunk_sum').sum().alias('total_amount'),
                              pl.col('chunk_count').sum().alias('total_orders'),
                              pl.col('chunk_unique_customers').sum().alias('total_unique_customers')  # 重複あり
                          ]))
            return final_result
        
        return pl.DataFrame()
    
    def export_benchmark_report(self, output_path: str):
        """ベンチマーク結果のレポート出力"""
        if not self.benchmarks:
            logger.warning("ベンチマークデータがありません")
            return
        
        report = {
            "benchmark_summary": {
                "total_operations": len(self.benchmarks),
                "avg_speedup": sum(b.speedup_factor for b in self.benchmarks) / len(self.benchmarks),
                "max_speedup": max(b.speedup_factor for b in self.benchmarks),
                "data_sizes_tested": list(set(b.data_size for b in self.benchmarks))
            },
            "detailed_results": [
                {
                    "operation": b.operation,
                    "pandas_time_sec": b.pandas_time,
                    "polars_time_sec": b.polars_time,
                    "speedup_factor": b.speedup_factor,
                    "memory_reduction_pct": b.memory_reduction_pct,
                    "data_size": b.data_size
                }
                for b in self.benchmarks
            ]
        }
        
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        logger.info(f"ベンチマークレポートを出力: {output_path}")

# 実際の企業レベル使用例
analyzer = EnterprisePolarsAnalyzer()

# パフォーマンステストの実行
if 'df' in locals():
    benchmarks = analyzer.comprehensive_benchmark(df)
    analyzer.benchmarks.extend(benchmarks)
    
    print("Polars vs pandas エンタープライズベンチマーク結果:")
    print("=" * 60)
    
    for benchmark in benchmarks:
        print(f"\n操作: {benchmark.operation}")
        print(f"  データサイズ: {benchmark.data_size:,} 行")
        print(f"  pandas: {benchmark.pandas_time:.4f}秒")
        print(f"  Polars: {benchmark.polars_time:.4f}秒")
        print(f"  高速化倍率: {benchmark.speedup_factor:.1f}倍")
        if benchmark.memory_reduction_pct > 0:
            print(f"  メモリ削減: {benchmark.memory_reduction_pct:.1f}%")

# サンプルCSVで遅延評価テスト
if Path('ecommerce_data.csv').exists():
    lazy_result = analyzer.lazy_evaluation_pipeline('ecommerce_data.csv')
    print(f"\n遅延評価パイプライン結果: {lazy_result.shape}")
    print("上位5カテゴリ・地域の売上:")
    print(lazy_result.head())

# ベンチマークレポートの出力
analyzer.export_benchmark_report('polars_benchmark_report.json')
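
遅延評価クエリが実際にどのように最適化されるかは、collect()の前にexplain()で実行プランを確認できます(以下はファイル名を含め説明用の簡易例です)。

import polars as pl

# 遅延クエリを構築(この時点ではデータは読み込まれない)
lazy_query = (
    pl.scan_csv("ecommerce_data.csv")
      .filter(pl.col("total_amount") > 0)
      .group_by("category")
      .agg(pl.col("total_amount").sum().alias("revenue"))
)

# 最適化後の実行プランを文字列として確認
print(lazy_query.explain())

# 実際の計算はcollect()を呼んだタイミングで実行される
result = lazy_query.collect()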

3.2 Polars実践的実装パターン

from typing import Protocol
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from dataclasses import dataclass
from enum import Enum

class DataQualityLevel(Enum):
    """データ品質レベル"""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

@dataclass
class QueryPlan:
    """クエリ実行プランの定義"""
    query_id: str
    estimated_memory_mb: float
    estimated_time_sec: float
    parallelizable: bool
    dependencies: List[str]

class ProductionPolarsEngine:
    """プロダクション対応Polarsエンジン"""
    
    def __init__(self, max_memory_gb: float = 4.0, enable_caching: bool = True):
        self.max_memory_gb = max_memory_gb
        self.enable_caching = enable_caching
        self.query_cache: Dict[str, pl.DataFrame] = {}
        self.execution_stats: Dict[str, Dict] = {}
    
    def _generate_query_hash(self, query_str: str) -> str:
        """クエリのハッシュ値生成(キャッシュキーとして使用)"""
        return hashlib.md5(query_str.encode()).hexdigest()[:12]
    
    def _estimate_memory_usage(self, df: pl.DataFrame, operation: str) -> float:
        """メモリ使用量の推定"""
        base_size = len(df) * len(df.columns) * 8 / 1024 / 1024  # MB
        
        memory_multipliers = {
            'join': 1.5,
            'groupby': 1.2,
            'window': 2.0,
            'sort': 1.8,
            'pivot': 3.0
        }
        
        return base_size * memory_multipliers.get(operation, 1.0)
    
    def advanced_time_series_analysis(self, df: pl.DataFrame, date_col: str = 'order_date') -> pl.DataFrame:
        """高度な時系列分析パイプライン"""
        query_hash = self._generate_query_hash(f"time_series_{date_col}")
        
        if self.enable_caching and query_hash in self.query_cache:
            logger.info(f"キャッシュからクエリ結果を取得: {query_hash}")
            return self.query_cache[query_hash]
        
        start_time = time.time()
        
        # 時系列特徴量の包括的生成
        # date_colが文字列の場合はパースし、既にDatetime型の場合はそのままキャストする
        date_expr = (pl.col(date_col).str.strptime(pl.Datetime)
                     if df.schema[date_col] == pl.Utf8
                     else pl.col(date_col).cast(pl.Datetime))
        result = (df
                 .with_columns([
                     date_expr.alias('parsed_date')
                 ])
                 .with_columns([
                     # 基本時間特徴量
                     pl.col('parsed_date').dt.year().alias('year'),
                     pl.col('parsed_date').dt.month().alias('month'),
                     pl.col('parsed_date').dt.day().alias('day'),
                     pl.col('parsed_date').dt.weekday().alias('weekday'),
                     pl.col('parsed_date').dt.hour().alias('hour'),
                     
                     # ビジネス特徴量
                     pl.col('parsed_date').dt.weekday().is_in([6, 7]).alias('is_weekend'),
                     pl.col('parsed_date').dt.month().is_in([12, 1, 2]).alias('is_winter'),
                     (pl.col('parsed_date').dt.day() <= 7).alias('is_month_start'),
                     
                     # 季節性特徴量
                     (pl.col('parsed_date').dt.month() % 12 / 3).round(0).alias('season'),
                     
                     # 時系列ラグ特徴量
                     pl.col('total_amount').shift(1).over('customer_id').alias('prev_order_amount'),
                     pl.col('total_amount').shift(7).over('customer_id').alias('week_ago_amount'),
                     
                     # 移動統計
                     pl.col('total_amount').rolling_mean(window_size=7, by='parsed_date').alias('ma_7d'),
                     pl.col('total_amount').rolling_std(window_size=30, by='parsed_date').alias('std_30d'),
                     
                     # パーセンタイルベースの特徴量
                     (pl.col('total_amount') > pl.col('total_amount').quantile(0.75).over('category')).alias('is_top_quartile'),
                     
                     # 顧客行動パターン
                     pl.col('parsed_date').diff().dt.total_days().over('customer_id').alias('days_since_last_order'),
                     pl.col('total_amount').count().over(['customer_id', pl.col('parsed_date').dt.month()]).alias('monthly_frequency')
                 ])
                 .with_columns([
                     # 二次特徴量(他の特徴量から生成)
                     (pl.col('total_amount') / pl.col('ma_7d')).alias('amount_to_avg_ratio'),
                     (pl.col('days_since_last_order') > 30).alias('is_returning_customer'),
                     
                     # 顧客ライフサイクル特徴量
                     pl.when(pl.col('monthly_frequency') == 1)
                       .then(pl.lit('New'))
                       .when(pl.col('monthly_frequency') <= 3)
                       .then(pl.lit('Occasional'))
                       .otherwise(pl.lit('Frequent'))
                       .alias('customer_lifecycle_stage')
                 ]))
        
        execution_time = time.time() - start_time
        self.execution_stats[query_hash] = {
            'execution_time': execution_time,
            'rows_processed': len(result),
            'columns_generated': len(result.columns) - len(df.columns)
        }
        
        if self.enable_caching:
            self.query_cache[query_hash] = result
            
        logger.info(f"時系列分析完了: {execution_time:.2f}秒, {len(result.columns)} 特徴量生成")
        return result
    
    def customer_lifetime_value_analysis(self, df: pl.DataFrame) -> pl.DataFrame:
        """顧客生涯価値の高度分析"""
        
        # CLV計算のための複雑な集計
        clv_analysis = (df
                       .groupby('customer_id')
                       .agg([
                           # 基本指標
                           pl.col('total_amount').sum().alias('total_spent'),
                           pl.col('total_amount').mean().alias('avg_order_value'),
                           pl.col('total_amount').count().alias('order_frequency'),
                           pl.col('order_date').min().alias('first_order_date'),
                           pl.col('order_date').max().alias('last_order_date'),
                           
                           # 高度指標
                           pl.col('total_amount').std().alias('spending_volatility'),
                           pl.col('category').n_unique().alias('category_diversity'),
                           pl.col('total_amount').max().alias('max_single_order'),
                           
                           # 時系列指標
                           (pl.col('order_date').max() - pl.col('order_date').min()).alias('customer_lifespan_days')
                       ])
                       .with_columns([
                           # CLV計算
                           (pl.col('total_spent') / pl.col('customer_lifespan_days').dt.total_days().replace(0, 1) * 365).alias('annual_value'),
                           (pl.col('order_frequency') / pl.col('customer_lifespan_days').dt.total_days().replace(0, 1) * 30).alias('monthly_frequency'),
                           
                           # リスクスコア
                           (1 / (1 + pl.col('spending_volatility') / pl.col('avg_order_value'))).alias('stability_score'),
                           
                           # セグメンテーション
                           pl.when(pl.col('total_spent') > pl.col('total_spent').quantile(0.8))
                             .then(pl.lit('High_Value'))
                             .when(pl.col('order_frequency') > pl.col('order_frequency').quantile(0.8))
                             .then(pl.lit('High_Frequency'))
                             .when(pl.col('category_diversity') >= 3)
                             .then(pl.lit('Diverse_Shopper'))
                             .otherwise(pl.lit('Standard'))
                             .alias('customer_segment')
                       ]))
        
        return clv_analysis
    
    def parallel_aggregation_pipeline(self, df: pl.DataFrame, group_cols: List[str]) -> pl.DataFrame:
        """並列処理による高速集計パイプライン"""
        
        def create_aggregation_query(group_col: str) -> pl.LazyFrame:
            """個別集計クエリの生成"""
            return (df.lazy()
                   .groupby(group_col)
                   .agg([
                       pl.col('total_amount').sum().alias(f'total_by_{group_col}'),
                       pl.col('total_amount').mean().alias(f'avg_by_{group_col}'),
                       pl.col('customer_id').n_unique().alias(f'unique_customers_by_{group_col}'),
                       pl.col('quantity').sum().alias(f'total_quantity_by_{group_col}')
                   ])
                   .with_columns([
                       pl.lit(group_col).alias('group_type')
                   ]))
        
        # 並列でクエリを実行
        with ThreadPoolExecutor(max_workers=min(len(group_cols), 4)) as executor:
            future_to_group = {
                executor.submit(create_aggregation_query(group_col).collect): group_col 
                for group_col in group_cols
            }
            
            results = []
            for future in as_completed(future_to_group):
                group_col = future_to_group[future]
                try:
                    result = future.result()
                    results.append(result)
                    logger.info(f"並列集計完了: {group_col}")
                except Exception as exc:
                    logger.error(f"並列集計エラー {group_col}: {exc}")
        
        # 結果を統合
        if results:
            combined_result = pl.concat(results, how="diagonal")
            return combined_result
        else:
            return pl.DataFrame()
    
    def data_quality_assessment(self, df: pl.DataFrame) -> Dict[str, DataQualityLevel]:
        """データ品質の自動評価"""
        quality_scores = {}
        
        for column in df.columns:
            col_data = df.select(pl.col(column))
            
            # 欠損率
            null_ratio = col_data.null_count().item() / len(df)
            
            # ユニーク率
            if df[column].dtype in [pl.Utf8, pl.Categorical]:
                unique_ratio = col_data.n_unique() / len(df) if len(df) > 0 else 0
            else:
                unique_ratio = 1.0  # 数値列は常に高品質とみなす
            
            # 品質スコア計算
            score = (1 - null_ratio) * 0.7 + unique_ratio * 0.3
            
            if score >= 0.8:
                quality_scores[column] = DataQualityLevel.HIGH
            elif score >= 0.5:
                quality_scores[column] = DataQualityLevel.MEDIUM
            else:
                quality_scores[column] = DataQualityLevel.LOW
        
        return quality_scores
    
    def generate_optimization_report(self) -> Dict:
        """最適化レポートの生成"""
        return {
            'execution_statistics': self.execution_stats,
            'cache_hit_ratio': len(self.query_cache) / (len(self.execution_stats) + 1),
            'memory_usage_mb': sum(self._estimate_memory_usage(df, 'cache') for df in self.query_cache.values()),
            'total_queries_executed': len(self.execution_stats),
            'avg_execution_time': sum(stat['execution_time'] for stat in self.execution_stats.values()) / len(self.execution_stats) if self.execution_stats else 0
        }

# プロダクションレベルの使用例
engine = ProductionPolarsEngine(max_memory_gb=8.0, enable_caching=True)

if 'df' in locals():
    try:
        df_polars = pl.from_pandas(df)
        
        # 時系列分析の実行
        ts_analysis = engine.advanced_time_series_analysis(df_polars)
        print(f"時系列特徴量生成完了: {ts_analysis.shape}")
        
        # CLV分析の実行  
        clv_analysis = engine.customer_lifetime_value_analysis(df_polars)
        print(f"CLV分析完了: {clv_analysis.shape}")
        print("上位5顧客の年間価値:")
        print(clv_analysis.sort('annual_value', descending=True).head().to_pandas())
        
        # 並列集計の実行
        parallel_results = engine.parallel_aggregation_pipeline(df_polars, ['category', 'region'])
        print(f"並列集計完了: {parallel_results.shape}")
        
        # データ品質評価
        quality_assessment = engine.data_quality_assessment(df_polars)
        print("\nデータ品質評価:")
        for col, level in quality_assessment.items():
            print(f"  {col}: {level.value}")
        
        # 最適化レポート
        optimization_report = engine.generate_optimization_report()
        print(f"\n最適化レポート:")
        print(f"  実行クエリ数: {optimization_report['total_queries_executed']}")
        print(f"  平均実行時間: {optimization_report['avg_execution_time']:.3f}秒")
        print(f"  キャッシュメモリ使用量: {optimization_report['memory_usage_mb']:.1f}MB")
        
    except Exception as e:
        logger.error(f"Polars処理エラー: {e}")
        print("Polarsの処理でエラーが発生しました。pandas版で続行します。")


4. データ可視化実装|エンタープライズ可視化エンジン

4.1 プロダクション対応可視化システム

import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.dates as mdates
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.patches import Rectangle, FancyBboxPatch
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo
from typing import Union, Optional, Callable
from pathlib import Path
import base64
from io import BytesIO
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)

class EnterpriseVisualizationEngine:
    """エンタープライズレベル可視化エンジン"""
    
    def __init__(self, 
                 brand_colors: Optional[Dict[str, str]] = None,
                 output_format: str = 'both',  # 'static', 'interactive', 'both'
                 dpi: int = 300):
        
        self.brand_colors = brand_colors or self._get_default_palette()
        self.output_format = output_format
        self.dpi = dpi
        self.figures_created = []
        self._setup_plotting_environment()
    
    def _get_default_palette(self) -> Dict[str, str]:
        """デフォルトカラーパレット(企業ブランドガイドライン対応)"""
        return {
            'primary': '#2E86C1',
            'secondary': '#E74C3C', 
            'success': '#28B463',
            'warning': '#F39C12',
            'danger': '#E74C3C',
            'info': '#17A2B8',
            'light': '#F8F9FA',
            'dark': '#343A40',
            'accent1': '#9B59B6',
            'accent2': '#E67E22'
        }
    
    def _setup_plotting_environment(self):
        """プロット環境の最適化設定"""
        # 日本語フォント設定(OS別対応)
        import platform
        system = platform.system()
        
        if system == 'Darwin':  # macOS
            plt.rcParams['font.family'] = ['Hiragino Sans', 'DejaVu Sans']
        elif system == 'Windows':
            plt.rcParams['font.family'] = ['Yu Gothic', 'DejaVu Sans']
        else:  # Linux
            plt.rcParams['font.family'] = ['Noto Sans CJK JP', 'DejaVu Sans']
        
        # プロダクション品質設定
        plt.rcParams.update({
            'figure.dpi': self.dpi,
            'savefig.dpi': self.dpi,
            'font.size': 11,
            'axes.titlesize': 14,
            'axes.labelsize': 12,
            'xtick.labelsize': 10,
            'ytick.labelsize': 10,
            'legend.fontsize': 10,
            'figure.titlesize': 16,
            'axes.grid': True,
            'grid.alpha': 0.3,
            'axes.spines.top': False,
            'axes.spines.right': False,
            'figure.facecolor': 'white',
            'axes.facecolor': 'white'
        })
    
    def create_executive_dashboard(self, df: pd.DataFrame) -> Union[plt.Figure, go.Figure]:
        """エグゼクティブ向け統合ダッシュボード"""
        
        if self.output_format in ['static', 'both']:
            static_fig = self._create_static_executive_dashboard(df)
            
        if self.output_format in ['interactive', 'both']:
            interactive_fig = self._create_interactive_executive_dashboard(df)
            
        if self.output_format == 'static':
            return static_fig
        elif self.output_format == 'interactive':
            return interactive_fig
        else:
            return static_fig, interactive_fig
    
    def _create_static_executive_dashboard(self, df: pd.DataFrame) -> plt.Figure:
        """静的エグゼクティブダッシュボード"""
        fig = plt.figure(figsize=(20, 14))
        gs = fig.add_gridspec(4, 4, hspace=0.3, wspace=0.3)
        
        # メインタイトルとサマリー
        fig.suptitle('Executive Sales Dashboard', 
                    fontsize=24, fontweight='bold', y=0.95)
        
        # KPI サマリーカード
        kpi_ax = fig.add_subplot(gs[0, :])
        self._create_kpi_cards(kpi_ax, df)
        
        # 売上トレンド(時系列)
        trend_ax = fig.add_subplot(gs[1, :2])
        self._create_revenue_trend(trend_ax, df)
        
        # 地域別パフォーマンス
        region_ax = fig.add_subplot(gs[1, 2:])
        self._create_regional_performance(region_ax, df)
        
        # 顧客セグメント分析
        segment_ax = fig.add_subplot(gs[2, :2])
        self._create_customer_segmentation(segment_ax, df)
        
        # 商品カテゴリ分析
        category_ax = fig.add_subplot(gs[2, 2:])
        self._create_category_analysis(category_ax, df)
        
        # 予測とトレンド
        forecast_ax = fig.add_subplot(gs[3, :])
        self._create_forecast_visualization(forecast_ax, df)
        
        return fig
    
    def _create_kpi_cards(self, ax: plt.Axes, df: pd.DataFrame):
        """KPIカードの作成"""
        ax.axis('off')
        
        # KPI計算
        total_revenue = df['total_amount'].sum()
        total_orders = len(df)
        avg_order_value = df['total_amount'].mean()
        unique_customers = df['customer_id'].nunique()
        
        # 前期比計算(模擬データ)
        order_dates = pd.to_datetime(df['order_date'])
        current_month = order_dates.max()
        prev_month_data = df[order_dates < current_month - pd.DateOffset(months=1)]
        
        if not prev_month_data.empty:
            revenue_growth = ((total_revenue - prev_month_data['total_amount'].sum()) / 
                            prev_month_data['total_amount'].sum() * 100)
        else:
            revenue_growth = 0
        
        kpis = [
            ('総売上', f'¥{total_revenue:,.0f}', f'{revenue_growth:+.1f}%', self.brand_colors['primary']),
            ('注文数', f'{total_orders:,}', '+5.2%', self.brand_colors['success']),
            ('平均注文額', f'¥{avg_order_value:,.0f}', '+2.1%', self.brand_colors['warning']),
            ('顧客数', f'{unique_customers:,}', '+8.3%', self.brand_colors['info'])
        ]
        
        card_width = 0.22
        card_height = 0.8
        
        for i, (title, value, change, color) in enumerate(kpis):
            x = i * 0.25 + 0.02
            
            # カード背景
            card = FancyBboxPatch((x, 0.1), card_width, card_height,
                                boxstyle="round,pad=0.02",
                                facecolor=color, alpha=0.1,
                                edgecolor=color, linewidth=2)
            ax.add_patch(card)
            
            # テキスト
            ax.text(x + card_width/2, 0.7, title, ha='center', va='center',
                   fontsize=12, fontweight='bold', color='black')
            ax.text(x + card_width/2, 0.5, value, ha='center', va='center',
                   fontsize=16, fontweight='bold', color=color)
            ax.text(x + card_width/2, 0.3, change, ha='center', va='center',
                   fontsize=10, color=self.brand_colors['success'] if '+' in change else self.brand_colors['danger'])
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
    
    def _create_revenue_trend(self, ax: plt.Axes, df: pd.DataFrame):
        """売上トレンド可視化"""
        # 日別売上集計
        daily_revenue = (df.groupby(pd.to_datetime(df['order_date']).dt.date)
                        ['total_amount'].sum().reset_index())
        daily_revenue.columns = ['date', 'revenue']
        daily_revenue['date'] = pd.to_datetime(daily_revenue['date'])
        
        # トレンドライン
        ax.plot(daily_revenue['date'], daily_revenue['revenue'], 
                color=self.brand_colors['primary'], linewidth=3, alpha=0.8)
        
        # 移動平均
        daily_revenue['ma7'] = daily_revenue['revenue'].rolling(7).mean()
        ax.plot(daily_revenue['date'], daily_revenue['ma7'], 
                color=self.brand_colors['secondary'], linewidth=2, 
                linestyle='--', label='7日移動平均')
        
        # 面グラフ
        ax.fill_between(daily_revenue['date'], daily_revenue['revenue'], 
                       alpha=0.3, color=self.brand_colors['primary'])
        
        # 最高売上日をハイライト
        max_revenue_day = daily_revenue.loc[daily_revenue['revenue'].idxmax()]
        ax.scatter(max_revenue_day['date'], max_revenue_day['revenue'], 
                  color=self.brand_colors['accent1'], s=100, zorder=5)
        ax.annotate(f'最高売上: ¥{max_revenue_day["revenue"]:,.0f}',
                   xy=(max_revenue_day['date'], max_revenue_day['revenue']),
                   xytext=(10, 10), textcoords='offset points',
                   bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7),
                   arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))
        
        ax.set_title('売上トレンド分析', fontsize=14, fontweight='bold')
        ax.set_xlabel('日付')
        ax.set_ylabel('売上金額 (¥)')
        ax.legend()
        
        # 日付フォーマット
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
        ax.xaxis.set_major_locator(mdates.DayLocator(interval=7))
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
    
    def _create_regional_performance(self, ax: plt.Axes, df: pd.DataFrame):
        """地域別パフォーマンス可視化"""
        region_performance = (df.groupby('region')
                            .agg({
                                'total_amount': ['sum', 'mean', 'count'],
                                'customer_id': 'nunique'
                            })
                            .round(0))
        
        region_performance.columns = ['total_sales', 'avg_order', 'order_count', 'customers']
        region_performance = region_performance.sort_values('total_sales', ascending=True)
        
        # 水平棒グラフ
        bars = ax.barh(region_performance.index, region_performance['total_sales'], 
                      color=[self.brand_colors['primary'], self.brand_colors['secondary'],
                            self.brand_colors['success'], self.brand_colors['warning']][:len(region_performance)])
        
        # 値をバーに表示
        for bar in bars:
            width = bar.get_width()
            ax.text(width, bar.get_y() + bar.get_height()/2, 
                   f'¥{width:,.0f}', ha='left', va='center', fontweight='bold')
        
        ax.set_title('地域別売上パフォーマンス', fontsize=14, fontweight='bold')
        ax.set_xlabel('売上金額 (¥)')
        
        # パフォーマンス指標の追加表示
        for i, (region, row) in enumerate(region_performance.iterrows()):
            ax.text(row['total_sales'] * 0.05, i, 
                   f"顧客数: {row['customers']:.0f}\n平均: ¥{row['avg_order']:,.0f}",
                   va='center', fontsize=8, color='white', fontweight='bold')
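    
    # 以下の3メソッドは _create_static_executive_dashboard から参照されているため、
    # 最小限の実装例(スケッチ)をここで補っている。集計ロジックは必要に応じて差し替える前提。
    def _create_customer_segmentation(self, ax: plt.Axes, df: pd.DataFrame):
        """顧客セグメント分析(購入金額帯ごとの顧客数・最小実装)"""
        customer_total = df.groupby('customer_id')['total_amount'].sum()
        segments = pd.cut(customer_total,
                          bins=[0, 10000, 50000, 100000, float('inf')],
                          labels=['Light', 'Middle', 'Heavy', 'VIP'])
        segment_counts = segments.value_counts().sort_index()
        ax.bar(segment_counts.index.astype(str), segment_counts.values,
               color=self.brand_colors['info'])
        ax.set_title('顧客セグメント分析', fontsize=14, fontweight='bold')
        ax.set_ylabel('顧客数')
    
    def _create_category_analysis(self, ax: plt.Axes, df: pd.DataFrame):
        """商品カテゴリ別売上構成(最小実装)"""
        category_sales = df.groupby('category')['total_amount'].sum().sort_values(ascending=False)
        ax.pie(category_sales.values, labels=category_sales.index,
               autopct='%1.1f%%', startangle=90)
        ax.set_title('カテゴリ別売上構成', fontsize=14, fontweight='bold')
    
    def _create_forecast_visualization(self, ax: plt.Axes, df: pd.DataFrame):
        """日別売上実績と移動平均による簡易予測の可視化(最小実装)"""
        daily = (df.groupby(pd.to_datetime(df['order_date']).dt.date)['total_amount']
                 .sum().reset_index())
        daily.columns = ['date', 'revenue']
        daily['date'] = pd.to_datetime(daily['date'])
        ax.plot(daily['date'], daily['revenue'],
                color=self.brand_colors['primary'], label='実績')
        ax.plot(daily['date'], daily['revenue'].rolling(14, min_periods=1).mean(),
                color=self.brand_colors['accent1'], linestyle='--', label='14日移動平均(簡易予測)')
        ax.set_title('売上実績と簡易予測', fontsize=14, fontweight='bold')
        ax.legend()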
    
    def _create_interactive_executive_dashboard(self, df: pd.DataFrame) -> go.Figure:
        """インタラクティブエグゼクティブダッシュボード"""
        
        # サブプロットの設定
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=('売上トレンド', '地域別売上', '顧客セグメント', 'カテゴリ分析', '時間別売上パターン', '予測分析'),
            specs=[[{"secondary_y": True}, {"type": "bar"}],
                   [{"type": "scatter"}, {"type": "pie"}],
                   [{"type": "heatmap"}, {"secondary_y": True}]],
            vertical_spacing=0.12
        )
        
        # 1. 売上トレンド
        daily_sales = df.groupby(pd.to_datetime(df['order_date']).dt.date)['total_amount'].sum().reset_index()
        daily_sales.columns = ['date', 'revenue']
        
        fig.add_trace(
            go.Scatter(x=daily_sales['date'], y=daily_sales['revenue'],
                      mode='lines+markers', name='日別売上',
                      line=dict(color=self.brand_colors['primary'], width=3),
                      hovertemplate='日付: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'),
            row=1, col=1
        )
        
        # 移動平均追加
        daily_sales['ma7'] = daily_sales['revenue'].rolling(7).mean()
        fig.add_trace(
            go.Scatter(x=daily_sales['date'], y=daily_sales['ma7'],
                      mode='lines', name='7日移動平均',
                      line=dict(color=self.brand_colors['secondary'], width=2, dash='dash')),
            row=1, col=1
        )
        
        # 2. 地域別売上
        region_sales = df.groupby('region')['total_amount'].sum().sort_values(ascending=True)
        fig.add_trace(
            go.Bar(y=region_sales.index, x=region_sales.values, orientation='h',
                  name='地域別売上', marker_color=self.brand_colors['success'],
                  hovertemplate='地域: %{y}<br>売上: ¥%{x:,.0f}<extra></extra>'),
            row=1, col=2
        )
        
        # 3. 顧客セグメント(RFM分析ベース)
        # 簡易RFM計算
        customer_rfm = self._calculate_simple_rfm(df)
        fig.add_trace(
            go.Scatter(x=customer_rfm['frequency'], y=customer_rfm['monetary'],
                      mode='markers', name='顧客分布',
                      customdata=customer_rfm['recency'],
                      marker=dict(size=(customer_rfm['recency'] / 10).clip(lower=4),
                                color=customer_rfm['monetary'],
                                colorscale='Viridis', showscale=True,
                                colorbar=dict(title="購入金額")),
                      hovertemplate='頻度: %{x}<br>金額: ¥%{y:,.0f}<br>最新購入: %{customdata:.0f}日前<extra></extra>'),
            row=2, col=1
        )
        
        # 4. カテゴリ分析
        category_sales = df.groupby('category')['total_amount'].sum()
        fig.add_trace(
            go.Pie(labels=category_sales.index, values=category_sales.values,
                  name='カテゴリ別売上'),
            row=2, col=2
        )
        
        # レイアウト設定
        fig.update_layout(
            height=1200,
            title_text="エグゼクティブダッシュボード - 売上分析",
            title_font_size=20,
            showlegend=True
        )
        
        return fig
    
    def _calculate_simple_rfm(self, df: pd.DataFrame) -> pd.DataFrame:
        """簡易RFM分析"""
        analysis_date = df['order_date'].max()
        
        rfm = (df.groupby('customer_id')
               .agg({
                   'order_date': lambda x: (pd.to_datetime(analysis_date) - pd.to_datetime(x).max()).days,
                   'total_amount': ['count', 'sum']
               }))
        
        rfm.columns = ['recency', 'frequency', 'monetary']
        return rfm.reset_index()
    
    def export_dashboard(self, 
                        fig: Union[plt.Figure, go.Figure], 
                        filename: str, 
                        formats: List[str] = ['png', 'pdf']):
        """ダッシュボードのエクスポート"""
        
        output_path = Path('dashboard_exports')
        output_path.mkdir(exist_ok=True)
        
        if isinstance(fig, plt.Figure):
            # matplotlib figure
            for fmt in formats:
                filepath = output_path / f"{filename}.{fmt}"
                fig.savefig(filepath, dpi=self.dpi, bbox_inches='tight', 
                           facecolor='white', edgecolor='none')
                logger.info(f"静的ダッシュボードを保存: {filepath}")
        
        elif isinstance(fig, go.Figure):
            # plotly figure
            for fmt in formats:
                if fmt == 'html':
                    filepath = output_path / f"{filename}.html"
                    fig.write_html(filepath)
                elif fmt == 'png':
                    filepath = output_path / f"{filename}.png"
                    fig.write_image(filepath, width=1920, height=1080, scale=2)
                elif fmt == 'pdf':
                    filepath = output_path / f"{filename}.pdf"
                    fig.write_image(filepath, width=1920, height=1080)
                
                logger.info(f"インタラクティブダッシュボードを保存: {filepath}")

# プロダクション使用例
viz_engine = EnterpriseVisualizationEngine(
    output_format='both',
    dpi=300
)

if 'df' in locals():
    # エグゼクティブダッシュボードの作成
    dashboard_results = viz_engine.create_executive_dashboard(df)
    
    if isinstance(dashboard_results, tuple):
        static_dashboard, interactive_dashboard = dashboard_results
        
        # 静的ダッシュボード表示
        plt.show()
        
        # インタラクティブダッシュボード保存
        viz_engine.export_dashboard(interactive_dashboard, 'executive_dashboard_interactive', ['html', 'png'])
        
        print("エグゼクティブダッシュボード作成完了")
        print("- 静的版: 表示済み")
        print("- インタラクティブ版: dashboard_exports/ に保存")
    
    else:
        # 単一形式のダッシュボード
        viz_engine.export_dashboard(dashboard_results, 'executive_dashboard', ['png', 'pdf'])
        print("ダッシュボード作成・保存完了")

4.2 インタラクティブ可視化の実装

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo

class InteractiveVisualizer:
    """インタラクティブ可視化クラス"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.df['date'] = pd.to_datetime(self.df['date'])
    
    def create_interactive_dashboard(self):
        """インタラクティブダッシュボードの作成"""
        # サブプロットの設定
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('時系列トレンド', 'カテゴリ別売上', '散布図分析', '地域別分析'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]]
        )
        
        # 1. 時系列トレンド
        daily_sales = self.df.groupby(self.df['date'].dt.date)['sales_amount'].sum().reset_index()
        daily_sales['date'] = pd.to_datetime(daily_sales['date'])
        
        fig.add_trace(
            go.Scatter(
                x=daily_sales['date'],
                y=daily_sales['sales_amount'],
                mode='lines+markers',
                name='日別売上',
                hovertemplate='日付: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'
            ),
            row=1, col=1
        )
        
        # 2. カテゴリ別売上
        category_sales = self.df.groupby('category')['sales_amount'].sum().reset_index()
        fig.add_trace(
            go.Bar(
                x=category_sales['category'],
                y=category_sales['sales_amount'],
                name='カテゴリ別売上',
                hovertemplate='カテゴリ: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'
            ),
            row=1, col=2
        )
        
        # 3. 散布図分析
        fig.add_trace(
            go.Scatter(
                x=self.df['quantity'],
                y=self.df['sales_amount'],
                mode='markers',
                name='数量vs売上',
                hovertemplate='数量: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>',
                marker=dict(
                    size=8,
                    color=self.df['sales_amount'],
                    colorscale='Viridis',
                    showscale=True
                )
            ),
            row=2, col=1
        )
        
        # 4. 地域別分析
        region_sales = self.df.groupby('region')['sales_amount'].sum().reset_index()
        fig.add_trace(
            go.Pie(
                labels=region_sales['region'],
                values=region_sales['sales_amount'],
                name='地域別売上',
                hovertemplate='地域: %{label}<br>売上: ¥%{value:,.0f}<br>割合: %{percent}<extra></extra>'
            ),
            row=2, col=2
        )
        
        # レイアウト設定
        fig.update_layout(
            title_text="インタラクティブ売上分析ダッシュボード",
            showlegend=True,
            height=800
        )
        
        return fig
    
    def create_3d_analysis(self):
        """3D分析の可視化"""
        # 時間、カテゴリ、地域の3次元分析
        df_3d = (self.df.groupby(['category', 'region', pd.to_datetime(self.df['date']).dt.hour])
                ['sales_amount'].sum().reset_index())
        df_3d.columns = ['category', 'region', 'hour', 'sales_amount']
        
        fig = go.Figure(data=go.Scatter3d(
            x=df_3d['hour'],
            y=pd.Categorical(df_3d['category']).codes,
            z=pd.Categorical(df_3d['region']).codes,
            mode='markers',
            marker=dict(
                size=df_3d['sales_amount'] / 1000,  # サイズ正規化
                color=df_3d['sales_amount'],
                colorscale='Rainbow',
                showscale=True,
                opacity=0.8
            ),
            text=[f'時間: {h}時<br>カテゴリ: {c}<br>地域: {r}<br>売上: ¥{s:,.0f}'
                  for h, c, r, s in zip(df_3d['hour'], df_3d['category'], 
                                       df_3d['region'], df_3d['sales_amount'])],
            hovertemplate='%{text}<extra></extra>'
        ))
        
        fig.update_layout(
            title='3D売上分析(時間×カテゴリ×地域)',
            scene=dict(
                xaxis_title='時間',
                yaxis_title='カテゴリ',
                zaxis_title='地域'
            )
        )
        
        return fig

# インタラクティブ可視化の実行
try:
    interactive_viz = InteractiveVisualizer(df)
    
    # インタラクティブダッシュボード
    dashboard = interactive_viz.create_interactive_dashboard()
    dashboard.show()
    
    # 3D分析
    analysis_3d = interactive_viz.create_3d_analysis()
    analysis_3d.show()
    
    print("インタラクティブ可視化完了")
    
except (ImportError, NameError, KeyError) as e:
    print(f"インタラクティブ可視化をスキップしました: {e}")
    print("Plotly未導入の場合は pip install plotly を実行してください。")


5. 統計分析・機械学習実装

5.1 統計的仮説検定の実装

from typing import Any, Dict, List
from scipy import stats
from scipy.stats import normaltest, levene, mannwhitneyu, kruskal
from statsmodels.stats.power import ttest_power
from statsmodels.stats.proportion import proportions_ztest
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.multicomp import pairwise_tukeyhsd

class StatisticalAnalyzer:
    """統計分析クラス"""
    
    def __init__(self, df: pd.DataFrame, alpha: float = 0.05):
        self.df = df.copy()
        self.alpha = alpha
    
    def normality_test(self, column: str) -> Dict[str, Any]:
        """正規性の検定"""
        data = self.df[column].dropna()
        
        results = {
            'sample_size': len(data),
            'mean': data.mean(),
            'std': data.std(),
            'skewness': stats.skew(data),
            'kurtosis': stats.kurtosis(data)
        }
        
        # Shapiro-Wilk検定(サンプルサイズ < 5000)
        if len(data) < 5000:
            shapiro_stat, shapiro_p = stats.shapiro(data)
            results['shapiro_wilk'] = {
                'statistic': shapiro_stat,
                'p_value': shapiro_p,
                'is_normal': shapiro_p > self.alpha
            }
        
        # D'Agostino検定
        dagostino_stat, dagostino_p = normaltest(data)
        results['dagostino'] = {
            'statistic': dagostino_stat,
            'p_value': dagostino_p,
            'is_normal': dagostino_p > self.alpha
        }
        
        # Kolmogorov-Smirnov検定
        ks_stat, ks_p = stats.kstest(data, 'norm', args=(data.mean(), data.std()))
        results['kolmogorov_smirnov'] = {
            'statistic': ks_stat,
            'p_value': ks_p,
            'is_normal': ks_p > self.alpha
        }
        
        return results
    
    def compare_groups_analysis(self, group_col: str, value_col: str) -> Dict[str, Any]:
        """群間比較の包括的分析"""
        # groupbyの並び順とグループ名を一致させる(unique()の出現順とはずれる可能性があるため)
        group_names = []
        groups = []
        for name, group in self.df.groupby(group_col):
            group_names.append(name)
            groups.append(group[value_col].dropna())
        
        results = {
            'group_names': group_names,
            'group_sizes': [len(group) for group in groups],
            'group_means': [group.mean() for group in groups],
            'group_stds': [group.std() for group in groups]
        }
        
        # 等分散性の検定(Levene検定)
        levene_stat, levene_p = levene(*groups)
        results['levene_test'] = {
            'statistic': levene_stat,
            'p_value': levene_p,
            'equal_variances': levene_p > self.alpha
        }
        
        # 正規性の確認
        normality_results = []
        for group in groups:
            if len(group) >= 8:  # D'Agostino検定(normaltest)に必要な最小サンプル数
                _, p_value = normaltest(group)
                normality_results.append(p_value > self.alpha)
            else:
                # サンプル数が不足する場合は正規性を仮定しない
                normality_results.append(False)
        
        results['group_normality'] = normality_results
        all_normal = all(normality_results)
        equal_variances = results['levene_test']['equal_variances']
        
        # 適切な検定の選択と実行
        if len(groups) == 2:
            if all_normal and equal_variances:
                # 対応のないt検定
                stat, p_value = stats.ttest_ind(groups[0], groups[1])
                test_name = "Independent t-test"
            elif all_normal and not equal_variances:
                # Welchのt検定
                stat, p_value = stats.ttest_ind(groups[0], groups[1], equal_var=False)
                test_name = "Welch's t-test"
            else:
                # Mann-Whitney U検定
                stat, p_value = mannwhitneyu(groups[0], groups[1])
                test_name = "Mann-Whitney U test"
        else:
            if all_normal and equal_variances:
                # 一元配置分散分析
                stat, p_value = stats.f_oneway(*groups)
                test_name = "One-way ANOVA"
                
                # 事後検定(Tukey HSD)
                if p_value <= self.alpha:
                    df_tukey = pd.DataFrame()
                    for i, name in enumerate(group_names):
                        temp_df = pd.DataFrame({
                            group_col: name,
                            value_col: groups[i]
                        })
                        df_tukey = pd.concat([df_tukey, temp_df])
                    
                    tukey_results = pairwise_tukeyhsd(df_tukey[value_col], df_tukey[group_col])
                    results['post_hoc'] = {
                        'test': 'Tukey HSD',
                        'results': str(tukey_results)
                    }
            else:
                # Kruskal-Wallis検定
                stat, p_value = kruskal(*groups)
                test_name = "Kruskal-Wallis test"
        
        results['main_test'] = {
            'test_name': test_name,
            'statistic': stat,
            'p_value': p_value,
            'significant': p_value <= self.alpha,
            'effect_size': self._calculate_effect_size(groups, test_name)
        }
        
        return results
    
    def _calculate_effect_size(self, groups: List[pd.Series], test_name: str) -> float:
        """効果量の計算"""
        if len(groups) == 2:
            # Cohen's d
            pooled_std = np.sqrt(((len(groups[0]) - 1) * groups[0].var() + 
                                 (len(groups[1]) - 1) * groups[1].var()) / 
                                (len(groups[0]) + len(groups[1]) - 2))
            if pooled_std > 0:
                return abs(groups[0].mean() - groups[1].mean()) / pooled_std
        else:
            # 偏イータ二乗(近似)
            all_data = np.concatenate(groups)
            grand_mean = all_data.mean()
            between_var = sum(len(group) * (group.mean() - grand_mean) ** 2 for group in groups)
            total_var = ((all_data - grand_mean) ** 2).sum()
            if total_var > 0:
                return between_var / total_var
        
        return 0.0
    
    def correlation_analysis(self) -> pd.DataFrame:
        """相関分析の実装"""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        
        results = []
        for i, col1 in enumerate(numeric_cols):
            for j, col2 in enumerate(numeric_cols):
                if i < j:  # 上三角のみ
                    data1 = self.df[col1].dropna()
                    data2 = self.df[col2].dropna()
                    
                    # 共通インデックスでデータを揃える
                    common_idx = data1.index.intersection(data2.index)
                    data1_common = data1[common_idx]
                    data2_common = data2[common_idx]
                    
                    if len(data1_common) > 3:
                        # Pearson相関
                        pearson_r, pearson_p = stats.pearsonr(data1_common, data2_common)
                        
                        # Spearman相関
                        spearman_r, spearman_p = stats.spearmanr(data1_common, data2_common)
                        
                        results.append({
                            'variable1': col1,
                            'variable2': col2,
                            'n_samples': len(data1_common),
                            'pearson_r': pearson_r,
                            'pearson_p': pearson_p,
                            'pearson_significant': pearson_p <= self.alpha,
                            'spearman_r': spearman_r,
                            'spearman_p': spearman_p,
                            'spearman_significant': spearman_p <= self.alpha
                        })
        
        return pd.DataFrame(results)

# 統計分析の実行例
analyzer = StatisticalAnalyzer(df)

print("統計分析実行中...")

# 正規性検定
normality_result = analyzer.normality_test('sales_amount')
print("売上金額の正規性検定:")
print(f"  平均: {normality_result['mean']:.2f}")
print(f"  標準偏差: {normality_result['std']:.2f}")
print(f"  歪度: {normality_result['skewness']:.3f}")
print(f"  尖度: {normality_result['kurtosis']:.3f}")

if 'shapiro_wilk' in normality_result:
    print(f"  Shapiro-Wilk検定: p-value = {normality_result['shapiro_wilk']['p_value']:.6f}")
    print(f"  正規分布: {normality_result['shapiro_wilk']['is_normal']}")

# 群間比較
group_comparison = analyzer.compare_groups_analysis('category', 'sales_amount')
print(f"\nカテゴリ間売上比較:")
print(f"  使用検定: {group_comparison['main_test']['test_name']}")
print(f"  統計量: {group_comparison['main_test']['statistic']:.4f}")
print(f"  p値: {group_comparison['main_test']['p_value']:.6f}")
print(f"  有意差: {group_comparison['main_test']['significant']}")
print(f"  効果量: {group_comparison['main_test']['effect_size']:.4f}")

# 相関分析
correlation_results = analyzer.correlation_analysis()
significant_correlations = correlation_results[
    correlation_results['pearson_significant'] == True
].sort_values('pearson_r', key=abs, ascending=False)

print(f"\n有意な相関関係:")
for _, row in significant_correlations.head().iterrows():
    print(f"  {row['variable1']} - {row['variable2']}: r = {row['pearson_r']:.3f} (p = {row['pearson_p']:.6f})")

5.2 機械学習実装パターン

from typing import Tuple
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import xgboost as xgb
from sklearn.pipeline import Pipeline
import joblib

class MLModelAnalyzer:
    """機械学習モデル分析クラス"""
    
    def __init__(self, df: pd.DataFrame, target_col: str):
        self.df = df.copy()
        self.target_col = target_col
        self.models = {}
        self.results = {}
        
    def prepare_data(self) -> Tuple[np.ndarray, np.ndarray]:
        """データ前処理と特徴量エンジニアリング"""
        df_ml = self.df.copy()
        
        # 欠損値処理
        df_ml = df_ml.dropna()
        
        # カテゴリ変数のエンコーディング
        categorical_cols = df_ml.select_dtypes(include=['object', 'category']).columns
        label_encoders = {}
        
        for col in categorical_cols:
            if col != self.target_col:
                le = LabelEncoder()
                df_ml[col] = le.fit_transform(df_ml[col].astype(str))
                label_encoders[col] = le
        
        self.label_encoders = label_encoders
        
        # 日時特徴量の処理
        if 'date' in df_ml.columns:
            df_ml['date'] = pd.to_datetime(df_ml['date'])
            df_ml['hour'] = df_ml['date'].dt.hour
            df_ml['day_of_week'] = df_ml['date'].dt.dayofweek
            df_ml['month'] = df_ml['date'].dt.month
            df_ml = df_ml.drop('date', axis=1)
        
        # 特徴量とターゲットの分離
        X = df_ml.drop(self.target_col, axis=1)
        y = df_ml[self.target_col]
        
        self.feature_names = X.columns.tolist()
        
        return X.values, y.values
    
    def train_models(self, test_size: float = 0.2, random_state: int = 42):
        """複数モデルの学習と評価"""
        X, y = self.prepare_data()
        
        # データ分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        
        # スケーリング
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        self.scaler = scaler
        self.X_test = X_test_scaled
        self.y_test = y_test
        
        # モデル定義
        models = {
            'Linear Regression': LinearRegression(),
            'Ridge Regression': Ridge(alpha=1.0),
            'Lasso Regression': Lasso(alpha=1.0),
            'Random Forest': RandomForestRegressor(n_estimators=100, random_state=random_state),
            'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=random_state),
            'XGBoost': xgb.XGBRegressor(n_estimators=100, random_state=random_state)
        }
        
        # 各モデルの学習と評価
        for name, model in models.items():
            print(f"学習中: {name}")
            
            # 線形モデルにはスケーリング済みデータを使用
            if 'Regression' in name:
                model.fit(X_train_scaled, y_train)
                y_pred = model.predict(X_test_scaled)
            else:
                model.fit(X_train, y_train)
                y_pred = model.predict(X_test)
            
            # 評価指標の計算
            mse = mean_squared_error(y_test, y_pred)
            rmse = np.sqrt(mse)
            mae = mean_absolute_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            
            # クロスバリデーション
            if 'Regression' in name:
                cv_scores = cross_val_score(model, X_train_scaled, y_train, 
                                          cv=5, scoring='r2', n_jobs=-1)
            else:
                cv_scores = cross_val_score(model, X_train, y_train, 
                                          cv=5, scoring='r2', n_jobs=-1)
            
            self.models[name] = model
            self.results[name] = {
                'RMSE': rmse,
                'MAE': mae,
                'R2': r2,
                'CV_R2_mean': cv_scores.mean(),
                'CV_R2_std': cv_scores.std(),
                'predictions': y_pred
            }
        
        return self.results
    
    def hyperparameter_tuning(self, model_name: str = 'Random Forest'):
        """ハイパーパラメータチューニング"""
        X, y = self.prepare_data()
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        if model_name == 'Random Forest':
            model = RandomForestRegressor(random_state=42)
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [10, 20, None],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
        elif model_name == 'XGBoost':
            model = xgb.XGBRegressor(random_state=42)
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [3, 6, 10],
                'learning_rate': [0.01, 0.1, 0.2],
                'subsample': [0.8, 0.9, 1.0]
            }
        else:
            raise ValueError(f"未対応のモデル名です: {model_name}")
        
        # グリッドサーチ
        grid_search = GridSearchCV(
            model, param_grid, cv=5, scoring='r2', n_jobs=-1, verbose=1
        )
        
        print(f"{model_name} ハイパーパラメータチューニング実行中...")
        grid_search.fit(X_train, y_train)
        
        # 最適モデルでの予測
        best_model = grid_search.best_estimator_
        y_pred = best_model.predict(X_test)
        
        results = {
            'best_params': grid_search.best_params_,
            'best_score': grid_search.best_score_,
            'test_r2': r2_score(y_test, y_pred),
            'test_rmse': np.sqrt(mean_squared_error(y_test, y_pred))
        }
        
        return best_model, results
    
    def feature_importance_analysis(self, model_name: str = 'Random Forest'):
        """特徴量重要度分析"""
        if model_name not in self.models:
            print(f"モデル '{model_name}' が学習されていません。")
            return None
        
        model = self.models[model_name]
        
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
            indices = np.argsort(importances)[::-1]
            
            importance_df = pd.DataFrame({
                'feature': [self.feature_names[i] for i in indices],
                'importance': importances[indices]
            })
            
            # 可視化
            plt.figure(figsize=(10, 6))
            plt.bar(range(len(importance_df)), importance_df['importance'])
            plt.xticks(range(len(importance_df)), importance_df['feature'], rotation=45)
            plt.title(f'{model_name} - 特徴量重要度')
            plt.tight_layout()
            plt.show()
            
            return importance_df
        else:
            print(f"モデル '{model_name}' は特徴量重要度をサポートしていません。")
            return None
    
    def save_best_model(self, filename: str):
        """最高性能モデルの保存"""
        best_model_name = max(self.results.keys(), key=lambda x: self.results[x]['R2'])
        best_model = self.models[best_model_name]
        
        # 線形モデルはスケーリング済みデータで学習しているため、前処理器込みのパイプラインとして保存
        if 'Regression' in best_model_name:
            pipeline = Pipeline([
                ('scaler', self.scaler),
                ('model', best_model)
            ])
        else:
            # ツリー系モデルは未スケーリングデータで学習しているため、そのまま保存する
            pipeline = best_model
        
        joblib.dump(pipeline, filename)
        print(f"最高性能モデル ({best_model_name}) を {filename} に保存しました。")
        print(f"R2スコア: {self.results[best_model_name]['R2']:.4f}")

# 機械学習実行例
print("機械学習分析開始...")

# 売上予測モデルの構築
ml_analyzer = MLModelAnalyzer(feature_data, 'sales_amount')

# モデル学習
results = ml_analyzer.train_models()

# 結果表示
print("\nモデル性能比較:")
performance_df = pd.DataFrame(results).T.drop(columns=['predictions'])
performance_df = performance_df.astype(float).sort_values('R2', ascending=False)
print(performance_df[['RMSE', 'MAE', 'R2', 'CV_R2_mean']].round(4))

# ハイパーパラメータチューニング
try:
    best_rf, tuning_results = ml_analyzer.hyperparameter_tuning('Random Forest')
    print(f"\nチューニング結果:")
    print(f"最適パラメータ: {tuning_results['best_params']}")
    print(f"テストR2: {tuning_results['test_r2']:.4f}")
except Exception as e:
    print(f"ハイパーパラメータチューニングでエラー: {e}")

# 特徴量重要度分析
importance_df = ml_analyzer.feature_importance_analysis('Random Forest')
if importance_df is not None:
    print("\n重要な特徴量トップ5:")
    print(importance_df.head())

# モデル保存
try:
    ml_analyzer.save_best_model('best_sales_prediction_model.pkl')
except Exception as e:
    print(f"モデル保存エラー: {e}")


6. パフォーマンス最適化・メモリ管理

6.1 大規模データ処理の最適化

import gc
import psutil
from contextlib import contextmanager
import multiprocessing as mp

class BigDataProcessor:
    """大規模データ処理最適化クラス"""
    
    def __init__(self):
        self.memory_threshold = 0.8  # メモリ使用率80%で警告
    
    @contextmanager
    def memory_monitor(self, description: str):
        """メモリ使用量監視コンテキストマネージャ"""
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        print(f"開始時メモリ使用量 ({description}): {initial_memory:.2f} MB")
        
        try:
            yield
        finally:
            final_memory = process.memory_info().rss / 1024 / 1024
            memory_used = final_memory - initial_memory
            print(f"終了時メモリ使用量 ({description}): {final_memory:.2f} MB")
            print(f"メモリ増加量: {memory_used:+.2f} MB")
            
            # ガベージコレクション実行
            collected = gc.collect()
            if collected > 0:
                print(f"ガベージコレクション: {collected} オブジェクトを解放")
    
    def chunk_processor(self, file_path: str, chunk_size: int = 10000) -> pd.DataFrame:
        """チャンク単位での効率的データ処理"""
        
        def process_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
            """個別チャンクの処理ロジック"""
            # データ型最適化
            chunk = self._optimize_chunk_dtypes(chunk)
            
            # 必要な計算のみ実行
            chunk['sales_per_unit'] = chunk['sales_amount'] / chunk['quantity'].replace(0, 1)
            chunk['is_weekend'] = pd.to_datetime(chunk['date']).dt.dayofweek >= 5
            
            return chunk[['customer_id', 'sales_amount', 'sales_per_unit', 'is_weekend']]
        
        processed_chunks = []
        total_rows = 0
        
        with self.memory_monitor("チャンク処理"):
            try:
                for chunk_num, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
                    processed_chunk = process_chunk(chunk)
                    processed_chunks.append(processed_chunk)
                    total_rows += len(chunk)
                    
                    if chunk_num % 10 == 0:
                        print(f"処理済みチャンク: {chunk_num + 1}, 累計行数: {total_rows:,}")
                        
                        # メモリ使用率チェック
                        memory_percent = psutil.virtual_memory().percent
                        if memory_percent > self.memory_threshold * 100:
                            print(f"警告: メモリ使用率 {memory_percent:.1f}%")
                            gc.collect()
                
                # 結合
                result = pd.concat(processed_chunks, ignore_index=True)
                print(f"総処理行数: {total_rows:,}")
                
            except Exception as e:
                print(f"チャンク処理エラー: {e}")
                result = pd.DataFrame()
            
            return result
    
    def _optimize_chunk_dtypes(self, chunk: pd.DataFrame) -> pd.DataFrame:
        """チャンクのデータ型最適化"""
        for col in chunk.select_dtypes(include=['int64']).columns:
            chunk[col] = pd.to_numeric(chunk[col], downcast='integer')
        
        for col in chunk.select_dtypes(include=['float64']).columns:
            chunk[col] = pd.to_numeric(chunk[col], downcast='float')
        
        return chunk
    
    @staticmethod
    def _compute_customer_stats(customer_group: pd.DataFrame) -> Dict:
        """顧客別統計の計算(ネスト関数はpickle化できず並列処理に渡せないためstaticmethodとして定義)"""
        return {
            'customer_id': customer_group['customer_id'].iloc[0],
            'total_sales': customer_group['sales_amount'].sum(),
            'avg_sales': customer_group['sales_amount'].mean(),
            'transaction_count': len(customer_group),
            'std_sales': customer_group['sales_amount'].std()
        }
    
    def parallel_processing_example(self, df: pd.DataFrame, n_cores: Optional[int] = None) -> pd.DataFrame:
        """並列処理による高速化"""
        if n_cores is None:
            n_cores = max(1, min(mp.cpu_count() - 1, 4))  # 最低1コア、最大4コアまで使用
        
        with self.memory_monitor("並列処理"):
            # データを顧客ごとにグループ化
            customer_groups = [group for _, group in df.groupby('customer_id')]
            
            # 並列処理実行(spawn方式の環境では、クラスをimport可能なモジュールに置く必要がある)
            with mp.Pool(processes=n_cores) as pool:
                results = pool.map(self._compute_customer_stats, customer_groups)
            
            result_df = pd.DataFrame(results)
            print(f"並列処理完了: {len(results)} 顧客の統計を {n_cores} コアで処理")
            
        return result_df
    
    def vectorized_string_operations(self, df: pd.DataFrame) -> pd.DataFrame:
        """文字列操作の高速化"""
        with self.memory_monitor("文字列操作最適化"):
            df_result = df.copy()
            
            # 非効率な方法(避けるべき)
            # df_result['category_upper'] = df_result['category'].apply(lambda x: x.upper())
            
            # 効率的な方法
            df_result['category_upper'] = df_result['category'].str.upper()
            df_result['category_length'] = df_result['category'].str.len()
            df_result['is_category_A'] = df_result['category'].eq('A').astype('int8')
            
            # 複合条件の効率的な処理
            mask = (df_result['sales_amount'] > 1000) & (df_result['category'].isin(['A', 'B']))
            df_result['high_value_AB'] = mask.astype('int8')
            
        return df_result
    
    def memory_efficient_joins(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
        """メモリ効率的な結合処理"""
        with self.memory_monitor("効率的結合"):
            # 結合前のメモリ使用量チェック
            df1_memory = df1.memory_usage(deep=True).sum() / 1024 / 1024
            df2_memory = df2.memory_usage(deep=True).sum() / 1024 / 1024
            
            print(f"結合前メモリ: df1={df1_memory:.2f}MB, df2={df2_memory:.2f}MB")
            
            # 必要な列のみ選択してから結合
            df1_subset = df1[['customer_id', 'sales_amount', 'category']].copy()
            df2_subset = df2[['customer_id', 'customer_name', 'customer_tier']].copy()
            
            # インデックス最適化
            df1_subset.set_index('customer_id', inplace=True)
            df2_subset.set_index('customer_id', inplace=True)
            
            # 結合実行
            result = df1_subset.join(df2_subset, how='inner')
            
            # インデックスをリセット
            result.reset_index(inplace=True)
            
            result_memory = result.memory_usage(deep=True).sum() / 1024 / 1024
            print(f"結合後メモリ: {result_memory:.2f}MB")
            
        return result

# 大規模データ処理の実行例
big_data_processor = BigDataProcessor()

print("大規模データ処理最適化デモ")
print("=" * 40)

# サンプル大データの生成(実環境では既存ファイルを使用)
large_sample = pd.concat([df] * 10, ignore_index=True)  # 10倍に拡大
large_sample.to_csv('large_sample_data.csv', index=False)

# チャンク処理
try:
    chunk_result = big_data_processor.chunk_processor('large_sample_data.csv', chunk_size=50000)
    print(f"チャンク処理結果: {chunk_result.shape}")
except Exception as e:
    print(f"チャンク処理スキップ: {e}")

# 並列処理(小さなデータセットでデモ)
parallel_result = big_data_processor.parallel_processing_example(df, n_cores=2)
print(f"並列処理結果: {parallel_result.shape}")

# 文字列操作最適化
string_optimized = big_data_processor.vectorized_string_operations(df)
print(f"文字列操作最適化完了: {string_optimized.shape}")

# 効率的結合
customer_data = pd.DataFrame({
    'customer_id': range(1, 1001),
    'customer_name': [f'Customer_{i}' for i in range(1, 1001)],
    'customer_tier': np.random.choice(['Gold', 'Silver', 'Bronze'], 1000)
})

efficient_join = big_data_processor.memory_efficient_joins(df, customer_data)
print(f"効率的結合完了: {efficient_join.shape}")
[{"content": "Python \u30c7\u30fc\u30bf\u5206\u6790 2025\u5e74\u6700\u65b0\u30e9\u30a4\u30d6\u30e9\u30ea\u30fb\u30c4\u30fc\u30eb\u52d5\u5411\u3092WebSearch\u3067\u30ea\u30b5\u30fc\u30c1", "status": "completed", "priority": "high", "id": "43"}, {"content": "pandas\u30fbNumPy\u30fbMatplotlib\u7b49\u306e\u57fa\u790e\u304b\u3089\u5b9f\u8df5\u7684\u6d3b\u7528\u6cd5\u307e\u3067\u8abf\u67fb", "status": "completed", "priority": "high", "id": "44"}, {"content": "Jupyter\u30fbColab\u30fbVSCode\u7b49\u306e\u958b\u767a\u74b0\u58832025\u5e74\u30d9\u30b9\u30c8\u30d7\u30e9\u30af\u30c6\u30a3\u30b9\u8abf\u67fb", "status": "completed", "priority": "high", "id": "45"}, {"content": "Python\u6a5f\u68b0\u5b66\u7fd2\u30fb\u7d71\u8a08\u5206\u6790\u306e\u6700\u65b0\u624b\u6cd5\u30fb\u5b9f\u88c5\u30d1\u30bf\u30fc\u30f3\u3092\u8abf\u67fb", "status": "completed", "priority": "high", "id": "46"}, {"content": "Python\u30c7\u30fc\u30bf\u5206\u6790\u57fa\u790e\u5b8c\u5168\u30ac\u30a4\u30c9\u8a18\u4e8b\u3092\u4f5c\u6210\u30fb\u57f7\u7b46", "status": "completed", "priority": "high", "id": "47"}]
