The Importance of Tool Selection in AI Agent Development
Choosing the right tools and libraries is one of the decisive factors in the success of an AI agent project. As of 2025 the ecosystem is expanding rapidly, and selecting the best combination of tools for your goals has a major impact on both development efficiency and the quality of the final system.
Evaluation Criteria for Tool Selection
The main criteria to weigh when selecting development tools (a scoring sketch follows this list):
- Learning cost and adoption barrier: how easily the whole team can pick the tool up
- Integration: how well it connects with other tools
- Extensibility: how well it accommodates future feature additions
- Community support: available information and troubleshooting help
- Cost performance: whether the price is justified by the functionality
- Enterprise features: security and compliance support
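Since no single tool wins on every axis, it helps to make the trade-offs explicit before committing. The sketch below shows one way to do that in Python; the weights, candidate names, and scores are illustrative assumptions, not measured values.
# Hypothetical weighted-scoring matrix for tool selection.
# All weights and scores below are placeholder assumptions; substitute your team's values.
CRITERIA_WEIGHTS = {
    "learning_cost": 0.20,
    "integration": 0.20,
    "extensibility": 0.15,
    "community": 0.15,
    "cost_performance": 0.15,
    "enterprise": 0.15,
}

candidates = {  # 1-5 scores per criterion (made-up example values)
    "framework_a": {"learning_cost": 4, "integration": 5, "extensibility": 3,
                    "community": 5, "cost_performance": 4, "enterprise": 3},
    "framework_b": {"learning_cost": 3, "integration": 4, "extensibility": 5,
                    "community": 3, "cost_performance": 3, "enterprise": 5},
}

def weighted_score(scores: dict) -> float:
    """Weighted sum of criterion scores."""
    return sum(CRITERIA_WEIGHTS[c] * s for c, s in scores.items())

# Rank the candidates from best to worst weighted score
for name, scores in sorted(candidates.items(), key=lambda kv: -weighted_score(kv[1])):
    print(f"{name}: {weighted_score(scores):.2f}")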
Essential Development Environment and Foundation Tools
Python Environment Management: Poetry vs. Pipenv vs. Conda
AI agent development involves managing complex dependency trees. The examples below use Poetry, which combines dependency resolution, virtual-environment handling, and packaging in a single tool.
# Example project configuration using Poetry
# pyproject.toml
[tool.poetry]
name = "ai-agent-project"
version = "0.1.0"
description = "Advanced AI Agent Development Project"
authors = ["Your Name <your.email@example.com>"]
[tool.poetry.dependencies]
python = "^3.11"
langchain = "^0.1.0"
openai = "^1.0.0"
pinecone-client = "^3.0.0"
redis = "^5.0.0"
fastapi = "^0.104.0"
uvicorn = "^0.24.0"
pydantic = "^2.5.0"
sqlalchemy = "^2.0.0"
alembic = "^1.13.0"
pytest = "^7.4.0"
pytest-asyncio = "^0.23.0"
black = "^23.11.0"
ruff = "^0.1.6"
mypy = "^1.7.0"
[tool.poetry.group.dev.dependencies]
jupyter = "^1.0.0"
ipython = "^8.17.0"
notebook = "^7.0.0"
pre-commit = "^3.5.0"
pytest-cov = "^4.1.0"
pytest-mock = "^3.12.0"
[tool.poetry.group.monitoring.dependencies]
prometheus-client = "^0.19.0"
grafana-api = "^1.0.3"
sentry-sdk = "^1.38.0"
structlog = "^23.2.0"
[tool.poetry.group.deployment.dependencies]
docker = "^6.1.0"
kubernetes = "^28.1.0"
gunicorn = "^21.2.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
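Before wiring this into CI, it can be worth sanity-checking that the file parses and that each dependency group is present. A minimal sketch using the standard library's tomllib (available from Python 3.11, which the configuration above already requires; assumes the file is saved as pyproject.toml in the current directory):
# Minimal sketch: parse pyproject.toml and list its dependency groups.
import tomllib
from pathlib import Path

data = tomllib.loads(Path("pyproject.toml").read_text())
poetry = data["tool"]["poetry"]
print("Main dependencies:", ", ".join(poetry["dependencies"]))
for group_name, group in poetry.get("group", {}).items():
    print(f"Group '{group_name}':", ", ".join(group["dependencies"]))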
# Development environment setup script
from pathlib import Path
class DevelopmentEnvironmentSetup:
"""開発環境の自動セットアップ"""
def __init__(self, project_name: str):
self.project_name = project_name
self.project_path = Path(project_name)
def setup_complete_environment(self):
"""完全な開発環境のセットアップ"""
print("🚀 AIエージェント開発環境セットアップ開始")
# 1. プロジェクトディレクトリ作成
self._create_project_structure()
# 2. Poetry初期化
self._setup_poetry()
# 3. Pre-commit設定
self._setup_precommit()
# 4. Docker環境設定
self._setup_docker()
# 5. VSCode設定
self._setup_vscode()
# 6. 環境変数テンプレート
self._create_env_template()
print("✅ 開発環境セットアップ完了!")
def _create_project_structure(self):
"""プロジェクト構造の作成"""
directories = [
"src/agents",
"src/tools",
"src/models",
"src/services",
"src/utils",
"tests/unit",
"tests/integration",
"tests/performance",
"docs",
"scripts",
"config",
"data/raw",
"data/processed",
"logs",
"monitoring",
"deployment/docker",
"deployment/k8s"
]
for directory in directories:
(self.project_path / directory).mkdir(parents=True, exist_ok=True)
        # Create __init__.py files
init_files = [
"src/__init__.py",
"src/agents/__init__.py",
"src/tools/__init__.py",
"src/models/__init__.py",
"src/services/__init__.py",
"src/utils/__init__.py"
]
for init_file in init_files:
(self.project_path / init_file).touch()
def _setup_poetry(self):
"""Poetry設定"""
pyproject_content = '''[tool.poetry]
name = "ai-agent-project"
version = "0.1.0"
description = "Advanced AI Agent Development Project"
authors = ["Your Name <your.email@example.com>"]
[tool.poetry.dependencies]
python = "^3.11"
langchain = "^0.1.0"
openai = "^1.0.0"
# ... (omitted)
[tool.black]
line-length = 100
target-version = ['py311']
include = '\\.pyi?$'
[tool.ruff]
target-version = "py311"
line-length = 100
select = ["E", "F", "I", "W", "UP", "N", "YTT", "S", "BLE", "FBT", "B", "A", "C4", "EM", "ICN", "G", "PIE", "T20", "PT", "Q", "RET", "SIM", "TID", "ARG", "PTH", "ERA", "PL", "TRY", "RUF"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
strict_optional = true
warn_no_return = true
warn_redundant_casts = true
warn_unused_ignores = true
show_error_codes = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "--verbose --tb=short --cov=src --cov-report=html --cov-report=xml"
'''
(self.project_path / "pyproject.toml").write_text(pyproject_content)
def _setup_precommit(self):
"""Pre-commit設定"""
precommit_content = '''repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- id: debug-statements
- repo: https://github.com/psf/black
rev: 23.11.0
hooks:
- id: black
language_version: python3.11
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.6
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.0
hooks:
- id: mypy
additional_dependencies: [types-all]
'''
(self.project_path / ".pre-commit-config.yaml").write_text(precommit_content)
def _setup_docker(self):
"""Docker設定"""
dockerfile_content = '''FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \\
build-essential \\
curl \\
&& rm -rf /var/lib/apt/lists/*
# Install Poetry
RUN pip install poetry
# Copy project files
COPY pyproject.toml poetry.lock* ./
# Install dependencies
RUN poetry config virtualenvs.create false \\
&& poetry install --no-interaction --no-ansi
# Copy source code
COPY . .
# Run the application
CMD ["python", "-m", "src.main"]
'''
docker_compose_content = '''version: '3.8'
services:
ai-agent:
build: .
ports:
- "8000:8000"
environment:
- ENVIRONMENT=development
volumes:
- ./logs:/app/logs
depends_on:
- redis
- postgres
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
postgres:
image: postgres:15
environment:
POSTGRES_DB: ai_agent_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
monitoring:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
volumes:
redis_data:
postgres_data:
'''
(self.project_path / "Dockerfile").write_text(dockerfile_content)
(self.project_path / "docker-compose.yml").write_text(docker_compose_content)
def _setup_vscode(self):
"""VSCode設定"""
vscode_settings = '''{
"python.defaultInterpreterPath": "./venv/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.ruffEnabled": true,
"python.formatting.provider": "black",
"python.testing.pytestEnabled": true,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"files.exclude": {
"**/__pycache__": true,
"**/.pytest_cache": true,
"**/node_modules": true
}
}'''
vscode_dir = self.project_path / ".vscode"
vscode_dir.mkdir(exist_ok=True)
(vscode_dir / "settings.json").write_text(vscode_settings)
def _create_env_template(self):
"""環境変数テンプレート"""
env_template = '''# AI Service API Keys
OPENAI_API_KEY=your_openai_api_key_here
ANTHROPIC_API_KEY=your_anthropic_api_key_here
GOOGLE_API_KEY=your_google_api_key_here
# Vector Database
PINECONE_API_KEY=your_pinecone_api_key_here
PINECONE_ENVIRONMENT=your_pinecone_environment_here
# Database
DATABASE_URL=postgresql://postgres:password@localhost:5432/ai_agent_db
REDIS_URL=redis://localhost:6379
# Monitoring
SENTRY_DSN=your_sentry_dsn_here
PROMETHEUS_GATEWAY=localhost:9091
# Application
ENVIRONMENT=development
LOG_LEVEL=INFO
DEBUG=true
'''
(self.project_path / ".env.template").write_text(env_template)
(self.project_path / ".env").write_text(env_template)
# Usage example
if __name__ == "__main__":
setup = DevelopmentEnvironmentSetup("my-ai-agent-project")
    setup.setup_complete_environment()
IDE and Code Editor Setup
Recommended VSCode extensions (save this as .vscode/extensions.json):
{
"recommendations": [
"ms-python.python",
"ms-python.pylint",
"ms-toolsai.jupyter",
"charliermarsh.ruff",
"ms-python.black-formatter",
"ms-python.mypy-type-checker",
"ms-vscode.vscode-json",
"redhat.vscode-yaml",
"ms-azuretools.vscode-docker",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"github.copilot",
"github.copilot-chat"
]
}
LLM Integration and API Management Tools
Unified LLM Client: LiteLLM
LiteLLM lets you call multiple LLM providers through a single, consistent interface.
# Unified LLM management system built on LiteLLM
import litellm
from typing import Dict, List, Any, Optional
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
class LLMProvider(Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
GOOGLE = "google"
COHERE = "cohere"
AZURE = "azure"
@dataclass
class LLMConfig:
provider: LLMProvider
model: str
api_key: str
base_url: Optional[str] = None
max_tokens: int = 1000
temperature: float = 0.3
timeout: int = 30
class UnifiedLLMManager:
"""統合LLM管理システム"""
def __init__(self):
self.configs: Dict[str, LLMConfig] = {}
self.usage_stats: Dict[str, Dict[str, Any]] = {}
self.rate_limits: Dict[str, Dict[str, Any]] = {}
def add_provider(self, name: str, config: LLMConfig):
"""LLMプロバイダーの追加"""
self.configs[name] = config
self.usage_stats[name] = {
"total_requests": 0,
"total_tokens": 0,
"total_cost": 0.0,
"error_count": 0
}
self.rate_limits[name] = {
"requests_per_minute": 60,
"tokens_per_minute": 50000,
"current_requests": 0,
"current_tokens": 0,
"reset_time": time.time() + 60
}
async def generate(self,
provider_name: str,
messages: List[Dict[str, str]],
**kwargs) -> Dict[str, Any]:
"""統一的な生成API"""
if provider_name not in self.configs:
raise ValueError(f"Provider {provider_name} not configured")
config = self.configs[provider_name]
        # Rate-limit check
if not self._check_rate_limit(provider_name):
raise Exception(f"Rate limit exceeded for {provider_name}")
try:
            # Unified call via LiteLLM
response = await litellm.acompletion(
model=f"{config.provider.value}/{config.model}",
messages=messages,
api_key=config.api_key,
max_tokens=config.max_tokens,
temperature=config.temperature,
timeout=config.timeout,
**kwargs
)
            # Update usage statistics
self._update_usage_stats(provider_name, response)
return {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
"cost": self._calculate_cost(provider_name, response.usage.total_tokens),
"provider": provider_name
}
except Exception as e:
self.usage_stats[provider_name]["error_count"] += 1
            raise  # bare raise preserves the original traceback
def _check_rate_limit(self, provider_name: str) -> bool:
"""レート制限チェック"""
limits = self.rate_limits[provider_name]
current_time = time.time()
        # Reset the window once the minute has elapsed
if current_time > limits["reset_time"]:
limits["current_requests"] = 0
limits["current_tokens"] = 0
limits["reset_time"] = current_time + 60
        # Check the request limit (token counts are tracked but not enforced in this sketch)
if limits["current_requests"] >= limits["requests_per_minute"]:
return False
return True
def _update_usage_stats(self, provider_name: str, response):
"""使用統計の更新"""
stats = self.usage_stats[provider_name]
limits = self.rate_limits[provider_name]
tokens = response.usage.total_tokens
stats["total_requests"] += 1
stats["total_tokens"] += tokens
stats["total_cost"] += self._calculate_cost(provider_name, tokens)
limits["current_requests"] += 1
limits["current_tokens"] += tokens
def _calculate_cost(self, provider_name: str, tokens: int) -> float:
"""コスト計算"""
# 簡単なコスト計算(実際の実装では各プロバイダーの料金体系を使用)
cost_per_1k_tokens = {
"openai": 0.002,
"anthropic": 0.008,
"google": 0.001,
"cohere": 0.001
}
provider = self.configs[provider_name].provider.value
rate = cost_per_1k_tokens.get(provider, 0.002)
return (tokens / 1000) * rate
def get_usage_report(self) -> Dict[str, Any]:
"""使用状況レポート"""
total_cost = sum(stats["total_cost"] for stats in self.usage_stats.values())
total_requests = sum(stats["total_requests"] for stats in self.usage_stats.values())
return {
"total_cost": total_cost,
"total_requests": total_requests,
"by_provider": self.usage_stats,
"cost_breakdown": {
name: stats["total_cost"]
for name, stats in self.usage_stats.items()
}
}
async def compare_providers(self,
prompt: str,
providers: List[str]) -> Dict[str, Any]:
"""複数プロバイダーでの比較実行"""
results = {}
messages = [{"role": "user", "content": prompt}]
        valid_providers = [p for p in providers if p in self.configs]
        tasks = [self.generate(provider, messages) for provider in valid_providers]
        provider_results = await asyncio.gather(*tasks, return_exceptions=True)
        # Zip against the filtered list so providers and results stay aligned
        for provider, result in zip(valid_providers, provider_results):
if isinstance(result, Exception):
results[provider] = {"error": str(result)}
else:
results[provider] = {
"content": result["content"],
"cost": result["cost"],
"tokens": result["usage"]["total_tokens"],
"model": result["model"]
}
return results
# API management and monitoring tools
class APIMonitor:
"""API使用状況の監視"""
def __init__(self):
self.metrics = {}
self.alerts = []
def track_request(self, provider: str, endpoint: str,
response_time: float, status_code: int):
"""リクエストの追跡"""
key = f"{provider}:{endpoint}"
if key not in self.metrics:
self.metrics[key] = {
"total_requests": 0,
"success_count": 0,
"error_count": 0,
"total_response_time": 0.0,
"avg_response_time": 0.0
}
metrics = self.metrics[key]
metrics["total_requests"] += 1
metrics["total_response_time"] += response_time
metrics["avg_response_time"] = metrics["total_response_time"] / metrics["total_requests"]
if status_code == 200:
metrics["success_count"] += 1
else:
metrics["error_count"] += 1
        # Run alert checks
self._check_alerts(key, metrics)
def _check_alerts(self, key: str, metrics: Dict[str, Any]):
"""アラートチェック"""
error_rate = metrics["error_count"] / max(metrics["total_requests"], 1)
        # High error rate
if error_rate > 0.1: # 10%
self.alerts.append({
"type": "high_error_rate",
"endpoint": key,
"error_rate": error_rate,
"timestamp": time.time()
})
        # Slow average response time
        if metrics["avg_response_time"] > 10.0:  # 10 seconds
self.alerts.append({
"type": "slow_response",
"endpoint": key,
"avg_response_time": metrics["avg_response_time"],
"timestamp": time.time()
})
# Usage example
async def demonstrate_llm_management():
    """Demo of the LLM management system"""
    # Initialize the LLM manager
llm_manager = UnifiedLLMManager()
    # Register providers
llm_manager.add_provider("openai-gpt4", LLMConfig(
provider=LLMProvider.OPENAI,
model="gpt-4o",
api_key="your-openai-key"
))
llm_manager.add_provider("claude", LLMConfig(
provider=LLMProvider.ANTHROPIC,
model="claude-3-sonnet-20240229",
api_key="your-anthropic-key"
))
    # Generate with a single provider
    messages = [{"role": "user", "content": "List three benefits of AI agents"}]
result = await llm_manager.generate("openai-gpt4", messages)
print(f"OpenAI Result: {result['content']}")
print(f"Cost: ${result['cost']:.4f}")
    # Compare multiple providers
    comparison = await llm_manager.compare_providers(
        "Explain the difference between machine learning and deep learning",
["openai-gpt4", "claude"]
)
print("\n=== プロバイダー比較結果 ===")
for provider, result in comparison.items():
if "error" not in result:
print(f"{provider}: コスト ${result['cost']:.4f}, トークン数 {result['tokens']}")
print(f"応答: {result['content'][:100]}...\n")
    # Usage report
usage_report = llm_manager.get_usage_report()
print("=== 使用状況レポート ===")
print(f"総コスト: ${usage_report['total_cost']:.4f}")
print(f"総リクエスト数: {usage_report['total_requests']}")
if __name__ == "__main__":
    asyncio.run(demonstrate_llm_management())
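The manager above simply raises when a provider errors out or hits its rate limit. In production you would usually wrap it in an ordered fallback chain; here is a minimal sketch building on UnifiedLLMManager (the provider names are the ones registered in the demo above, and the error handling is deliberately coarse):
# Sketch: try providers in priority order, falling back on any failure.
async def generate_with_fallback(manager: UnifiedLLMManager,
                                 messages: List[Dict[str, str]],
                                 providers: List[str]) -> Dict[str, Any]:
    last_error: Optional[Exception] = None
    for name in providers:
        try:
            return await manager.generate(name, messages)
        except Exception as e:  # rate limit, timeout, provider error, ...
            last_error = e
    raise RuntimeError(f"All providers failed; last error: {last_error}")

# Usage (inside an async context):
# result = await generate_with_fallback(llm_manager, messages, ["openai-gpt4", "claude"])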
Vector Databases and Search Tools
Comparing and Integrating the Major Vector Databases
Pinecone is a fully managed cloud service, Weaviate can be self-hosted or consumed as a managed offering, and ChromaDB is a lightweight database that runs embedded or locally; the adapter layer below hides these differences behind a single interface.
# Unified vector database management system
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from dataclasses import dataclass
import asyncio
import time
@dataclass
class DocumentChunk:
id: str
content: str
metadata: Dict[str, Any]
embedding: Optional[List[float]] = None
@dataclass
class SearchResult:
chunk: DocumentChunk
score: float
rank: int
class VectorDBInterface(ABC):
"""ベクトルDB統一インターフェース"""
@abstractmethod
async def insert(self, chunks: List[DocumentChunk]) -> bool:
pass
@abstractmethod
async def search(self, query_vector: List[float],
top_k: int = 10,
filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
pass
@abstractmethod
async def update(self, chunk_id: str,
new_content: str,
new_embedding: List[float]) -> bool:
pass
@abstractmethod
async def delete(self, chunk_ids: List[str]) -> bool:
pass
@abstractmethod
async def get_stats(self) -> Dict[str, Any]:
pass
class PineconeAdapter(VectorDBInterface):
"""Pinecone アダプター"""
def __init__(self, api_key: str, environment: str, index_name: str):
# import pinecone
# pinecone.init(api_key=api_key, environment=environment)
# self.index = pinecone.Index(index_name)
self.index_name = index_name
print(f"Pinecone initialized: {index_name}")
async def insert(self, chunks: List[DocumentChunk]) -> bool:
try:
vectors = []
for chunk in chunks:
if chunk.embedding:
vectors.append({
"id": chunk.id,
"values": chunk.embedding,
"metadata": {
**chunk.metadata,
"content": chunk.content
}
})
# self.index.upsert(vectors)
print(f"Inserted {len(vectors)} vectors into Pinecone")
return True
except Exception as e:
print(f"Pinecone insert error: {e}")
return False
async def search(self, query_vector: List[float],
top_k: int = 10,
filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
try:
            # A real implementation would call the Pinecone API:
# results = self.index.query(
# vector=query_vector,
# top_k=top_k,
# filter=filter_metadata,
# include_metadata=True
# )
            # Mock results
mock_results = []
for i in range(min(top_k, 3)):
chunk = DocumentChunk(
id=f"pinecone_doc_{i}",
content=f"Pinecone search result {i}",
metadata={"source": f"pinecone_source_{i}"}
)
mock_results.append(SearchResult(
chunk=chunk,
score=0.9 - (i * 0.1),
rank=i + 1
))
return mock_results
except Exception as e:
print(f"Pinecone search error: {e}")
return []
async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
        # Implementation omitted
return True
async def delete(self, chunk_ids: List[str]) -> bool:
        # Implementation omitted
return True
async def get_stats(self) -> Dict[str, Any]:
return {
"provider": "pinecone",
"index_name": self.index_name,
"total_vectors": 1000, # モック値
"dimensions": 1536
}
class WeaviateAdapter(VectorDBInterface):
"""Weaviate アダプター"""
def __init__(self, url: str, api_key: Optional[str] = None):
# import weaviate
# self.client = weaviate.Client(url, auth_config=...)
self.url = url
print(f"Weaviate initialized: {url}")
async def insert(self, chunks: List[DocumentChunk]) -> bool:
        # Weaviate-specific implementation would go here
print(f"Inserted {len(chunks)} objects into Weaviate")
return True
async def search(self, query_vector: List[float],
top_k: int = 10,
filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        # Mock results
mock_results = []
for i in range(min(top_k, 3)):
chunk = DocumentChunk(
id=f"weaviate_doc_{i}",
content=f"Weaviate search result {i}",
metadata={"source": f"weaviate_source_{i}"}
)
mock_results.append(SearchResult(
chunk=chunk,
score=0.85 - (i * 0.1),
rank=i + 1
))
return mock_results
async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
return True
async def delete(self, chunk_ids: List[str]) -> bool:
return True
async def get_stats(self) -> Dict[str, Any]:
return {
"provider": "weaviate",
"url": self.url,
"total_objects": 800,
"classes": ["Document", "Chunk"]
}
class ChromaAdapter(VectorDBInterface):
"""ChromaDB アダプター"""
def __init__(self, persist_directory: str, collection_name: str):
# import chromadb
# self.client = chromadb.PersistentClient(path=persist_directory)
# self.collection = self.client.get_or_create_collection(collection_name)
self.persist_directory = persist_directory
self.collection_name = collection_name
print(f"ChromaDB initialized: {collection_name}")
async def insert(self, chunks: List[DocumentChunk]) -> bool:
        # ChromaDB-specific implementation would go here
print(f"Inserted {len(chunks)} documents into ChromaDB")
return True
async def search(self, query_vector: List[float],
top_k: int = 10,
filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        # Mock results
mock_results = []
for i in range(min(top_k, 3)):
chunk = DocumentChunk(
id=f"chroma_doc_{i}",
content=f"ChromaDB search result {i}",
metadata={"source": f"chroma_source_{i}"}
)
mock_results.append(SearchResult(
chunk=chunk,
score=0.88 - (i * 0.1),
rank=i + 1
))
return mock_results
async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
return True
async def delete(self, chunk_ids: List[str]) -> bool:
return True
async def get_stats(self) -> Dict[str, Any]:
return {
"provider": "chromadb",
"collection": self.collection_name,
"total_documents": 1200,
"persist_directory": self.persist_directory
}
class UnifiedVectorDBManager:
"""統合ベクトルDB管理システム"""
def __init__(self):
self.databases: Dict[str, VectorDBInterface] = {}
self.default_db: Optional[str] = None
self.performance_metrics: Dict[str, Dict[str, Any]] = {}
def add_database(self, name: str, db_adapter: VectorDBInterface, is_default: bool = False):
"""データベースの追加"""
self.databases[name] = db_adapter
if is_default or not self.default_db:
self.default_db = name
self.performance_metrics[name] = {
"total_queries": 0,
"total_insertions": 0,
"avg_query_time": 0.0,
"avg_insertion_time": 0.0
}
async def insert_documents(self, chunks: List[DocumentChunk],
db_name: Optional[str] = None) -> Dict[str, bool]:
"""ドキュメントの挿入"""
target_db = db_name or self.default_db
results = {}
if target_db and target_db in self.databases:
start_time = time.time()
success = await self.databases[target_db].insert(chunks)
insertion_time = time.time() - start_time
            # Update metrics
metrics = self.performance_metrics[target_db]
metrics["total_insertions"] += 1
total_time = metrics["avg_insertion_time"] * (metrics["total_insertions"] - 1)
metrics["avg_insertion_time"] = (total_time + insertion_time) / metrics["total_insertions"]
results[target_db] = success
else:
            # Fall back to inserting into every registered database
for name, db in self.databases.items():
start_time = time.time()
success = await db.insert(chunks)
insertion_time = time.time() - start_time
                # Update metrics
metrics = self.performance_metrics[name]
metrics["total_insertions"] += 1
total_time = metrics["avg_insertion_time"] * (metrics["total_insertions"] - 1)
metrics["avg_insertion_time"] = (total_time + insertion_time) / metrics["total_insertions"]
results[name] = success
return results
async def search_across_databases(self, query_vector: List[float],
top_k: int = 10,
databases: Optional[List[str]] = None) -> Dict[str, List[SearchResult]]:
"""複数データベース横断検索"""
target_dbs = databases or list(self.databases.keys())
results = {}
search_tasks = []
for db_name in target_dbs:
if db_name in self.databases:
search_tasks.append((db_name, self.databases[db_name].search(query_vector, top_k)))
        # Run the searches in parallel
completed_searches = await asyncio.gather(
*[task[1] for task in search_tasks],
return_exceptions=True
)
        # Collate the results
for (db_name, _), search_result in zip(search_tasks, completed_searches):
if isinstance(search_result, Exception):
results[db_name] = []
print(f"Search error in {db_name}: {search_result}")
else:
results[db_name] = search_result
                # Update metrics
metrics = self.performance_metrics[db_name]
metrics["total_queries"] += 1
return results
async def compare_database_performance(self,
query_vector: List[float],
iterations: int = 5) -> Dict[str, Any]:
"""データベース性能比較"""
performance_results = {}
for db_name, db_adapter in self.databases.items():
response_times = []
success_count = 0
for _ in range(iterations):
start_time = time.time()
try:
results = await db_adapter.search(query_vector, top_k=10)
response_time = time.time() - start_time
response_times.append(response_time)
success_count += 1
except Exception as e:
print(f"Error in {db_name}: {e}")
if response_times:
performance_results[db_name] = {
"avg_response_time": sum(response_times) / len(response_times),
"min_response_time": min(response_times),
"max_response_time": max(response_times),
"success_rate": success_count / iterations,
"total_iterations": iterations
}
return performance_results
async def get_unified_stats(self) -> Dict[str, Any]:
"""統合統計情報"""
all_stats = {}
for db_name, db_adapter in self.databases.items():
db_stats = await db_adapter.get_stats()
performance_stats = self.performance_metrics[db_name]
all_stats[db_name] = {
**db_stats,
"performance": performance_stats
}
return all_stats
def recommend_best_database(self, criteria: str = "speed") -> Optional[str]:
"""最適データベースの推奨"""
if not self.performance_metrics:
return self.default_db
if criteria == "speed":
            # Recommend the DB with the lowest average query time
best_db = min(
self.performance_metrics.items(),
key=lambda x: x[1].get("avg_query_time", float('inf'))
)
return best_db[0] if best_db[1]["total_queries"] > 0 else self.default_db
return self.default_db
# Usage example
async def demonstrate_vector_db_management():
    """Demo of the vector DB management system"""
    # Initialize the unified manager
vector_manager = UnifiedVectorDBManager()
    # Register the databases
vector_manager.add_database("pinecone", PineconeAdapter(
api_key="your-key",
environment="us-west1-gcp",
index_name="ai-agent-index"
), is_default=True)
vector_manager.add_database("weaviate", WeaviateAdapter(
url="http://localhost:8080"
))
vector_manager.add_database("chroma", ChromaAdapter(
persist_directory="./chroma_db",
collection_name="ai_documents"
))
    # Prepare the test documents
test_chunks = [
DocumentChunk(
id="doc_1",
content="AIエージェントは自律的に動作するシステムです",
metadata={"category": "definition", "language": "ja"},
            embedding=[0.1] * 1536  # dummy embedding
),
DocumentChunk(
id="doc_2",
content="機械学習は人工知能の一分野です",
metadata={"category": "ml", "language": "ja"},
embedding=[0.2] * 1536
)
]
    # Insert the documents
    print("=== Document insertion ===")
insertion_results = await vector_manager.insert_documents(test_chunks)
for db_name, success in insertion_results.items():
print(f"{db_name}: {'成功' if success else '失敗'}")
    # Run the cross-database search
    print("\n=== Cross-database search ===")
    query_vector = [0.15] * 1536  # dummy query vector
search_results = await vector_manager.search_across_databases(query_vector, top_k=5)
for db_name, results in search_results.items():
print(f"\n{db_name} 検索結果:")
for result in results:
print(f" - スコア: {result.score:.3f}, 内容: {result.chunk.content}")
    # Performance comparison
    print("\n=== Performance comparison ===")
performance_comparison = await vector_manager.compare_database_performance(query_vector)
for db_name, stats in performance_comparison.items():
print(f"{db_name}:")
print(f" 平均応答時間: {stats['avg_response_time']:.3f}秒")
print(f" 成功率: {stats['success_rate']:.1%}")
    # Recommended DB
    recommended_db = vector_manager.recommend_best_database("speed")
    print(f"\nRecommended database: {recommended_db}")
    # Aggregated statistics
    unified_stats = await vector_manager.get_unified_stats()
    print("\n=== Aggregated statistics ===")
for db_name, stats in unified_stats.items():
print(f"{db_name}: プロバイダー={stats.get('provider', 'unknown')}")
if __name__ == "__main__":
    asyncio.run(demonstrate_vector_db_management())
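The demo above fills DocumentChunk.embedding with dummy vectors. In a real pipeline you would embed the chunk text first; here is a minimal sketch using the OpenAI embeddings API (the model name is an assumption — pick whatever matches your index dimensions — and OPENAI_API_KEY must be set in the environment):
# Sketch: populate DocumentChunk.embedding with real vectors.
# Assumes the openai>=1.0 client; text-embedding-3-small returns 1536-dim vectors,
# matching the dummy dimension used in the demo above.
from openai import OpenAI

def embed_chunks(chunks: List[DocumentChunk],
                 model: str = "text-embedding-3-small") -> List[DocumentChunk]:
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.embeddings.create(model=model, input=[c.content for c in chunks])
    for chunk, item in zip(chunks, response.data):
        chunk.embedding = item.embedding
    return chunks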
Monitoring, Logging, and Analytics Tools
A Comprehensive Monitoring System
# Unified monitoring and logging system
import logging
import structlog
from typing import Dict, List, Any, Optional
import time
import asyncio
from dataclasses import dataclass, field
import json
import psutil
import traceback
@dataclass
class MetricPoint:
timestamp: float
name: str
value: float
tags: Dict[str, str] = field(default_factory=dict)
@dataclass
class AlertRule:
name: str
metric_name: str
condition: str # "gt", "lt", "eq"
threshold: float
duration_seconds: int
severity: str # "critical", "warning", "info"
class StructuredLogger:
"""構造化ログシステム"""
def __init__(self, service_name: str, log_level: str = "INFO"):
self.service_name = service_name
        # Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
        # Configure the underlying stdlib logger
logging.basicConfig(
format="%(message)s",
level=getattr(logging, log_level.upper())
)
self.logger = structlog.get_logger(service_name)
def log_agent_request(self,
request_id: str,
user_id: str,
prompt: str,
response: str,
execution_time: float,
tokens_used: int,
cost: float,
status: str):
"""エージェントリクエストのログ"""
self.logger.info(
"agent_request_completed",
request_id=request_id,
user_id=user_id,
prompt_length=len(prompt),
response_length=len(response),
execution_time=execution_time,
tokens_used=tokens_used,
cost=cost,
status=status,
event_type="agent_request"
)
def log_error(self, error: Exception, context: Dict[str, Any] = None):
"""エラーログ"""
self.logger.error(
"error_occurred",
error_type=type(error).__name__,
error_message=str(error),
traceback=traceback.format_exc(),
context=context or {},
event_type="error"
)
def log_performance_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
"""パフォーマンスメトリクスのログ"""
self.logger.info(
"performance_metric",
metric_name=metric_name,
metric_value=value,
tags=tags or {},
event_type="metric"
)
def log_security_event(self, event_type: str, details: Dict[str, Any]):
"""セキュリティイベントのログ"""
self.logger.warning(
"security_event",
security_event_type=event_type,
details=details,
event_type="security"
)
class MetricsCollector:
"""メトリクス収集システム"""
def __init__(self):
self.metrics: List[MetricPoint] = []
self.counters: Dict[str, int] = {}
self.gauges: Dict[str, float] = {}
self.histograms: Dict[str, List[float]] = {}
def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
"""カウンターの増加"""
key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
self.counters[key] = self.counters.get(key, 0) + value
self.metrics.append(MetricPoint(
timestamp=time.time(),
name=name,
value=self.counters[key],
tags=tags or {}
))
def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
"""ゲージの設定"""
key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
self.gauges[key] = value
self.metrics.append(MetricPoint(
timestamp=time.time(),
name=name,
value=value,
tags=tags or {}
))
def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
"""ヒストグラムの記録"""
key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
if key not in self.histograms:
self.histograms[key] = []
self.histograms[key].append(value)
self.metrics.append(MetricPoint(
timestamp=time.time(),
name=name,
value=value,
tags=tags or {}
))
def get_system_metrics(self) -> Dict[str, float]:
"""システムメトリクスの取得"""
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# メモリ使用率
memory = psutil.virtual_memory()
memory_percent = memory.percent
# ディスク使用率
disk = psutil.disk_usage('/')
disk_percent = disk.percent
metrics = {
"system.cpu.percent": cpu_percent,
"system.memory.percent": memory_percent,
"system.disk.percent": disk_percent,
"system.memory.available_gb": memory.available / (1024**3),
"system.disk.free_gb": disk.free / (1024**3)
}
        # Record the metrics as gauges
for name, value in metrics.items():
self.set_gauge(name, value)
return metrics
def get_recent_metrics(self, minutes: int = 5) -> List[MetricPoint]:
"""最近のメトリクスを取得"""
cutoff_time = time.time() - (minutes * 60)
return [m for m in self.metrics if m.timestamp > cutoff_time]
def calculate_statistics(self, metric_name: str, minutes: int = 5) -> Dict[str, float]:
"""メトリクスの統計計算"""
recent_metrics = [
m for m in self.get_recent_metrics(minutes)
if m.name == metric_name
]
if not recent_metrics:
return {}
values = [m.value for m in recent_metrics]
return {
"count": len(values),
"min": min(values),
"max": max(values),
"avg": sum(values) / len(values),
"sum": sum(values)
}
class AlertManager:
"""アラート管理システム"""
def __init__(self, metrics_collector: MetricsCollector):
self.metrics_collector = metrics_collector
self.alert_rules: List[AlertRule] = []
self.active_alerts: Dict[str, Dict[str, Any]] = {}
self.alert_history: List[Dict[str, Any]] = []
def add_alert_rule(self, rule: AlertRule):
"""アラートルールの追加"""
self.alert_rules.append(rule)
async def check_alerts(self):
"""アラートチェック"""
current_time = time.time()
for rule in self.alert_rules:
            # Fetch recent statistics for the rule's metric
stats = self.metrics_collector.calculate_statistics(
rule.metric_name,
rule.duration_seconds // 60
)
if not stats:
continue
            # Evaluate the condition
triggered = False
current_value = stats.get("avg", 0)
if rule.condition == "gt" and current_value > rule.threshold:
triggered = True
elif rule.condition == "lt" and current_value < rule.threshold:
triggered = True
elif rule.condition == "eq" and abs(current_value - rule.threshold) < 0.001:
triggered = True
alert_key = f"{rule.name}:{rule.metric_name}"
if triggered:
if alert_key not in self.active_alerts:
                    # New alert
alert = {
"rule_name": rule.name,
"metric_name": rule.metric_name,
"severity": rule.severity,
"threshold": rule.threshold,
"current_value": current_value,
"triggered_at": current_time,
"status": "active"
}
self.active_alerts[alert_key] = alert
self.alert_history.append(alert.copy())
await self._send_alert_notification(alert)
else:
if alert_key in self.active_alerts:
                    # Alert resolved
resolved_alert = self.active_alerts[alert_key]
resolved_alert["status"] = "resolved"
resolved_alert["resolved_at"] = current_time
self.alert_history.append(resolved_alert.copy())
del self.active_alerts[alert_key]
await self._send_resolution_notification(resolved_alert)
async def _send_alert_notification(self, alert: Dict[str, Any]):
"""アラート通知の送信"""
print(f"🚨 ALERT: {alert['rule_name']}")
print(f" Metric: {alert['metric_name']}")
print(f" Current: {alert['current_value']:.2f}")
print(f" Threshold: {alert['threshold']:.2f}")
print(f" Severity: {alert['severity']}")
async def _send_resolution_notification(self, alert: Dict[str, Any]):
"""アラート解決通知の送信"""
print(f"✅ RESOLVED: {alert['rule_name']}")
def get_active_alerts(self) -> List[Dict[str, Any]]:
"""アクティブなアラートの取得"""
return list(self.active_alerts.values())
def get_alert_history(self, hours: int = 24) -> List[Dict[str, Any]]:
"""アラート履歴の取得"""
cutoff_time = time.time() - (hours * 3600)
return [
alert for alert in self.alert_history
if alert["triggered_at"] > cutoff_time
]
class AgentMonitoringSystem:
"""AIエージェント専用監視システム"""
def __init__(self, service_name: str):
self.service_name = service_name
self.logger = StructuredLogger(service_name)
self.metrics = MetricsCollector()
self.alerts = AlertManager(self.metrics)
        # Register default alert rules
        self._setup_default_alerts()
        # Background monitoring task
        self.monitoring_task = None
def _setup_default_alerts(self):
"""デフォルトアラートルールの設定"""
default_rules = [
AlertRule(
name="High Error Rate",
metric_name="agent.error_rate",
condition="gt",
                threshold=0.1,  # 10%
                duration_seconds=300,  # 5 minutes
severity="critical"
),
AlertRule(
name="Slow Response Time",
metric_name="agent.response_time",
condition="gt",
                threshold=30.0,  # 30 seconds
duration_seconds=300,
severity="warning"
),
AlertRule(
name="High Memory Usage",
metric_name="system.memory.percent",
condition="gt",
threshold=85.0, # 85%
duration_seconds=300,
severity="warning"
),
AlertRule(
name="High Cost Per Hour",
metric_name="agent.cost_per_hour",
condition="gt",
                threshold=100.0,  # $100/hour
                duration_seconds=3600,  # 1 hour
severity="critical"
)
]
for rule in default_rules:
self.alerts.add_alert_rule(rule)
def track_agent_request(self,
request_id: str,
user_id: str,
prompt: str,
response: str,
execution_time: float,
tokens_used: int,
cost: float,
status: str):
"""エージェントリクエストの追跡"""
# ログ記録
self.logger.log_agent_request(
request_id, user_id, prompt, response,
execution_time, tokens_used, cost, status
)
        # Record metrics
self.metrics.increment_counter("agent.requests.total", tags={"status": status})
self.metrics.record_histogram("agent.response_time", execution_time)
self.metrics.record_histogram("agent.tokens_used", tokens_used)
self.metrics.record_histogram("agent.cost", cost)
        # Compute the rolling error rate
        recent_requests = self.metrics.get_recent_metrics(5)  # last 5 minutes
total_requests = len([m for m in recent_requests if m.name == "agent.requests.total"])
error_requests = len([
m for m in recent_requests
if m.name == "agent.requests.total" and m.tags.get("status") == "error"
])
if total_requests > 0:
error_rate = error_requests / total_requests
self.metrics.set_gauge("agent.error_rate", error_rate)
        # Compute cost per hour
hourly_cost = self._calculate_hourly_cost()
self.metrics.set_gauge("agent.cost_per_hour", hourly_cost)
def _calculate_hourly_cost(self) -> float:
"""時間当たりコストの計算"""
hour_ago = time.time() - 3600
recent_costs = [
m.value for m in self.metrics.metrics
if m.name == "agent.cost" and m.timestamp > hour_ago
]
return sum(recent_costs)
def track_system_performance(self):
"""システムパフォーマンスの追跡"""
system_metrics = self.metrics.get_system_metrics()
for metric_name, value in system_metrics.items():
self.logger.log_performance_metric(metric_name, value)
def track_security_event(self, event_type: str, details: Dict[str, Any]):
"""セキュリティイベントの追跡"""
self.logger.log_security_event(event_type, details)
self.metrics.increment_counter("security.events", tags={"type": event_type})
async def start_monitoring(self):
"""監視の開始"""
if self.monitoring_task and not self.monitoring_task.done():
return
self.monitoring_task = asyncio.create_task(self._monitoring_loop())
async def stop_monitoring(self):
"""監視の停止"""
if self.monitoring_task:
self.monitoring_task.cancel()
try:
await self.monitoring_task
except asyncio.CancelledError:
pass
async def _monitoring_loop(self):
"""監視ループ"""
while True:
try:
                # Collect system metrics
                self.track_system_performance()
                # Evaluate alert rules
                await self.alerts.check_alerts()
                await asyncio.sleep(30)  # 30-second interval
except asyncio.CancelledError:
break
except Exception as e:
self.logger.log_error(e, {"context": "monitoring_loop"})
                await asyncio.sleep(60)  # back off for a minute after an error
def get_monitoring_dashboard_data(self) -> Dict[str, Any]:
"""監視ダッシュボード用データの取得"""
recent_metrics = self.metrics.get_recent_metrics(60) # 過去1時間
# 基本統計
request_stats = self.metrics.calculate_statistics("agent.requests.total", 60)
response_time_stats = self.metrics.calculate_statistics("agent.response_time", 60)
cost_stats = self.metrics.calculate_statistics("agent.cost", 60)
        # System metrics
        system_metrics = self.metrics.get_system_metrics()
        # Active alerts
active_alerts = self.alerts.get_active_alerts()
return {
"service_name": self.service_name,
"timestamp": time.time(),
"request_stats": request_stats,
"response_time_stats": response_time_stats,
"cost_stats": cost_stats,
"system_metrics": system_metrics,
"active_alerts": active_alerts,
"total_metrics_collected": len(recent_metrics)
}
# Usage example
async def demonstrate_monitoring_system():
    """Demo of the monitoring system"""
    # Initialize the monitoring system
    monitor = AgentMonitoringSystem("ai-agent-service")
    # Start monitoring
await monitor.start_monitoring()
    # Simulate test requests
    print("=== AI agent monitoring system demo ===\n")
    for i in range(10):
        # Normal request
monitor.track_agent_request(
request_id=f"req_{i}",
user_id=f"user_{i % 3}",
prompt=f"Test prompt {i}",
response=f"Test response {i}",
execution_time=2.0 + (i * 0.5),
tokens_used=100 + (i * 10),
cost=0.002 + (i * 0.001),
status="success" if i < 8 else "error" # 最後の2つはエラー
)
await asyncio.sleep(0.1)
    # Simulate a security event
monitor.track_security_event("prompt_injection_attempt", {
"user_id": "malicious_user",
"prompt": "Ignore previous instructions..."
})
    # Wait briefly, then inspect the statistics
await asyncio.sleep(2)
    # Fetch the dashboard data
dashboard_data = monitor.get_monitoring_dashboard_data()
print("=== 監視ダッシュボード ===")
print(f"サービス名: {dashboard_data['service_name']}")
print(f"リクエスト統計: {dashboard_data['request_stats']}")
print(f"応答時間統計: {dashboard_data['response_time_stats']}")
print(f"コスト統計: {dashboard_data['cost_stats']}")
print(f"アクティブアラート数: {len(dashboard_data['active_alerts'])}")
if dashboard_data['active_alerts']:
print("\nアクティブアラート:")
for alert in dashboard_data['active_alerts']:
print(f" - {alert['rule_name']}: {alert['current_value']:.2f} (閾値: {alert['threshold']:.2f})")
    # Stop monitoring
await monitor.stop_monitoring()
if __name__ == "__main__":
    asyncio.run(demonstrate_monitoring_system())
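The MetricsCollector above keeps everything in process memory. Since the pyproject.toml earlier already pins prometheus-client in its monitoring group, the same numbers could also be exposed for Prometheus to scrape; a minimal sketch follows (the metric names and port are assumptions, not part of the system above):
# Sketch: mirror agent metrics to a Prometheus scrape endpoint.
from prometheus_client import Counter, Histogram, start_http_server

REQUESTS_TOTAL = Counter("agent_requests_total", "Agent requests", ["status"])
RESPONSE_SECONDS = Histogram("agent_response_seconds", "Agent response time")

def export_request(status: str, execution_time: float) -> None:
    """Record one agent request in the Prometheus registry."""
    REQUESTS_TOTAL.labels(status=status).inc()
    RESPONSE_SECONDS.observe(execution_time)

if __name__ == "__main__":
    start_http_server(9100)  # serves /metrics on port 9100 (port is an assumption)
    export_request("success", 1.2)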
Learning Resources and a Practical Book Guide
A Curated List of Technical Books
The following practical book resources support mastering the tools and libraries needed for AI agent development:
Development Environments and Tooling
- "The Complete Guide to Building Python Development Environments" - hands-on Poetry, Docker, and CI/CD
- "Modern Development Tooling in Practice" - building an efficient development workflow
- "VSCode Extension Development" - boosting productivity through customization
Databases and Search Technology
- "Vector Databases in Practice" - working with Pinecone and Weaviate
- "Elasticsearch in Practice" - advanced full-text search systems
- "Redis Design and Operations Guide" - making the most of an in-memory DB
Monitoring and Operations
- "Monitoring with Prometheus" - metrics collection and visualization
- "Practical Grafana Dashboards" - streamlining operational monitoring
- "Building Log Management Systems" - structured logging and troubleshooting
Security and Quality Management
- "AI System Security" - defenses against prompt injection
- "Test-Driven AI Development" - practical quality-assurance techniques
- "DevSecOps in Practice" - a secure development workflow
These books balance theory and practice well, and provide knowledge and skills you can apply immediately in real projects.
Summary: Building an Efficient AI Agent Development Environment
Selecting and using the right tools and libraries is a decisive factor in whether an AI agent project succeeds. The toolset introduced in this article delivers the following value:
Key benefits
- Higher development efficiency: productivity gains from an integrated environment and automation tooling
- Assured quality: high-quality systems through comprehensive testing and monitoring
- Stable operations: reliable service through proactive monitoring and alerting
- Scalability: flexible growth through microservice-ready tooling
Recommended adoption order
- Foundation: Poetry, Docker, and the VSCode environment
- Development tools: LLM integration, vector DBs, and testing frameworks
- Monitoring: logging, metrics, and alerting
- Advanced capabilities: security tooling and performance optimization
With continued learning and practice, you can combine these tools effectively and build world-class AI agent systems. Good tool choices reduce technical debt and set you up for long-term success.