Pythonデータ分析完全実践ガイド|pandas・NumPy・Matplotlib実装からPolars最適化・統計分析まで、実務レベル技術を徹底解説【2025年最新】
「Pythonでのデータ分析を基礎から実務レベルまで体系的にマスターしたい」 「pandas・NumPyの効率的な使い方から最新のPolarsまで実装技術を学びたい」 「統計分析・可視化・機械学習の実践的コードを習得したい」
Pythonデータ分析は、データサイエンス分野において最も重要な基盤技術であり、2025年現在ではpandas 2.0の大幅な性能向上、Polarsの台頭による超高速データ処理、Jupyter環境の進化など、ツールとライブラリが急速に進歩しています。実務では単なるライブラリの使い方ではなく、大量データの効率的な処理、メモリ最適化、可読性の高いコード実装が求められます。
特に注目すべきは、Polarsライブラリの急速な普及です。従来のpandasと比較して10-100倍の処理速度向上を実現し、大規模データセットでの分析において圧倒的な優位性を示しています。また、pandas 2.0ではApache Arrowバックエンドの導入により、メモリ効率と処理速度が大幅に改善され、実務での使い勝手が劇的に向上しました。
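以下は、pandas 2.0のArrowバックエンドとPolarsの読み込みを並べて試す最小スケッチです(ファイル名 sales.csv は説明用の仮のものです)。
# pandas 2.0 の Arrow バックエンドと Polars を比較する最小スケッチ(sales.csv は仮のファイル名)
import pandas as pd
import polars as pl
# dtype_backend="pyarrow" を指定すると Apache Arrow ベースの型でデータを保持する(pandas 2.0以降)
df_pd = pd.read_csv("sales.csv", dtype_backend="pyarrow")
# Polars は標準でマルチスレッド・Arrow 互換のメモリ表現で読み込む
df_pl = pl.read_csv("sales.csv")
print(df_pd.dtypes.head())
print(df_pl.schema)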
本記事では、実際のビジネスデータを想定した実装例を通じて、Pythonデータ分析の実務技術を完全習得できる内容を提供します。
1. Python環境構築|2025年最新のベストプラクティス
1.1 開発環境の選択と最適化
Jupyter Lab vs VSCode vs Google Colab の実用比較
2025年現在、データ分析の開発環境は用途に応じて使い分けることが重要です。探索的分析やレポート作成にはJupyter Lab、モジュール化した本番コードの開発・テストにはVSCode、環境構築不要での共有やGPU利用にはGoogle Colabが向いています。まずは以下のコードで手元の分析環境を確認し、基本設定を整えておきましょう:
# 環境構築の実装例
import sys
import platform
import pandas as pd
import numpy as np
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Union, List, Dict, Optional
import warnings
warnings.filterwarnings('ignore')
def setup_analysis_environment() -> Dict[str, str]:
"""
データ分析環境の情報を取得し、最適な設定を適用する
Returns:
Dict[str, str]: 環境情報辞書
"""
env_info = {
'python_version': platform.python_version(),
'system': platform.system(),
'pandas_version': pd.__version__,
'numpy_version': np.__version__,
'polars_version': pl.__version__ if 'pl' in globals() else 'Not installed'  # 関数内の locals() には import 済みの pl が含まれないため globals() を参照
}
# matplotlib の日本語フォント設定
plt.rcParams['font.family'] = ['DejaVu Sans', 'Hiragino Sans', 'Yu Gothic', 'Meiryo']
plt.rcParams['axes.unicode_minus'] = False
# pandas の表示設定最適化
pd.set_option('display.max_columns', 20)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)
return env_info
# 環境情報の確認
env = setup_analysis_environment()
print("データ分析環境構築完了:")
for key, value in env.items():
print(f" {key}: {value}")推奨パッケージ管理(conda vs pip)
# conda環境の作成(推奨)
conda create -n data_analysis_2025 python=3.11
conda activate data_analysis_2025
# 必須ライブラリの一括インストール
conda install pandas=2.1.* numpy=1.24.* matplotlib=3.7.* seaborn=0.12.*
conda install scikit-learn=1.3.* jupyter=1.0.* jupyterlab=4.0.*
pip install polars[all]==0.20.* # Polarsは現在pipが推奨
# パフォーマンス向上ライブラリ
conda install numba=0.58.* bottleneck=1.3.*
1.2 実務レベルのプロジェクト構造
data_analysis_project/
├── notebooks/ # Jupyter notebooks
│ ├── 01_data_exploration.ipynb
│ ├── 02_data_cleaning.ipynb
│ └── 03_analysis_modeling.ipynb
├── src/ # Python modules
│ ├── __init__.py
│ ├── data_loader.py
│ ├── preprocessor.py
│ └── analyzer.py
├── data/
│ ├── raw/ # 生データ
│ ├── processed/ # 前処理済みデータ
│ └── external/ # 外部データ
├── outputs/
│ ├── figures/ # 図表出力
│ └── reports/ # 分析レポート
├── tests/ # テストコード
├── requirements.txt # 依存関係
└── README.md # プロジェクト説明
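上記の構成で notebooks/ から src/ 配下のモジュールを読み込むための最小スケッチです(プロジェクトルートの位置や、data_loader.py に後述の DataLoader クラスを置く構成は想定です)。
# notebooks/01_data_exploration.ipynb の冒頭に置く例(パス設定は環境に応じて調整)
import sys
from pathlib import Path
PROJECT_ROOT = Path.cwd().parent  # notebooks/ から見たプロジェクトルート(想定)
sys.path.append(str(PROJECT_ROOT / "src"))
from data_loader import DataLoader  # src/data_loader.py に定義したクラスを読み込む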
2. pandas実装技術|効率的なデータ操作と処理最適化
2.1 データ読み込みと基本操作
import pandas as pd
import numpy as np
from pathlib import Path
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataLoader:
"""プロダクション環境対応のデータローダー"""
def __init__(self, chunk_size: int = 50000, memory_threshold: float = 0.5):
self.chunk_size = chunk_size
self.memory_threshold = memory_threshold
def load_csv_optimized(self, file_path: str, **kwargs) -> pd.DataFrame:
"""メモリ効率を考慮したCSV読み込み"""
if not Path(file_path).exists():
raise FileNotFoundError(f"File not found: {file_path}")
file_size_gb = Path(file_path).stat().st_size / (1024**3)
logger.info(f"Loading file: {file_path} ({file_size_gb:.2f} GB)")
if file_size_gb > self.memory_threshold:
return self._load_large_file(file_path, **kwargs)
else:
df = pd.read_csv(file_path, **kwargs)
return self._optimize_dtypes(df)
def _load_large_file(self, file_path: str, **kwargs) -> pd.DataFrame:
"""大きなファイルのチャンク読み込み"""
chunks = []
total_rows = 0
try:
for i, chunk in enumerate(pd.read_csv(file_path, chunksize=self.chunk_size, **kwargs)):
chunk = self._optimize_dtypes(chunk)
chunks.append(chunk)
total_rows += len(chunk)
if i % 10 == 0:
logger.info(f"Processed {i+1} chunks, {total_rows:,} rows")
logger.info(f"Concatenating {len(chunks)} chunks...")
return pd.concat(chunks, ignore_index=True)
except Exception as e:
logger.error(f"Error during chunk processing: {e}")
raise
def _optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
"""データ型の自動最適化"""
original_size = df.memory_usage(deep=True).sum()
# 整数型の最適化
int_cols = df.select_dtypes(include=['int64']).columns
for col in int_cols:
col_min, col_max = df[col].min(), df[col].max()
if col_min >= 0:
if col_max <= 255:
df[col] = df[col].astype('uint8')
elif col_max <= 65535:
df[col] = df[col].astype('uint16')
elif col_max <= 4294967295:
df[col] = df[col].astype('uint32')
else:
if col_min >= -128 and col_max <= 127:
df[col] = df[col].astype('int8')
elif col_min >= -32768 and col_max <= 32767:
df[col] = df[col].astype('int16')
elif col_min >= -2147483648 and col_max <= 2147483647:
df[col] = df[col].astype('int32')
# 浮動小数点型の最適化
float_cols = df.select_dtypes(include=['float64']).columns
for col in float_cols:
df[col] = pd.to_numeric(df[col], downcast='float')
# カテゴリカル変数の最適化
object_cols = df.select_dtypes(include=['object']).columns
for col in object_cols:
if df[col].dtype == 'object':
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.5:
df[col] = df[col].astype('category')
optimized_size = df.memory_usage(deep=True).sum()
reduction_pct = (1 - optimized_size / original_size) * 100
logger.info(f"Memory optimization: {reduction_pct:.1f}% reduction "
f"({original_size/1024/1024:.1f}MB → {optimized_size/1024/1024:.1f}MB)")
return df
# 実際のデータでテスト
np.random.seed(42)
sample_size = 1_000_000
# リアルなECデータの生成
sample_data = pd.DataFrame({
'order_id': range(1, sample_size + 1),
'customer_id': np.random.randint(1, 50000, sample_size),
'product_id': np.random.randint(1, 5000, sample_size),
'order_date': pd.date_range('2023-01-01', periods=sample_size, freq='30S'),
'quantity': np.random.poisson(2, sample_size) + 1,
'unit_price': np.random.lognormal(3, 0.5, sample_size).round(2),
'category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home', 'Sports'],
sample_size, p=[0.3, 0.25, 0.15, 0.2, 0.1]),
'region': np.random.choice(['US-East', 'US-West', 'EU', 'APAC'],
sample_size, p=[0.35, 0.25, 0.25, 0.15]),
'discount_rate': np.random.beta(1, 9, sample_size)
})
sample_data['total_amount'] = sample_data['quantity'] * sample_data['unit_price'] * (1 - sample_data['discount_rate'])
sample_data.to_csv('ecommerce_data.csv', index=False)
# データローダーのテスト
loader = DataLoader()
df = loader.load_csv_optimized('ecommerce_data.csv')
print(f"データ形状: {df.shape}")
print(f"メモリ使用量: {df.memory_usage(deep=True).sum() / 1024 / 1024:.1f}MB")
print(f"\nデータ型:")
print(df.dtypes)
2.2 高度なデータ操作・集計技術
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')
class AdvancedDataProcessor:
"""実務レベルのデータ処理エンジン"""
def __init__(self, df: pd.DataFrame):
self.df = df.copy()
self._validate_dataframe()
def _validate_dataframe(self):
"""データフレームの妥当性チェック"""
if self.df.empty:
raise ValueError("DataFrame is empty")
required_cols = ['order_date', 'customer_id', 'total_amount']
missing_cols = [col for col in required_cols if col not in self.df.columns]
if missing_cols:
logger.warning(f"Missing recommended columns: {missing_cols}")
def customer_segmentation_analysis(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""RFM分析による顧客セグメンテーション"""
analysis_date = self.df['order_date'].max()
rfm = (self.df.groupby('customer_id')
.agg({
'order_date': lambda x: (analysis_date - x.max()).days,
'order_id': 'count',
'total_amount': 'sum'
})
.rename(columns={
'order_date': 'recency',
'order_id': 'frequency',
'total_amount': 'monetary'
}))
# RFMスコア計算(5点満点)
rfm['r_score'] = pd.qcut(rfm['recency'].rank(method='first'), 5, labels=range(5, 0, -1))
rfm['f_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=range(1, 6))
rfm['m_score'] = pd.qcut(rfm['monetary'].rank(method='first'), 5, labels=range(1, 6))
# セグメント分類
def classify_segment(row):
r, f, m = int(row['r_score']), int(row['f_score']), int(row['m_score'])
if r >= 4 and f >= 4 and m >= 4:
return 'Champions'
elif r >= 4 and f >= 3:
return 'Loyal Customers'
elif r >= 4 and f <= 2:
return 'Potential Loyalists'
elif r >= 3 and f >= 3 and m >= 3:
return 'New Customers'
elif r <= 2 and f >= 3:
return 'At Risk'
elif r <= 2 and f <= 2 and m >= 3:
return 'Cannot Lose Them'
elif r <= 2 and f <= 2 and m <= 2:
return 'Hibernating'
else:
return 'Others'
rfm['segment'] = rfm.apply(classify_segment, axis=1)
# セグメント統計
segment_stats = (rfm.groupby('segment')
.agg({
'recency': ['mean', 'count'],
'frequency': 'mean',
'monetary': 'mean'
})
.round(2))
segment_stats.columns = ['avg_recency', 'customer_count', 'avg_frequency', 'avg_monetary']
segment_stats['segment_value'] = (segment_stats['avg_frequency'] *
segment_stats['avg_monetary'] /
segment_stats['avg_recency']).round(2)
return rfm.reset_index(), segment_stats.reset_index()
def cohort_analysis(self) -> pd.DataFrame:
"""コホート分析による顧客リテンション分析"""
cohort_data = self.df.copy()
cohort_data['order_date'] = pd.to_datetime(cohort_data['order_date'])
cohort_data['order_month'] = cohort_data['order_date'].dt.to_period('M')
# 初回購入月の特定
cohort_data['cohort_month'] = (cohort_data.groupby('customer_id')['order_date']
.transform('min')
.dt.to_period('M'))
# 期間番号の計算
from operator import attrgetter  # Period 同士の差から経過月数(.n)を取り出すために使用
def get_period_number(df):
return (df['order_month'] - df['cohort_month']).apply(attrgetter('n'))
cohort_data['period_number'] = get_period_number(cohort_data)
# コホートテーブルの作成
cohort_table = (cohort_data.groupby(['cohort_month', 'period_number'])['customer_id']
.nunique()
.unstack(level=1)
.fillna(0))
# リテンション率の計算
cohort_sizes = cohort_data.groupby('cohort_month')['customer_id'].nunique()
retention_table = cohort_table.divide(cohort_sizes, axis=0).round(4)
return retention_table
def advanced_time_series_features(self) -> pd.DataFrame:
"""高度な時系列特徴量生成"""
ts_df = self.df.copy()
ts_df['order_date'] = pd.to_datetime(ts_df['order_date'])
# 時間特徴量
ts_df['year'] = ts_df['order_date'].dt.year
ts_df['month'] = ts_df['order_date'].dt.month
ts_df['day'] = ts_df['order_date'].dt.day
ts_df['weekday'] = ts_df['order_date'].dt.dayofweek
ts_df['hour'] = ts_df['order_date'].dt.hour
ts_df['is_weekend'] = (ts_df['weekday'] >= 5).astype(int)
ts_df['is_month_start'] = ts_df['order_date'].dt.is_month_start.astype(int)
ts_df['is_month_end'] = ts_df['order_date'].dt.is_month_end.astype(int)
# 季節性特徴量
ts_df['season'] = ts_df['month'].map({
12: 'Winter', 1: 'Winter', 2: 'Winter',
3: 'Spring', 4: 'Spring', 5: 'Spring',
6: 'Summer', 7: 'Summer', 8: 'Summer',
9: 'Fall', 10: 'Fall', 11: 'Fall'
})
# ビジネス特徴量
def get_business_quarter(month):
return f'Q{(month-1)//3 + 1}'
ts_df['quarter'] = ts_df['month'].apply(get_business_quarter)
# 時間帯分類
def classify_time_of_day(hour):
if 5 <= hour < 12:
return 'Morning'
elif 12 <= hour < 17:
return 'Afternoon'
elif 17 <= hour < 21:
return 'Evening'
else:
return 'Night'
ts_df['time_of_day'] = ts_df['hour'].apply(classify_time_of_day)
return ts_df
def calculate_business_metrics(self) -> Dict[str, float]:
"""重要なビジネス指標の計算"""
metrics = {}
# 基本指標
metrics['total_revenue'] = self.df['total_amount'].sum()
metrics['total_orders'] = len(self.df)
metrics['unique_customers'] = self.df['customer_id'].nunique()
metrics['avg_order_value'] = self.df['total_amount'].mean()
# 顧客あたり指標
customer_metrics = (self.df.groupby('customer_id')
.agg({
'total_amount': ['sum', 'count'],
'order_date': ['min', 'max']
}))
customer_metrics.columns = ['total_spent', 'order_count', 'first_order', 'last_order']
metrics['avg_customer_ltv'] = customer_metrics['total_spent'].mean()
metrics['avg_orders_per_customer'] = customer_metrics['order_count'].mean()
# リピート率
repeat_customers = (customer_metrics['order_count'] > 1).sum()
metrics['repeat_rate'] = repeat_customers / len(customer_metrics)
# 売上上位20%の顧客による売上割合(パレート分析)
customer_revenues = customer_metrics['total_spent'].sort_values(ascending=False)
top_20_pct_count = int(len(customer_revenues) * 0.2)
top_20_pct_revenue = customer_revenues.iloc[:top_20_pct_count].sum()
metrics['pareto_ratio'] = top_20_pct_revenue / metrics['total_revenue']
# 月次成長率
monthly_revenue = (self.df.groupby(pd.to_datetime(self.df['order_date']).dt.to_period('M'))
['total_amount'].sum().sort_index())
if len(monthly_revenue) > 1:
metrics['monthly_growth_rate'] = ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[-2]) - 1) * 100
else:
metrics['monthly_growth_rate'] = 0
return metrics
# 実際のデータ処理実行
processor = AdvancedDataProcessor(df)
# 顧客セグメンテーション
rfm_data, segment_stats = processor.customer_segmentation_analysis()
print("顧客セグメンテーション結果:")
print(segment_stats.sort_values('segment_value', ascending=False))
# コホート分析
retention_table = processor.cohort_analysis()
print(f"\nコホート分析完了: {retention_table.shape[0]} 個のコホートを分析")
print("1ヶ月後リテンション率:")
print(retention_table[1].describe())
# 時系列特徴量生成
ts_features = processor.advanced_time_series_features()
print(f"\n時系列特徴量: {len(ts_features.columns)} 個の特徴量を生成")
# ビジネス指標
business_metrics = processor.calculate_business_metrics()
print("\n主要ビジネス指標:")
for metric, value in business_metrics.items():
if isinstance(value, float):
print(f" {metric}: {value:,.2f}")
else:
print(f" {metric}: {value}")2.3 パフォーマンス最適化技術
import time
import psutil
import gc
from contextlib import contextmanager
from typing import Callable, Any
import multiprocessing as mp
from functools import partial
import concurrent.futures
class ProductionPerformanceOptimizer:
"""本格的パフォーマンス最適化クラス"""
def __init__(self, df: pd.DataFrame):
self.df = df
self.process = psutil.Process()
@contextmanager
def performance_monitor(self, operation_name: str):
"""パフォーマンス監視コンテキストマネージャ"""
start_time = time.time()
start_memory = self.process.memory_info().rss / 1024 / 1024
cpu_start = self.process.cpu_percent()
logger.info(f"開始: {operation_name}")
try:
yield
finally:
end_time = time.time()
end_memory = self.process.memory_info().rss / 1024 / 1024
cpu_end = self.process.cpu_percent()
execution_time = end_time - start_time
memory_diff = end_memory - start_memory
logger.info(f"完了: {operation_name}")
logger.info(f" 実行時間: {execution_time:.3f}秒")
logger.info(f" メモリ使用量: {memory_diff:+.1f}MB")
logger.info(f" CPU使用率: {cpu_end:.1f}%")
def vectorized_vs_iterative_benchmark(self) -> Dict[str, Dict[str, float]]:
"""ベクトル化 vs 反復処理のベンチマーク"""
results = {}
sample_size = min(100000, len(self.df))
test_df = self.df.head(sample_size).copy()
# 1. 条件付き計算の比較
with self.performance_monitor("条件付き計算比較"):
# 反復処理(避けるべき方法)
start = time.time()
iterative_result = []
for _, row in test_df.iterrows():
if row['total_amount'] > test_df['total_amount'].median():
iterative_result.append(row['total_amount'] * 0.1)
else:
iterative_result.append(0)
iterative_time = time.time() - start
# ベクトル化処理(推奨方法)
start = time.time()
median_value = test_df['total_amount'].median()
vectorized_result = np.where(
test_df['total_amount'] > median_value,
test_df['total_amount'] * 0.1,
0
)
vectorized_time = time.time() - start
results['conditional_calculation'] = {
'iterative': iterative_time,
'vectorized': vectorized_time,
'speedup': iterative_time / vectorized_time if vectorized_time > 0 else 0
}
# 2. 集計処理の比較
with self.performance_monitor("集計処理比較"):
# apply使用(低効率)
start = time.time()
apply_result = test_df.groupby('category')['total_amount'].apply(
lambda x: x.sum() / x.count()
)
apply_time = time.time() - start
# 直接的な集計(高効率)
start = time.time()
direct_result = test_df.groupby('category')['total_amount'].mean()
direct_time = time.time() - start
results['aggregation'] = {
'apply_method': apply_time,
'direct_method': direct_time,
'speedup': apply_time / direct_time if direct_time > 0 else 0
}
return results
def memory_optimization_techniques(self) -> pd.DataFrame:
"""メモリ最適化技術の実装"""
with self.performance_monitor("メモリ最適化"):
# 元のメモリ使用量
original_memory = self.df.memory_usage(deep=True).sum() / 1024 / 1024
# 最適化されたコピーを作成
optimized_df = self.df.copy()
# 1. カテゴリカル変数の最適化
categorical_candidates = ['category', 'region']
for col in categorical_candidates:
if col in optimized_df.columns:
if optimized_df[col].dtype == 'object':
unique_ratio = optimized_df[col].nunique() / len(optimized_df)
if unique_ratio < 0.1: # ユニーク値が10%未満
optimized_df[col] = optimized_df[col].astype('category')
# 2. 数値型の最適化
for col in optimized_df.select_dtypes(include=['int64', 'float64']).columns:
if optimized_df[col].dtype == 'int64':
optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='integer')
elif optimized_df[col].dtype == 'float64':
optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
# 3. 不要な列の削除(例:重複情報)
cols_to_check = ['order_id'] # 一意識別子は分析に不要な場合が多い
for col in cols_to_check:
if col in optimized_df.columns:
if optimized_df[col].nunique() == len(optimized_df): # 全て一意の場合
optimized_df = optimized_df.drop(columns=[col])
logger.info(f"削除された列: {col} (一意値のため分析不要)")
# メモリ削減効果の計算
optimized_memory = optimized_df.memory_usage(deep=True).sum() / 1024 / 1024
reduction_pct = (1 - optimized_memory / original_memory) * 100
logger.info(f"メモリ最適化結果:")
logger.info(f" 元のサイズ: {original_memory:.1f}MB")
logger.info(f" 最適化後: {optimized_memory:.1f}MB")
logger.info(f" 削減率: {reduction_pct:.1f}%")
return optimized_df
def parallel_processing_demo(self, n_processes: int = None) -> pd.DataFrame:
"""並列処理による高速化デモ"""
if n_processes is None:
n_processes = min(mp.cpu_count() - 1, 4)
def process_customer_group(customer_data: pd.DataFrame) -> Dict:
"""個別顧客グループの処理"""
customer_id = customer_data['customer_id'].iloc[0]
# 複雑な計算の例
total_spent = customer_data['total_amount'].sum()
avg_order = customer_data['total_amount'].mean()
order_frequency = len(customer_data)
# 時系列特徴量
if 'order_date' in customer_data.columns:
customer_data['order_date'] = pd.to_datetime(customer_data['order_date'])
date_range = (customer_data['order_date'].max() - customer_data['order_date'].min()).days
# 購買パターン分析
hourly_pattern = customer_data['order_date'].dt.hour.mode().iloc[0] if len(customer_data) > 0 else 12
weekend_ratio = (customer_data['order_date'].dt.dayofweek >= 5).mean()
else:
date_range = 0
hourly_pattern = 12
weekend_ratio = 0.0
return {
'customer_id': customer_id,
'total_spent': total_spent,
'avg_order_value': avg_order,
'order_frequency': order_frequency,
'customer_lifetime_days': date_range,
'preferred_hour': hourly_pattern,
'weekend_shopper_ratio': weekend_ratio,
'value_score': (total_spent * order_frequency) / max(date_range, 1)
}
with self.performance_monitor("並列処理実行"):
# データをグループ化
customer_groups = [group for _, group in self.df.groupby('customer_id')]
# 並列処理実行(ネストした関数は pickle できず ProcessPoolExecutor に渡せないため ThreadPoolExecutor を使用)
with concurrent.futures.ThreadPoolExecutor(max_workers=n_processes) as executor:
results = list(executor.map(process_customer_group, customer_groups))
result_df = pd.DataFrame(results)
logger.info(f"並列処理完了: {len(results)} 顧客を {n_processes} ワーカーで処理")
return result_df
def efficient_join_strategies(self, lookup_data: pd.DataFrame) -> pd.DataFrame:
"""効率的な結合戦略"""
with self.performance_monitor("効率的結合"):
# 1. インデックス設定による高速化
if 'customer_id' in self.df.columns and 'customer_id' in lookup_data.columns:
# インデックス設定
df_indexed = self.df.set_index('customer_id')
lookup_indexed = lookup_data.set_index('customer_id')
# 結合実行
joined = df_indexed.join(lookup_indexed, how='inner', rsuffix='_lookup')
joined = joined.reset_index()
logger.info(f"インデックス結合完了: {joined.shape}")
# 2. 結合後の重複削除と最適化
if joined.duplicated().any():
original_size = len(joined)
joined = joined.drop_duplicates()
logger.info(f"重複削除: {original_size - len(joined)} 行を削除")
return joined
else:
logger.warning("customer_id列が見つかりません")
return self.df
# 実際の最適化実行
optimizer = ProductionPerformanceOptimizer(df)
# ベンチマーク実行
benchmark_results = optimizer.vectorized_vs_iterative_benchmark()
print("パフォーマンスベンチマーク結果:")
for operation, metrics in benchmark_results.items():
print(f"\n{operation}:")
for method, time_val in metrics.items():
if method != 'speedup':
print(f" {method}: {time_val:.4f}秒")
else:
print(f" 高速化倍率: {time_val:.1f}倍")
# メモリ最適化
optimized_df = optimizer.memory_optimization_techniques()
# 並列処理(小さなデータセットでテスト)
customer_analysis = optimizer.parallel_processing_demo(n_processes=2)
print(f"\n並列処理結果: {customer_analysis.shape}")
print("上位5顧客の価値スコア:")
print(customer_analysis.nlargest(5, 'value_score')[['customer_id', 'value_score', 'total_spent']])
# メモリクリーンアップ
gc.collect()
3. Polars実装|企業レベル高速データ処理
3.1 プロダクション環境でのPolars導入戦略
import polars as pl
import time
import psutil
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
# import boto3  # S3 等のクラウドストレージと連携する場合のみ必要(本節のコードでは未使用)
from contextlib import contextmanager
@dataclass
class BenchmarkResult:
"""ベンチマーク結果の構造化データ"""
operation: str
pandas_time: float
polars_time: float
speedup_factor: float
memory_reduction_pct: float
data_size: int
class EnterprisePolarsAnalyzer:
"""エンタープライズレベルのPolars分析エンジン"""
def __init__(self, config_path: Optional[str] = None):
self.config = self._load_config(config_path)
self.benchmarks: List[BenchmarkResult] = []
self.process = psutil.Process()
def _load_config(self, config_path: Optional[str]) -> Dict:
"""設定ファイルの読み込み"""
default_config = {
"chunk_size": 100000,
"max_memory_gb": 8,
"parallel_workers": min(psutil.cpu_count() - 1, 8),
"lazy_evaluation": True,
"streaming": True
}
if config_path and Path(config_path).exists():
with open(config_path) as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
@contextmanager
def resource_monitor(self, operation_name: str):
"""リソース使用量の詳細監視"""
start_time = time.time()
start_memory = self.process.memory_info().rss / 1024 / 1024
start_cpu = psutil.cpu_percent(interval=None)
try:
yield
finally:
end_time = time.time()
end_memory = self.process.memory_info().rss / 1024 / 1024
end_cpu = psutil.cpu_percent(interval=None)
execution_time = end_time - start_time
peak_memory = end_memory
avg_cpu = (start_cpu + end_cpu) / 2
logger.info(f"{operation_name} 完了:")
logger.info(f" 実行時間: {execution_time:.3f}秒")
logger.info(f" ピークメモリ: {peak_memory:.1f}MB")
logger.info(f" 平均CPU使用率: {avg_cpu:.1f}%")
def comprehensive_benchmark(self, df_pandas: pd.DataFrame) -> List[BenchmarkResult]:
"""包括的なパフォーマンスベンチマーク"""
# データをPolarsに変換
df_polars = pl.from_pandas(df_pandas)
data_size = len(df_pandas)
benchmarks = []
# 1. フィルタリング + 集計のベンチマーク
with self.resource_monitor("複合フィルタリング-pandas"):
start = time.time()
pandas_result = (df_pandas
.query('total_amount > @df_pandas.total_amount.quantile(0.7)')
.groupby('category')
.agg({
'total_amount': ['sum', 'mean', 'count'],
'quantity': 'sum',
'customer_id': 'nunique'
}))
pandas_time = time.time() - start
with self.resource_monitor("複合フィルタリング-polars"):
start = time.time()
quantile_70 = df_polars.select(pl.col('total_amount').quantile(0.7)).item()
polars_result = (df_polars
.filter(pl.col('total_amount') > quantile_70)
.groupby('category')
.agg([
pl.col('total_amount').sum().alias('total_sum'),
pl.col('total_amount').mean().alias('total_mean'),
pl.col('total_amount').count().alias('total_count'),
pl.col('quantity').sum().alias('quantity_sum'),
pl.col('customer_id').n_unique().alias('unique_customers')
]))
polars_time = time.time() - start
benchmarks.append(BenchmarkResult(
operation="複合フィルタリング・集計",
pandas_time=pandas_time,
polars_time=polars_time,
speedup_factor=pandas_time / polars_time,
memory_reduction_pct=self._calculate_memory_reduction(df_pandas, df_polars),
data_size=data_size
))
# 2. ウィンドウ関数のベンチマーク
with self.resource_monitor("ウィンドウ関数-pandas"):
start = time.time()
df_pandas_copy = df_pandas.copy()
df_pandas_copy['running_total'] = (df_pandas_copy
.sort_values(['customer_id', 'order_date'])
.groupby('customer_id')['total_amount']
.cumsum())
df_pandas_copy['customer_rank'] = (df_pandas_copy
.groupby('customer_id')['total_amount']
.rank(ascending=False))
pandas_time = time.time() - start
with self.resource_monitor("ウィンドウ関数-polars"):
start = time.time()
polars_result = (df_polars
.sort(['customer_id', 'order_date'])
.with_columns([
pl.col('total_amount').cumsum().over('customer_id').alias('running_total'),
pl.col('total_amount').rank(descending=True).over('customer_id').alias('customer_rank')
]))
polars_time = time.time() - start
benchmarks.append(BenchmarkResult(
operation="ウィンドウ関数",
pandas_time=pandas_time,
polars_time=polars_time,
speedup_factor=pandas_time / polars_time,
memory_reduction_pct=0, # 同じデータサイズなので省略
data_size=data_size
))
return benchmarks
def _calculate_memory_reduction(self, df_pandas: pd.DataFrame, df_polars: pl.DataFrame) -> float:
"""メモリ使用量削減率の計算"""
pandas_memory = df_pandas.memory_usage(deep=True).sum()
# Polarsメモリ使用量の推定(実際のAPIは異なる場合があります)
polars_memory = len(df_polars) * len(df_polars.columns) * 8 # 簡易推定
return (1 - polars_memory / pandas_memory) * 100 if pandas_memory > 0 else 0
def lazy_evaluation_pipeline(self, file_path: str) -> pl.DataFrame:
"""遅延評価を活用した効率的なデータパイプライン"""
# 遅延評価でクエリを構築
lazy_query = (pl.scan_csv(file_path) # ファイルスキャン(遅延)
.filter(pl.col('total_amount') > 0)
.with_columns([
# 特徴量エンジニアリング
(pl.col('total_amount') / pl.col('quantity')).alias('unit_price'),
pl.col('order_date').str.strptime(pl.Date, '%Y-%m-%d').alias('parsed_date'),
(pl.col('total_amount') > pl.col('total_amount').quantile(0.8)).alias('high_value_order')
])
.with_columns([
pl.col('parsed_date').dt.weekday().alias('weekday'),
pl.col('parsed_date').dt.month().alias('month'),
pl.col('parsed_date').dt.quarter().alias('quarter')
])
.groupby(['category', 'region', 'quarter'])
.agg([
pl.col('total_amount').sum().alias('total_revenue'),
pl.col('total_amount').mean().alias('avg_order_value'),
pl.col('customer_id').n_unique().alias('unique_customers'),
pl.col('high_value_order').sum().alias('high_value_count'),
pl.col('unit_price').std().alias('price_volatility')
])
.sort(['total_revenue'], descending=True))
# この時点まで実際のデータ処理は行われない
logger.info("遅延評価クエリ構築完了 - まだデータは処理されていません")
# collect()で実際の計算を実行
with self.resource_monitor("遅延評価実行"):
result = lazy_query.collect()
return result
def streaming_aggregation(self, file_path: str, chunk_size: int = 100000) -> pl.DataFrame:
"""ストリーミング処理による大規模データ集計"""
aggregated_results = []
# チャンクごとに処理(read_csv_batched はリーダーを返すため next_batches でバッチを取り出す)
reader = pl.read_csv_batched(file_path, batch_size=chunk_size)
while (batches := reader.next_batches(1)) is not None:
chunk_df = batches[0]
chunk_agg = (chunk_df
.groupby('category')
.agg([
pl.col('total_amount').sum().alias('chunk_sum'),
pl.col('total_amount').count().alias('chunk_count'),
pl.col('customer_id').n_unique().alias('chunk_unique_customers')
]))
aggregated_results.append(chunk_agg)
# チャンク結果を最終集計
if aggregated_results:
final_result = (pl.concat(aggregated_results)
.groupby('category')
.agg([
pl.col('chunk_sum').sum().alias('total_amount'),
pl.col('chunk_count').sum().alias('total_orders'),
pl.col('chunk_unique_customers').sum().alias('total_unique_customers') # 重複あり
]))
return final_result
return pl.DataFrame()
def export_benchmark_report(self, output_path: str):
"""ベンチマーク結果のレポート出力"""
if not self.benchmarks:
logger.warning("ベンチマークデータがありません")
return
report = {
"benchmark_summary": {
"total_operations": len(self.benchmarks),
"avg_speedup": sum(b.speedup_factor for b in self.benchmarks) / len(self.benchmarks),
"max_speedup": max(b.speedup_factor for b in self.benchmarks),
"data_sizes_tested": list(set(b.data_size for b in self.benchmarks))
},
"detailed_results": [
{
"operation": b.operation,
"pandas_time_sec": b.pandas_time,
"polars_time_sec": b.polars_time,
"speedup_factor": b.speedup_factor,
"memory_reduction_pct": b.memory_reduction_pct,
"data_size": b.data_size
}
for b in self.benchmarks
]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
logger.info(f"ベンチマークレポートを出力: {output_path}")
# 実際の企業レベル使用例
analyzer = EnterprisePolarsAnalyzer()
# パフォーマンステストの実行
if 'df' in locals():
benchmarks = analyzer.comprehensive_benchmark(df)
analyzer.benchmarks.extend(benchmarks)
print("Polars vs pandas エンタープライズベンチマーク結果:")
print("=" * 60)
for benchmark in benchmarks:
print(f"\n操作: {benchmark.operation}")
print(f" データサイズ: {benchmark.data_size:,} 行")
print(f" pandas: {benchmark.pandas_time:.4f}秒")
print(f" Polars: {benchmark.polars_time:.4f}秒")
print(f" 高速化倍率: {benchmark.speedup_factor:.1f}倍")
if benchmark.memory_reduction_pct > 0:
print(f" メモリ削減: {benchmark.memory_reduction_pct:.1f}%")
# サンプルCSVで遅延評価テスト
if Path('ecommerce_data.csv').exists():
lazy_result = analyzer.lazy_evaluation_pipeline('ecommerce_data.csv')
print(f"\n遅延評価パイプライン結果: {lazy_result.shape}")
print("上位5カテゴリ・地域の売上:")
print(lazy_result.head())
# ベンチマークレポートの出力
analyzer.export_benchmark_report('polars_benchmark_report.json')
3.2 Polars実践的実装パターン
from typing import Protocol
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from dataclasses import dataclass
from enum import Enum
class DataQualityLevel(Enum):
"""データ品質レベル"""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class QueryPlan:
"""クエリ実行プランの定義"""
query_id: str
estimated_memory_mb: float
estimated_time_sec: float
parallelizable: bool
dependencies: List[str]
class ProductionPolarsEngine:
"""プロダクション対応Polarsエンジン"""
def __init__(self, max_memory_gb: float = 4.0, enable_caching: bool = True):
self.max_memory_gb = max_memory_gb
self.enable_caching = enable_caching
self.query_cache: Dict[str, pl.DataFrame] = {}
self.execution_stats: Dict[str, Dict] = {}
def _generate_query_hash(self, query_str: str) -> str:
"""クエリのハッシュ値生成(キャッシュキーとして使用)"""
return hashlib.md5(query_str.encode()).hexdigest()[:12]
def _estimate_memory_usage(self, df: pl.DataFrame, operation: str) -> float:
"""メモリ使用量の推定"""
base_size = len(df) * len(df.columns) * 8 / 1024 / 1024 # MB
memory_multipliers = {
'join': 1.5,
'groupby': 1.2,
'window': 2.0,
'sort': 1.8,
'pivot': 3.0
}
return base_size * memory_multipliers.get(operation, 1.0)
def advanced_time_series_analysis(self, df: pl.DataFrame, date_col: str = 'order_date') -> pl.DataFrame:
"""高度な時系列分析パイプライン"""
query_hash = self._generate_query_hash(f"time_series_{date_col}")
if self.enable_caching and query_hash in self.query_cache:
logger.info(f"キャッシュからクエリ結果を取得: {query_hash}")
return self.query_cache[query_hash]
start_time = time.time()
# 時系列特徴量の包括的生成
result = (df
.with_columns([
pl.col(date_col).str.strptime(pl.Datetime).alias('parsed_date')
])
.with_columns([
# 基本時間特徴量
pl.col('parsed_date').dt.year().alias('year'),
pl.col('parsed_date').dt.month().alias('month'),
pl.col('parsed_date').dt.day().alias('day'),
pl.col('parsed_date').dt.weekday().alias('weekday'),
pl.col('parsed_date').dt.hour().alias('hour'),
# ビジネス特徴量
pl.col('parsed_date').dt.weekday().is_in([6, 7]).alias('is_weekend'),
pl.col('parsed_date').dt.month().is_in([12, 1, 2]).alias('is_winter'),
(pl.col('parsed_date').dt.day() <= 7).alias('is_month_start'),
# 季節性特徴量
(pl.col('parsed_date').dt.month() % 12 / 3).round(0).alias('season'),
# 時系列ラグ特徴量
pl.col('total_amount').shift(1).over('customer_id').alias('prev_order_amount'),
pl.col('total_amount').shift(7).over('customer_id').alias('week_ago_amount'),
# 移動統計
# by= に日時列を指定する場合、window_size は '7d' のような期間文字列で与える(Polars 0.20系)
pl.col('total_amount').rolling_mean(window_size='7d', by='parsed_date').alias('ma_7d'),
pl.col('total_amount').rolling_std(window_size='30d', by='parsed_date').alias('std_30d'),
# パーセンタイルベースの特徴量
(pl.col('total_amount') > pl.col('total_amount').quantile(0.75).over('category')).alias('is_top_quartile'),
# 顧客行動パターン
pl.col('parsed_date').diff().dt.total_days().over('customer_id').alias('days_since_last_order'),
pl.col('total_amount').count().over(['customer_id', pl.col('parsed_date').dt.month()]).alias('monthly_frequency')
])
.with_columns([
# 二次特徴量(他の特徴量から生成)
(pl.col('total_amount') / pl.col('ma_7d')).alias('amount_to_avg_ratio'),
(pl.col('days_since_last_order') > 30).alias('is_returning_customer'),
# 顧客ライフサイクル特徴量
pl.when(pl.col('monthly_frequency') == 1)
.then(pl.lit('New'))
.when(pl.col('monthly_frequency') <= 3)
.then(pl.lit('Occasional'))
.otherwise(pl.lit('Frequent'))
.alias('customer_lifecycle_stage')
]))
execution_time = time.time() - start_time
self.execution_stats[query_hash] = {
'execution_time': execution_time,
'rows_processed': len(result),
'columns_generated': len(result.columns) - len(df.columns)
}
if self.enable_caching:
self.query_cache[query_hash] = result
logger.info(f"時系列分析完了: {execution_time:.2f}秒, {len(result.columns)} 特徴量生成")
return result
def customer_lifetime_value_analysis(self, df: pl.DataFrame) -> pl.DataFrame:
"""顧客生涯価値の高度分析"""
# CLV計算のための複雑な集計
clv_analysis = (df
.groupby('customer_id')
.agg([
# 基本指標
pl.col('total_amount').sum().alias('total_spent'),
pl.col('total_amount').mean().alias('avg_order_value'),
pl.col('total_amount').count().alias('order_frequency'),
pl.col('order_date').min().alias('first_order_date'),
pl.col('order_date').max().alias('last_order_date'),
# 高度指標
pl.col('total_amount').std().alias('spending_volatility'),
pl.col('category').n_unique().alias('category_diversity'),
pl.col('total_amount').max().alias('max_single_order'),
# 時系列指標
(pl.col('order_date').max() - pl.col('order_date').min()).alias('customer_lifespan_days')
])
.with_columns([
# CLV計算
(pl.col('total_spent') / pl.col('customer_lifespan_days').dt.total_days().replace(0, 1) * 365).alias('annual_value'),
(pl.col('order_frequency') / pl.col('customer_lifespan_days').dt.total_days().replace(0, 1) * 30).alias('monthly_frequency'),
# リスクスコア
(1 / (1 + pl.col('spending_volatility') / pl.col('avg_order_value'))).alias('stability_score'),
# セグメンテーション
pl.when(pl.col('total_spent') > pl.col('total_spent').quantile(0.8))
.then(pl.lit('High_Value'))
.when(pl.col('order_frequency') > pl.col('order_frequency').quantile(0.8))
.then(pl.lit('High_Frequency'))
.when(pl.col('category_diversity') >= 3)
.then(pl.lit('Diverse_Shopper'))
.otherwise(pl.lit('Standard'))
.alias('customer_segment')
]))
return clv_analysis
def parallel_aggregation_pipeline(self, df: pl.DataFrame, group_cols: List[str]) -> pl.DataFrame:
"""並列処理による高速集計パイプライン"""
def create_aggregation_query(group_col: str) -> pl.LazyFrame:
"""個別集計クエリの生成"""
return (df.lazy()
.groupby(group_col)
.agg([
pl.col('total_amount').sum().alias(f'total_by_{group_col}'),
pl.col('total_amount').mean().alias(f'avg_by_{group_col}'),
pl.col('customer_id').n_unique().alias(f'unique_customers_by_{group_col}'),
pl.col('quantity').sum().alias(f'total_quantity_by_{group_col}')
])
.with_columns([
pl.lit(group_col).alias('group_type')
]))
# 並列でクエリを実行
with ThreadPoolExecutor(max_workers=min(len(group_cols), 4)) as executor:
future_to_group = {
executor.submit(create_aggregation_query(group_col).collect): group_col
for group_col in group_cols
}
results = []
for future in as_completed(future_to_group):
group_col = future_to_group[future]
try:
result = future.result()
results.append(result)
logger.info(f"並列集計完了: {group_col}")
except Exception as exc:
logger.error(f"並列集計エラー {group_col}: {exc}")
# 結果を統合
if results:
combined_result = pl.concat(results, how="diagonal")
return combined_result
else:
return pl.DataFrame()
def data_quality_assessment(self, df: pl.DataFrame) -> Dict[str, DataQualityLevel]:
"""データ品質の自動評価"""
quality_scores = {}
for column in df.columns:
col_data = df.select(pl.col(column))
# 欠損率
null_ratio = col_data.null_count().item() / len(df)
# ユニーク率
if df[column].dtype in [pl.String, pl.Categorical]:
unique_ratio = col_data.n_unique() / len(df) if len(df) > 0 else 0
else:
unique_ratio = 1.0 # 数値列は常に高品質とみなす
# 品質スコア計算
score = (1 - null_ratio) * 0.7 + unique_ratio * 0.3
if score >= 0.8:
quality_scores[column] = DataQualityLevel.HIGH
elif score >= 0.5:
quality_scores[column] = DataQualityLevel.MEDIUM
else:
quality_scores[column] = DataQualityLevel.LOW
return quality_scores
def generate_optimization_report(self) -> Dict:
"""最適化レポートの生成"""
return {
'execution_statistics': self.execution_stats,
'cache_hit_ratio': len(self.query_cache) / (len(self.execution_stats) + 1),
'memory_usage_mb': sum(self._estimate_memory_usage(df, 'cache') for df in self.query_cache.values()),
'total_queries_executed': len(self.execution_stats),
'avg_execution_time': sum(stat['execution_time'] for stat in self.execution_stats.values()) / len(self.execution_stats) if self.execution_stats else 0
}
# プロダクションレベルの使用例
engine = ProductionPolarsEngine(max_memory_gb=8.0, enable_caching=True)
if 'df' in locals():
try:
df_polars = pl.from_pandas(df)
# 時系列分析の実行
ts_analysis = engine.advanced_time_series_analysis(df_polars)
print(f"時系列特徴量生成完了: {ts_analysis.shape}")
# CLV分析の実行
clv_analysis = engine.customer_lifetime_value_analysis(df_polars)
print(f"CLV分析完了: {clv_analysis.shape}")
print("上位5顧客の年間価値:")
print(clv_analysis.sort('annual_value', descending=True).head().to_pandas())
# 並列集計の実行
parallel_results = engine.parallel_aggregation_pipeline(df_polars, ['category', 'region'])
print(f"並列集計完了: {parallel_results.shape}")
# データ品質評価
quality_assessment = engine.data_quality_assessment(df_polars)
print("\nデータ品質評価:")
for col, level in quality_assessment.items():
print(f" {col}: {level.value}")
# 最適化レポート
optimization_report = engine.generate_optimization_report()
print(f"\n最適化レポート:")
print(f" 実行クエリ数: {optimization_report['total_queries_executed']}")
print(f" 平均実行時間: {optimization_report['avg_execution_time']:.3f}秒")
print(f" キャッシュメモリ使用量: {optimization_report['memory_usage_mb']:.1f}MB")
except Exception as e:
logger.error(f"Polars処理エラー: {e}")
print("Polarsの処理でエラーが発生しました。pandas版で続行します。")さらに理解を深める参考書
関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。
4. データ可視化実装|エンタープライズ可視化エンジン
4.1 プロダクション対応可視化システム
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.dates as mdates
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.patches import Rectangle, FancyBboxPatch
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo
from typing import Union, Optional, Callable
from pathlib import Path
import base64
from io import BytesIO
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)
class EnterpriseVisualizationEngine:
"""エンタープライズレベル可視化エンジン"""
def __init__(self,
brand_colors: Optional[Dict[str, str]] = None,
output_format: str = 'both', # 'static', 'interactive', 'both'
dpi: int = 300):
self.brand_colors = brand_colors or self._get_default_palette()
self.output_format = output_format
self.dpi = dpi
self.figures_created = []
self._setup_plotting_environment()
def _get_default_palette(self) -> Dict[str, str]:
"""デフォルトカラーパレット(企業ブランドガイドライン対応)"""
return {
'primary': '#2E86C1',
'secondary': '#E74C3C',
'success': '#28B463',
'warning': '#F39C12',
'danger': '#E74C3C',
'info': '#17A2B8',
'light': '#F8F9FA',
'dark': '#343A40',
'accent1': '#9B59B6',
'accent2': '#E67E22'
}
def _setup_plotting_environment(self):
"""プロット環境の最適化設定"""
# 日本語フォント設定(OS別対応)
import platform
system = platform.system()
if system == 'Darwin': # macOS
plt.rcParams['font.family'] = ['Hiragino Sans', 'DejaVu Sans']
elif system == 'Windows':
plt.rcParams['font.family'] = ['Yu Gothic', 'DejaVu Sans']
else: # Linux
plt.rcParams['font.family'] = ['Noto Sans CJK JP', 'DejaVu Sans']
# プロダクション品質設定
plt.rcParams.update({
'figure.dpi': self.dpi,
'savefig.dpi': self.dpi,
'font.size': 11,
'axes.titlesize': 14,
'axes.labelsize': 12,
'xtick.labelsize': 10,
'ytick.labelsize': 10,
'legend.fontsize': 10,
'figure.titlesize': 16,
'axes.grid': True,
'grid.alpha': 0.3,
'axes.spines.top': False,
'axes.spines.right': False,
'figure.facecolor': 'white',
'axes.facecolor': 'white'
})
def create_executive_dashboard(self, df: pd.DataFrame) -> Union[plt.Figure, go.Figure]:
"""エグゼクティブ向け統合ダッシュボード"""
if self.output_format in ['static', 'both']:
static_fig = self._create_static_executive_dashboard(df)
if self.output_format in ['interactive', 'both']:
interactive_fig = self._create_interactive_executive_dashboard(df)
if self.output_format == 'static':
return static_fig
elif self.output_format == 'interactive':
return interactive_fig
else:
return static_fig, interactive_fig
def _create_static_executive_dashboard(self, df: pd.DataFrame) -> plt.Figure:
"""静的エグゼクティブダッシュボード"""
fig = plt.figure(figsize=(20, 14))
gs = fig.add_gridspec(4, 4, hspace=0.3, wspace=0.3)
# メインタイトルとサマリー
fig.suptitle('Executive Sales Dashboard',
fontsize=24, fontweight='bold', y=0.95)
# KPI サマリーカード
kpi_ax = fig.add_subplot(gs[0, :])
self._create_kpi_cards(kpi_ax, df)
# 売上トレンド(時系列)
trend_ax = fig.add_subplot(gs[1, :2])
self._create_revenue_trend(trend_ax, df)
# 地域別パフォーマンス
region_ax = fig.add_subplot(gs[1, 2:])
self._create_regional_performance(region_ax, df)
# 顧客セグメント分析
segment_ax = fig.add_subplot(gs[2, :2])
self._create_customer_segmentation(segment_ax, df)
# 商品カテゴリ分析
category_ax = fig.add_subplot(gs[2, 2:])
self._create_category_analysis(category_ax, df)
# 予測とトレンド
forecast_ax = fig.add_subplot(gs[3, :])
self._create_forecast_visualization(forecast_ax, df)
return fig
def _create_kpi_cards(self, ax: plt.Axes, df: pd.DataFrame):
"""KPIカードの作成"""
ax.axis('off')
# KPI計算
total_revenue = df['total_amount'].sum()
total_orders = len(df)
avg_order_value = df['total_amount'].mean()
unique_customers = df['customer_id'].nunique()
# 前期比計算(模擬データ)
current_month = pd.to_datetime(df['order_date']).max()
prev_month_data = df[pd.to_datetime(df['order_date']) < current_month - pd.DateOffset(months=1)]
if not prev_month_data.empty:
revenue_growth = ((total_revenue - prev_month_data['total_amount'].sum()) /
prev_month_data['total_amount'].sum() * 100)
else:
revenue_growth = 0
kpis = [
('総売上', f'¥{total_revenue:,.0f}', f'{revenue_growth:+.1f}%', self.brand_colors['primary']),
('注文数', f'{total_orders:,}', '+5.2%', self.brand_colors['success']),
('平均注文額', f'¥{avg_order_value:,.0f}', '+2.1%', self.brand_colors['warning']),
('顧客数', f'{unique_customers:,}', '+8.3%', self.brand_colors['info'])
]
card_width = 0.22
card_height = 0.8
for i, (title, value, change, color) in enumerate(kpis):
x = i * 0.25 + 0.02
# カード背景
card = FancyBboxPatch((x, 0.1), card_width, card_height,
boxstyle="round,pad=0.02",
facecolor=color, alpha=0.1,
edgecolor=color, linewidth=2)
ax.add_patch(card)
# テキスト
ax.text(x + card_width/2, 0.7, title, ha='center', va='center',
fontsize=12, fontweight='bold', color='black')
ax.text(x + card_width/2, 0.5, value, ha='center', va='center',
fontsize=16, fontweight='bold', color=color)
ax.text(x + card_width/2, 0.3, change, ha='center', va='center',
fontsize=10, color=self.brand_colors['success'] if '+' in change else self.brand_colors['danger'])
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
def _create_revenue_trend(self, ax: plt.Axes, df: pd.DataFrame):
"""売上トレンド可視化"""
# 日別売上集計
daily_revenue = (df.groupby(pd.to_datetime(df['order_date']).dt.date)
['total_amount'].sum().reset_index())
daily_revenue.columns = ['date', 'revenue']
daily_revenue['date'] = pd.to_datetime(daily_revenue['date'])
# トレンドライン
ax.plot(daily_revenue['date'], daily_revenue['revenue'],
color=self.brand_colors['primary'], linewidth=3, alpha=0.8)
# 移動平均
daily_revenue['ma7'] = daily_revenue['revenue'].rolling(7).mean()
ax.plot(daily_revenue['date'], daily_revenue['ma7'],
color=self.brand_colors['secondary'], linewidth=2,
linestyle='--', label='7日移動平均')
# 面グラフ
ax.fill_between(daily_revenue['date'], daily_revenue['revenue'],
alpha=0.3, color=self.brand_colors['primary'])
# 最高売上日をハイライト
max_revenue_day = daily_revenue.loc[daily_revenue['revenue'].idxmax()]
ax.scatter(max_revenue_day['date'], max_revenue_day['revenue'],
color=self.brand_colors['accent1'], s=100, zorder=5)
ax.annotate(f'最高売上: ¥{max_revenue_day["revenue"]:,.0f}',
xy=(max_revenue_day['date'], max_revenue_day['revenue']),
xytext=(10, 10), textcoords='offset points',
bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7),
arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))
ax.set_title('売上トレンド分析', fontsize=14, fontweight='bold')
ax.set_xlabel('日付')
ax.set_ylabel('売上金額 (¥)')
ax.legend()
# 日付フォーマット
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=7))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
def _create_regional_performance(self, ax: plt.Axes, df: pd.DataFrame):
"""地域別パフォーマンス可視化"""
region_performance = (df.groupby('region')
.agg({
'total_amount': ['sum', 'mean', 'count'],
'customer_id': 'nunique'
})
.round(0))
region_performance.columns = ['total_sales', 'avg_order', 'order_count', 'customers']
region_performance = region_performance.sort_values('total_sales', ascending=True)
# 水平棒グラフ
bars = ax.barh(region_performance.index, region_performance['total_sales'],
color=[self.brand_colors['primary'], self.brand_colors['secondary'],
self.brand_colors['success'], self.brand_colors['warning']][:len(region_performance)])
# 値をバーに表示
for bar in bars:
width = bar.get_width()
ax.text(width, bar.get_y() + bar.get_height()/2,
f'¥{width:,.0f}', ha='left', va='center', fontweight='bold')
ax.set_title('地域別売上パフォーマンス', fontsize=14, fontweight='bold')
ax.set_xlabel('売上金額 (¥)')
# パフォーマンス指標の追加表示
for i, (region, row) in enumerate(region_performance.iterrows()):
ax.text(row['total_sales'] * 0.05, i,
f"顧客数: {row['customers']:.0f}\n平均: ¥{row['avg_order']:,.0f}",
va='center', fontsize=8, color='white', fontweight='bold')
def _create_interactive_executive_dashboard(self, df: pd.DataFrame) -> go.Figure:
"""インタラクティブエグゼクティブダッシュボード"""
# サブプロットの設定
fig = make_subplots(
rows=3, cols=2,
subplot_titles=('売上トレンド', '地域別売上', '顧客セグメント', 'カテゴリ分析', '時間別売上パターン', '予測分析'),
specs=[[{"secondary_y": True}, {"type": "bar"}],
[{"type": "scatter"}, {"type": "pie"}],
[{"type": "heatmap"}, {"secondary_y": True}]],
vertical_spacing=0.12
)
# 1. 売上トレンド
daily_sales = df.groupby(pd.to_datetime(df['order_date']).dt.date)['total_amount'].sum().reset_index()
daily_sales.columns = ['date', 'revenue']
fig.add_trace(
go.Scatter(x=daily_sales['date'], y=daily_sales['revenue'],
mode='lines+markers', name='日別売上',
line=dict(color=self.brand_colors['primary'], width=3),
hovertemplate='日付: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'),
row=1, col=1
)
# 移動平均追加
daily_sales['ma7'] = daily_sales['revenue'].rolling(7).mean()
fig.add_trace(
go.Scatter(x=daily_sales['date'], y=daily_sales['ma7'],
mode='lines', name='7日移動平均',
line=dict(color=self.brand_colors['secondary'], width=2, dash='dash')),
row=1, col=1
)
# 2. 地域別売上
region_sales = df.groupby('region')['total_amount'].sum().sort_values(ascending=True)
fig.add_trace(
go.Bar(y=region_sales.index, x=region_sales.values, orientation='h',
name='地域別売上', marker_color=self.brand_colors['success'],
hovertemplate='地域: %{y}<br>売上: ¥%{x:,.0f}<extra></extra>'),
row=1, col=2
)
# 3. 顧客セグメント(RFM分析ベース)
# 簡易RFM計算
customer_rfm = self._calculate_simple_rfm(df)
fig.add_trace(
go.Scatter(x=customer_rfm['frequency'], y=customer_rfm['monetary'],
mode='markers', name='顧客分布',
marker=dict(size=customer_rfm['recency']/10,
color=customer_rfm['monetary'],
colorscale='Viridis', showscale=True,
colorbar=dict(title="購入金額")),
hovertemplate='頻度: %{x}<br>金額: ¥%{y:,.0f}<br>最新購入: %{marker.size:.0f}日前<extra></extra>'),
row=2, col=1
)
# 4. カテゴリ分析
category_sales = df.groupby('category')['total_amount'].sum()
fig.add_trace(
go.Pie(labels=category_sales.index, values=category_sales.values,
name='カテゴリ別売上'),
row=2, col=2
)
# レイアウト設定
fig.update_layout(
height=1200,
title_text="エグゼクティブダッシュボード - 売上分析",
title_font_size=20,
showlegend=True
)
return fig
def _calculate_simple_rfm(self, df: pd.DataFrame) -> pd.DataFrame:
"""簡易RFM分析"""
analysis_date = df['order_date'].max()
rfm = (df.groupby('customer_id')
.agg({
'order_date': lambda x: (pd.to_datetime(analysis_date) - pd.to_datetime(x).max()).days,
'total_amount': ['count', 'sum']
}))
rfm.columns = ['recency', 'frequency', 'monetary']
return rfm.reset_index()
def export_dashboard(self,
fig: Union[plt.Figure, go.Figure],
filename: str,
formats: List[str] = ['png', 'pdf']):
"""ダッシュボードのエクスポート"""
output_path = Path('dashboard_exports')
output_path.mkdir(exist_ok=True)
if isinstance(fig, plt.Figure):
# matplotlib figure
for fmt in formats:
filepath = output_path / f"{filename}.{fmt}"
fig.savefig(filepath, dpi=self.dpi, bbox_inches='tight',
facecolor='white', edgecolor='none')
logger.info(f"静的ダッシュボードを保存: {filepath}")
elif isinstance(fig, go.Figure):
# plotly figure
for fmt in formats:
if fmt == 'html':
filepath = output_path / f"{filename}.html"
fig.write_html(filepath)
elif fmt == 'png':
filepath = output_path / f"{filename}.png"
fig.write_image(filepath, width=1920, height=1080, scale=2)
elif fmt == 'pdf':
filepath = output_path / f"{filename}.pdf"
fig.write_image(filepath, width=1920, height=1080)
logger.info(f"インタラクティブダッシュボードを保存: {filepath}")
# プロダクション使用例
viz_engine = EnterpriseVisualizationEngine(
output_format='both',
dpi=300
)
if 'df' in locals():
# エグゼクティブダッシュボードの作成
dashboard_results = viz_engine.create_executive_dashboard(df)
if isinstance(dashboard_results, tuple):
static_dashboard, interactive_dashboard = dashboard_results
# 静的ダッシュボード表示
plt.show()
# インタラクティブダッシュボード保存
viz_engine.export_dashboard(interactive_dashboard, 'executive_dashboard_interactive', ['html', 'png'])
print("エグゼクティブダッシュボード作成完了")
print("- 静的版: 表示済み")
print("- インタラクティブ版: dashboard_exports/ に保存")
else:
# 単一形式のダッシュボード
viz_engine.export_dashboard(dashboard_results, 'executive_dashboard', ['png', 'pdf'])
print("ダッシュボード作成・保存完了")4.2 インタラクティブ可視化の実装
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo
class InteractiveVisualizer:
"""インタラクティブ可視化クラス"""
def __init__(self, df: pd.DataFrame):
# 前節のECデータ('order_date' / 'total_amount')でも動くように列名を揃える
self.df = df.rename(columns={'order_date': 'date', 'total_amount': 'sales_amount'}).copy()
self.df['date'] = pd.to_datetime(self.df['date'])
def create_interactive_dashboard(self):
"""インタラクティブダッシュボードの作成"""
# サブプロットの設定
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('時系列トレンド', 'カテゴリ別売上', '散布図分析', '地域別分析'),
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}]]
)
# 1. 時系列トレンド
daily_sales = self.df.groupby(self.df['date'].dt.date)['sales_amount'].sum().reset_index()
daily_sales['date'] = pd.to_datetime(daily_sales['date'])
fig.add_trace(
go.Scatter(
x=daily_sales['date'],
y=daily_sales['sales_amount'],
mode='lines+markers',
name='日別売上',
hovertemplate='日付: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'
),
row=1, col=1
)
# 2. カテゴリ別売上
category_sales = self.df.groupby('category')['sales_amount'].sum().reset_index()
fig.add_trace(
go.Bar(
x=category_sales['category'],
y=category_sales['sales_amount'],
name='カテゴリ別売上',
hovertemplate='カテゴリ: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>'
),
row=1, col=2
)
# 3. 散布図分析
fig.add_trace(
go.Scatter(
x=self.df['quantity'],
y=self.df['sales_amount'],
mode='markers',
name='数量vs売上',
hovertemplate='数量: %{x}<br>売上: ¥%{y:,.0f}<extra></extra>',
marker=dict(
size=8,
color=self.df['sales_amount'],
colorscale='Viridis',
showscale=True
)
),
row=2, col=1
)
# 4. 地域別分析
region_sales = self.df.groupby('region')['sales_amount'].sum().reset_index()
fig.add_trace(
go.Pie(
labels=region_sales['region'],
values=region_sales['sales_amount'],
name='地域別売上',
hovertemplate='地域: %{label}<br>売上: ¥%{value:,.0f}<br>割合: %{percent}<extra></extra>'
),
row=2, col=2
)
# レイアウト設定
fig.update_layout(
title_text="インタラクティブ売上分析ダッシュボード",
showlegend=True,
height=800
)
return fig
def create_3d_analysis(self):
"""3D分析の可視化"""
# 時間、カテゴリ、地域の3次元分析
df_3d = (self.df.groupby(['category', 'region', pd.to_datetime(self.df['date']).dt.hour])
['sales_amount'].sum().reset_index())
df_3d.columns = ['category', 'region', 'hour', 'sales_amount']
fig = go.Figure(data=go.Scatter3d(
x=df_3d['hour'],
y=pd.Categorical(df_3d['category']).codes,
z=pd.Categorical(df_3d['region']).codes,
mode='markers',
marker=dict(
size=df_3d['sales_amount'] / 1000, # サイズ正規化
color=df_3d['sales_amount'],
colorscale='Rainbow',
showscale=True,
opacity=0.8
),
text=[f'時間: {h}時<br>カテゴリ: {c}<br>地域: {r}<br>売上: ¥{s:,.0f}'
for h, c, r, s in zip(df_3d['hour'], df_3d['category'],
df_3d['region'], df_3d['sales_amount'])],
hovertemplate='%{text}<extra></extra>'
))
fig.update_layout(
title='3D売上分析(時間×カテゴリ×地域)',
scene=dict(
xaxis_title='時間',
yaxis_title='カテゴリ',
zaxis_title='地域'
)
)
return fig
# インタラクティブ可視化の実行
try:
interactive_viz = InteractiveVisualizer(df)
# インタラクティブダッシュボード
dashboard = interactive_viz.create_interactive_dashboard()
dashboard.show()
# 3D分析
analysis_3d = interactive_viz.create_3d_analysis()
analysis_3d.show()
print("インタラクティブ可視化完了")
except ImportError:
print("Plotlyがインストールされていません。")
print("インストール: pip install plotly")さらに理解を深める参考書
関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。
5. 統計分析・機械学習実装
5.1 統計的仮説検定の実装
from scipy import stats
from scipy.stats import normaltest, levene, mannwhitneyu, kruskal
from statsmodels.stats.power import ttest_power
from statsmodels.stats.proportion import proportions_ztest
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.multicomp import pairwise_tukeyhsd
class StatisticalAnalyzer:
"""統計分析クラス"""
def __init__(self, df: pd.DataFrame, alpha: float = 0.05):
self.df = df.copy()
self.alpha = alpha
def normality_test(self, column: str) -> Dict[str, any]:
"""正規性の検定"""
data = self.df[column].dropna()
results = {
'sample_size': len(data),
'mean': data.mean(),
'std': data.std(),
'skewness': stats.skew(data),
'kurtosis': stats.kurtosis(data)
}
# Shapiro-Wilk検定(サンプルサイズ < 5000)
if len(data) < 5000:
shapiro_stat, shapiro_p = stats.shapiro(data)
results['shapiro_wilk'] = {
'statistic': shapiro_stat,
'p_value': shapiro_p,
'is_normal': shapiro_p > self.alpha
}
# D'Agostino検定
dagostino_stat, dagostino_p = normaltest(data)
results['dagostino'] = {
'statistic': dagostino_stat,
'p_value': dagostino_p,
'is_normal': dagostino_p > self.alpha
}
# Kolmogorov-Smirnov検定
ks_stat, ks_p = stats.kstest(data, 'norm', args=(data.mean(), data.std()))
results['kolmogorov_smirnov'] = {
'statistic': ks_stat,
'p_value': ks_p,
'is_normal': ks_p > self.alpha
}
return results
def compare_groups_analysis(self, group_col: str, value_col: str) -> Dict[str, any]:
"""群間比較の包括的分析"""
grouped = list(self.df.groupby(group_col))
group_names = [name for name, _ in grouped]
groups = [g[value_col].dropna() for _, g in grouped]
results = {
'group_names': group_names,
'group_sizes': [len(group) for group in groups],
'group_means': [group.mean() for group in groups],
'group_stds': [group.std() for group in groups]
}
# 等分散性の検定(Levene検定)
levene_stat, levene_p = levene(*groups)
results['levene_test'] = {
'statistic': levene_stat,
'p_value': levene_p,
'equal_variances': levene_p > self.alpha
}
# 正規性の確認
normality_results = []
for i, group in enumerate(groups):
if len(group) > 3: # 最小サンプル数チェック
_, p_value = normaltest(group) if len(group) >= 8 else (None, 0.05)
normality_results.append(p_value > self.alpha if p_value is not None else False)
else:
normality_results.append(False)
results['group_normality'] = normality_results
all_normal = all(normality_results)
equal_variances = results['levene_test']['equal_variances']
# 適切な検定の選択と実行
if len(groups) == 2:
if all_normal and equal_variances:
# 対応のないt検定
stat, p_value = stats.ttest_ind(groups[0], groups[1])
test_name = "Independent t-test"
elif all_normal and not equal_variances:
# Welchのt検定
stat, p_value = stats.ttest_ind(groups[0], groups[1], equal_var=False)
test_name = "Welch's t-test"
else:
# Mann-Whitney U検定
stat, p_value = mannwhitneyu(groups[0], groups[1])
test_name = "Mann-Whitney U test"
else:
if all_normal and equal_variances:
# 一元配置分散分析
stat, p_value = stats.f_oneway(*groups)
test_name = "One-way ANOVA"
# 事後検定(Tukey HSD)
if p_value <= self.alpha:
df_tukey = pd.DataFrame()
for i, name in enumerate(group_names):
temp_df = pd.DataFrame({
group_col: name,
value_col: groups[i]
})
df_tukey = pd.concat([df_tukey, temp_df])
tukey_results = pairwise_tukeyhsd(df_tukey[value_col], df_tukey[group_col])
results['post_hoc'] = {
'test': 'Tukey HSD',
'results': str(tukey_results)
}
else:
# Kruskal-Wallis検定
stat, p_value = kruskal(*groups)
test_name = "Kruskal-Wallis test"
results['main_test'] = {
'test_name': test_name,
'statistic': stat,
'p_value': p_value,
'significant': p_value <= self.alpha,
'effect_size': self._calculate_effect_size(groups, test_name)
}
return results
def _calculate_effect_size(self, groups: List[np.ndarray], test_name: str) -> float:
"""効果量の計算"""
if len(groups) == 2:
# Cohen's d
pooled_std = np.sqrt(((len(groups[0]) - 1) * groups[0].var() +
(len(groups[1]) - 1) * groups[1].var()) /
(len(groups[0]) + len(groups[1]) - 2))
if pooled_std > 0:
return abs(groups[0].mean() - groups[1].mean()) / pooled_std
else:
# 偏イータ二乗(近似)
all_data = np.concatenate(groups)
between_var = sum(len(group) * (group.mean() - all_data.mean())**2 for group in groups)
total_var = sum((x - all_data.mean())**2 for x in all_data)
if total_var > 0:
return between_var / total_var
return 0.0
def correlation_analysis(self) -> pd.DataFrame:
"""相関分析の実装"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
results = []
for i, col1 in enumerate(numeric_cols):
for j, col2 in enumerate(numeric_cols):
if i < j: # 上三角のみ
data1 = self.df[col1].dropna()
data2 = self.df[col2].dropna()
# 共通インデックスでデータを揃える
common_idx = data1.index.intersection(data2.index)
data1_common = data1[common_idx]
data2_common = data2[common_idx]
if len(data1_common) > 3:
# Pearson相関
pearson_r, pearson_p = stats.pearsonr(data1_common, data2_common)
# Spearman相関
spearman_r, spearman_p = stats.spearmanr(data1_common, data2_common)
results.append({
'variable1': col1,
'variable2': col2,
'n_samples': len(data1_common),
'pearson_r': pearson_r,
'pearson_p': pearson_p,
'pearson_significant': pearson_p <= self.alpha,
'spearman_r': spearman_r,
'spearman_p': spearman_p,
'spearman_significant': spearman_p <= self.alpha
})
return pd.DataFrame(results)
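# 補足(仮定を含むスケッチ): correlation_analysis() は全変数ペアを同時に検定するため、
# ペア数が多いほど偽陽性が増えやすい。最も単純な Bonferroni 補正の例:
#   corr_df = analyzer.correlation_analysis()
#   n_tests = len(corr_df)
#   corr_df['pearson_p_adj'] = (corr_df['pearson_p'] * n_tests).clip(upper=1.0)
# 検出力を保ちたい場合は statsmodels.stats.multitest.multipletests(method='fdr_bh') の利用も検討できる。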
# 統計分析の実行例
analyzer = StatisticalAnalyzer(df)
print("統計分析実行中...")
# 正規性検定
normality_result = analyzer.normality_test('sales_amount')
print("売上金額の正規性検定:")
print(f" 平均: {normality_result['mean']:.2f}")
print(f" 標準偏差: {normality_result['std']:.2f}")
print(f" 歪度: {normality_result['skewness']:.3f}")
print(f" 尖度: {normality_result['kurtosis']:.3f}")
if 'shapiro_wilk' in normality_result:
print(f" Shapiro-Wilk検定: p-value = {normality_result['shapiro_wilk']['p_value']:.6f}")
print(f" 正規分布: {normality_result['shapiro_wilk']['is_normal']}")
# 群間比較
group_comparison = analyzer.compare_groups_analysis('category', 'sales_amount')
print(f"\nカテゴリ間売上比較:")
print(f" 使用検定: {group_comparison['main_test']['test_name']}")
print(f" 統計量: {group_comparison['main_test']['statistic']:.4f}")
print(f" p値: {group_comparison['main_test']['p_value']:.6f}")
print(f" 有意差: {group_comparison['main_test']['significant']}")
print(f" 効果量: {group_comparison['main_test']['effect_size']:.4f}")
# 相関分析
correlation_results = analyzer.correlation_analysis()
significant_correlations = correlation_results[
correlation_results['pearson_significant']
].sort_values('pearson_r', key=abs, ascending=False)
print(f"\n有意な相関関係:")
for _, row in significant_correlations.head().iterrows():
print(f" {row['variable1']} - {row['variable2']}: r = {row['pearson_r']:.3f} (p = {row['pearson_p']:.6f})")5.2 機械学習実装パターン
from typing import Dict, Tuple  # prepare_data の型ヒントで使用
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import xgboost as xgb
from sklearn.pipeline import Pipeline
import joblib
class MLModelAnalyzer:
"""機械学習モデル分析クラス"""
def __init__(self, df: pd.DataFrame, target_col: str):
self.df = df.copy()
self.target_col = target_col
self.models = {}
self.results = {}
def prepare_data(self) -> Tuple[np.ndarray, np.ndarray]:
"""データ前処理と特徴量エンジニアリング"""
df_ml = self.df.copy()
# 欠損値処理
df_ml = df_ml.dropna()
# カテゴリ変数のエンコーディング
categorical_cols = df_ml.select_dtypes(include=['object', 'category']).columns
label_encoders = {}
for col in categorical_cols:
if col != self.target_col:
le = LabelEncoder()
df_ml[col] = le.fit_transform(df_ml[col].astype(str))
label_encoders[col] = le
self.label_encoders = label_encoders
# 日時特徴量の処理
if 'date' in df_ml.columns:
df_ml['date'] = pd.to_datetime(df_ml['date'])
df_ml['hour'] = df_ml['date'].dt.hour
df_ml['day_of_week'] = df_ml['date'].dt.dayofweek
df_ml['month'] = df_ml['date'].dt.month
df_ml = df_ml.drop('date', axis=1)
# 特徴量とターゲットの分離
X = df_ml.drop(self.target_col, axis=1)
y = df_ml[self.target_col]
self.feature_names = X.columns.tolist()
return X.values, y.values
def train_models(self, test_size: float = 0.2, random_state: int = 42):
"""複数モデルの学習と評価"""
X, y = self.prepare_data()
# データ分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
# スケーリング
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
self.scaler = scaler
self.X_test = X_test_scaled
self.y_test = y_test
# モデル定義
models = {
'Linear Regression': LinearRegression(),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=random_state),
'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=random_state),
'XGBoost': xgb.XGBRegressor(n_estimators=100, random_state=random_state)
}
# 各モデルの学習と評価
for name, model in models.items():
print(f"学習中: {name}")
# 線形モデルにはスケーリング済みデータを使用
if 'Regression' in name:
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
else:
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
# 評価指標の計算
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# クロスバリデーション
if 'Regression' in name:
cv_scores = cross_val_score(model, X_train_scaled, y_train,
cv=5, scoring='r2', n_jobs=-1)
else:
cv_scores = cross_val_score(model, X_train, y_train,
cv=5, scoring='r2', n_jobs=-1)
self.models[name] = model
self.results[name] = {
'RMSE': rmse,
'MAE': mae,
'R2': r2,
'CV_R2_mean': cv_scores.mean(),
'CV_R2_std': cv_scores.std(),
'predictions': y_pred
}
return self.results
def hyperparameter_tuning(self, model_name: str = 'Random Forest'):
"""ハイパーパラメータチューニング"""
X, y = self.prepare_data()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
if model_name == 'Random Forest':
model = RandomForestRegressor(random_state=42)
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [10, 20, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
elif model_name == 'XGBoost':
model = xgb.XGBRegressor(random_state=42)
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [3, 6, 10],
'learning_rate': [0.01, 0.1, 0.2],
'subsample': [0.8, 0.9, 1.0]
}
else:
raise ValueError(f"未対応のモデル名です: {model_name}")
# グリッドサーチ
grid_search = GridSearchCV(
model, param_grid, cv=5, scoring='r2', n_jobs=-1, verbose=1
)
print(f"{model_name} ハイパーパラメータチューニング実行中...")
grid_search.fit(X_train, y_train)
# 最適モデルでの予測
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
results = {
'best_params': grid_search.best_params_,
'best_score': grid_search.best_score_,
'test_r2': r2_score(y_test, y_pred),
'test_rmse': np.sqrt(mean_squared_error(y_test, y_pred))
}
return best_model, results
def feature_importance_analysis(self, model_name: str = 'Random Forest'):
"""特徴量重要度分析"""
if model_name not in self.models:
print(f"モデル '{model_name}' が学習されていません。")
return None
model = self.models[model_name]
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
indices = np.argsort(importances)[::-1]
importance_df = pd.DataFrame({
'feature': [self.feature_names[i] for i in indices],
'importance': importances[indices]
})
# 可視化
plt.figure(figsize=(10, 6))
plt.bar(range(len(importance_df)), importance_df['importance'])
plt.xticks(range(len(importance_df)), importance_df['feature'], rotation=45)
plt.title(f'{model_name} - 特徴量重要度')
plt.tight_layout()
plt.show()
return importance_df
else:
print(f"モデル '{model_name}' は特徴量重要度をサポートしていません。")
return None
def save_best_model(self, filename: str):
"""最高性能モデルの保存"""
best_model_name = max(self.results.keys(), key=lambda x: self.results[x]['R2'])
best_model = self.models[best_model_name]
# 線形モデルはスケーリング済みデータで学習しているため scaler 込みのパイプラインとして保存し、
# ツリー系モデルはスケーリングなしで学習しているためモデル単体をそのまま保存する
if 'Regression' in best_model_name:
artifact = Pipeline([
('scaler', self.scaler),
('model', best_model)
])
else:
artifact = best_model
joblib.dump(artifact, filename)
print(f"最高性能モデル ({best_model_name}) を {filename} に保存しました。")
print(f"R2スコア: {self.results[best_model_name]['R2']:.4f}")
# 機械学習実行例
print("機械学習分析開始...")
# 売上予測モデルの構築
ml_analyzer = MLModelAnalyzer(feature_data, 'sales_amount')
# モデル学習
results = ml_analyzer.train_models()
# 結果表示
print("\nモデル性能比較:")
# predictions 列(配列)を除外し、数値列のみを float 化して比較表を作成
performance_df = pd.DataFrame(results).T.drop(columns=['predictions'])
performance_df = performance_df.astype(float).sort_values('R2', ascending=False)
print(performance_df[['RMSE', 'MAE', 'R2', 'CV_R2_mean']].round(4))
# ハイパーパラメータチューニング
try:
best_rf, tuning_results = ml_analyzer.hyperparameter_tuning('Random Forest')
print(f"\nチューニング結果:")
print(f"最適パラメータ: {tuning_results['best_params']}")
print(f"テストR2: {tuning_results['test_r2']:.4f}")
except Exception as e:
print(f"ハイパーパラメータチューニングでエラー: {e}")
# 特徴量重要度分析
importance_df = ml_analyzer.feature_importance_analysis('Random Forest')
if importance_df is not None:
print("\n重要な特徴量トップ5:")
print(importance_df.head())
# モデル保存
try:
ml_analyzer.save_best_model('best_sales_prediction_model.pkl')
except Exception as e:
print(f"モデル保存エラー: {e}")さらに理解を深める参考書
関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。
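保存したモデルの再利用イメージも簡単に示しておきます(ファイル名 best_sales_prediction_model.pkl と new_features は上記の保存例に合わせた仮定のスケッチです)。線形モデルが選ばれた場合は scaler 込みのパイプライン、ツリー系モデルの場合はモデル単体が保存されている点に注意してください。
# 保存済みモデルを読み込んで新しいデータに適用する最小スケッチ
import joblib
import numpy as np

loaded_model = joblib.load('best_sales_prediction_model.pkl')
# new_features は学習時と同じ列順・前処理を施した特徴量を想定(ここでは形だけのダミーデータ)
new_features = np.random.rand(5, len(ml_analyzer.feature_names))
predictions = loaded_model.predict(new_features)
print(f"予測値(ダミー入力): {predictions}")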
6. パフォーマンス最適化・メモリ管理
6.1 大規模データ処理の最適化
import gc
import psutil  # メモリ使用量の監視に使用(未導入なら pip install psutil)
from contextlib import contextmanager
import multiprocessing as mp
class BigDataProcessor:
"""大規模データ処理最適化クラス"""
def __init__(self):
self.memory_threshold = 0.8 # メモリ使用率80%で警告
@contextmanager
def memory_monitor(self, description: str):
"""メモリ使用量監視コンテキストマネージャ"""
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
print(f"開始時メモリ使用量 ({description}): {initial_memory:.2f} MB")
try:
yield
finally:
final_memory = process.memory_info().rss / 1024 / 1024
memory_used = final_memory - initial_memory
print(f"終了時メモリ使用量 ({description}): {final_memory:.2f} MB")
print(f"メモリ増加量: {memory_used:+.2f} MB")
# ガベージコレクション実行
collected = gc.collect()
if collected > 0:
print(f"ガベージコレクション: {collected} オブジェクトを解放")
def chunk_processor(self, file_path: str, chunk_size: int = 10000) -> pd.DataFrame:
"""チャンク単位での効率的データ処理"""
def process_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
"""個別チャンクの処理ロジック"""
# データ型最適化
chunk = self._optimize_chunk_dtypes(chunk)
# 必要な計算のみ実行
chunk['sales_per_unit'] = chunk['sales_amount'] / chunk['quantity'].replace(0, 1)
chunk['is_weekend'] = pd.to_datetime(chunk['date']).dt.dayofweek >= 5
return chunk[['customer_id', 'sales_amount', 'sales_per_unit', 'is_weekend']]
processed_chunks = []
total_rows = 0
with self.memory_monitor("チャンク処理"):
try:
for chunk_num, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
processed_chunk = process_chunk(chunk)
processed_chunks.append(processed_chunk)
total_rows += len(chunk)
if chunk_num % 10 == 0:
print(f"処理済みチャンク: {chunk_num + 1}, 累計行数: {total_rows:,}")
# メモリ使用率チェック
memory_percent = psutil.virtual_memory().percent
if memory_percent > self.memory_threshold * 100:
print(f"警告: メモリ使用率 {memory_percent:.1f}%")
gc.collect()
# 結合
result = pd.concat(processed_chunks, ignore_index=True)
print(f"総処理行数: {total_rows:,}")
except Exception as e:
print(f"チャンク処理エラー: {e}")
result = pd.DataFrame()
return result
def _optimize_chunk_dtypes(self, chunk: pd.DataFrame) -> pd.DataFrame:
"""チャンクのデータ型最適化"""
for col in chunk.select_dtypes(include=['int64']).columns:
chunk[col] = pd.to_numeric(chunk[col], downcast='integer')
for col in chunk.select_dtypes(include=['float64']).columns:
chunk[col] = pd.to_numeric(chunk[col], downcast='float')
return chunk
def parallel_processing_example(self, df: pd.DataFrame, n_cores: int = None) -> pd.DataFrame:
"""並列処理による高速化"""
if n_cores is None:
n_cores = min(mp.cpu_count() - 1, 4) # 最大4コアまで使用
def compute_customer_stats(customer_group: pd.DataFrame) -> Dict:
"""顧客別統計の計算
注意: ローカル関数は spawn 方式(Windows / macOS の既定)の multiprocessing では pickle できないため、その環境ではモジュールのトップレベルへ移すか joblib.Parallel の利用を検討する
"""
customer_id = customer_group['customer_id'].iloc[0]
return {
'customer_id': customer_id,
'total_sales': customer_group['sales_amount'].sum(),
'avg_sales': customer_group['sales_amount'].mean(),
'transaction_count': len(customer_group),
'std_sales': customer_group['sales_amount'].std()
}
with self.memory_monitor("並列処理"):
# データを顧客ごとにグループ化
customer_groups = [group for _, group in df.groupby('customer_id')]
# 並列処理実行
with mp.Pool(processes=n_cores) as pool:
results = pool.map(compute_customer_stats, customer_groups)
result_df = pd.DataFrame(results)
print(f"並列処理完了: {len(results)} 顧客の統計を {n_cores} コアで処理")
return result_df
def vectorized_string_operations(self, df: pd.DataFrame) -> pd.DataFrame:
"""文字列操作の高速化"""
with self.memory_monitor("文字列操作最適化"):
df_result = df.copy()
# 非効率な方法(避けるべき)
# df_result['category_upper'] = df_result['category'].apply(lambda x: x.upper())
# 効率的な方法
df_result['category_upper'] = df_result['category'].str.upper()
df_result['category_length'] = df_result['category'].str.len()
df_result['is_category_A'] = df_result['category'].eq('A').astype('int8')
# 複合条件の効率的な処理
mask = (df_result['sales_amount'] > 1000) & (df_result['category'].isin(['A', 'B']))
df_result['high_value_AB'] = mask.astype('int8')
return df_result
def memory_efficient_joins(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
"""メモリ効率的な結合処理"""
with self.memory_monitor("効率的結合"):
# 結合前のメモリ使用量チェック
df1_memory = df1.memory_usage(deep=True).sum() / 1024 / 1024
df2_memory = df2.memory_usage(deep=True).sum() / 1024 / 1024
print(f"結合前メモリ: df1={df1_memory:.2f}MB, df2={df2_memory:.2f}MB")
# 必要な列のみ選択してから結合
df1_subset = df1[['customer_id', 'sales_amount', 'category']].copy()
df2_subset = df2[['customer_id', 'customer_name', 'customer_tier']].copy()
# インデックス最適化
df1_subset.set_index('customer_id', inplace=True)
df2_subset.set_index('customer_id', inplace=True)
# 結合実行
result = df1_subset.join(df2_subset, how='inner')
# インデックスをリセット
result.reset_index(inplace=True)
result_memory = result.memory_usage(deep=True).sum() / 1024 / 1024
print(f"結合後メモリ: {result_memory:.2f}MB")
return result
# 大規模データ処理の実行例
big_data_processor = BigDataProcessor()
print("大規模データ処理最適化デモ")
print("=" * 40)
# サンプル大データの生成(実環境では既存ファイルを使用)
large_sample = pd.concat([df] * 10, ignore_index=True) # 10倍に拡大
large_sample.to_csv('large_sample_data.csv', index=False)
# チャンク処理
try:
chunk_result = big_data_processor.chunk_processor('large_sample_data.csv', chunk_size=50000)
print(f"チャンク処理結果: {chunk_result.shape}")
except Exception as e:
print(f"チャンク処理スキップ: {e}")
# 並列処理(小さなデータセットでデモ)
parallel_result = big_data_processor.parallel_processing_example(df, n_cores=2)
print(f"並列処理結果: {parallel_result.shape}")
# 文字列操作最適化
string_optimized = big_data_processor.vectorized_string_operations(df)
print(f"文字列操作最適化完了: {string_optimized.shape}")
# 効率的結合
customer_data = pd.DataFrame({
'customer_id': range(1, 1001),
'customer_name': [f'Customer_{i}' for i in range(1, 1001)],
'customer_tier': np.random.choice(['Gold', 'Silver', 'Bronze'], 1000)
})
efficient_join = big_data_processor.memory_efficient_joins(df, customer_data)
print(f"効率的結合完了: {efficient_join.shape}")