Tasuke HubLearn · Solve · Grow
#AIエージェント

【2025年完全版】AIエージェント開発必須ツール・ライブラリ大全:効率的な開発環境構築から運用まで

AIエージェント開発に必要なツール・ライブラリを完全網羅。LLM統合、ベクトルDB、監視ツール、セキュリティ対策まで、現場で使える厳選ツールセットを実装例と共に詳しく解説。開発効率を劇的に向上させる環境構築ガイド。

時計のアイコン23 July, 2025
TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者

AIエージェント開発ツール選択の重要性

AIエージェント開発における適切なツール・ライブラリの選択は、プロジェクトの成功を決定づける重要な要素です。2025年現在、エコシステムは急速に拡大しており、目的に応じた最適なツール組み合わせの選択が開発効率と最終的な品質に大きく影響します。

ツール選択の評価軸

開発ツール選択時の主要な評価軸:

  1. 学習コストと導入障壁: チーム全体での習得容易性
  2. 統合性: 他のツールとの連携しやすさ
  3. 拡張性: 将来的な機能追加への対応力
  4. コミュニティサポート: 情報共有とトラブルシューティング
  5. コストパフォーマンス: 機能に対する価格の妥当性
  6. 企業向け機能: セキュリティ、コンプライアンス対応
ベストマッチ

最短で課題解決する一冊

この記事の内容と高い親和性が確認できたベストマッチです。早めにチェックしておきましょう。

必須開発環境・基盤ツール

Python環境管理:Poetry vs Pipenv vs Conda

AIエージェント開発では、複雑な依存関係の管理が重要です。

# Poetry を使った プロジェクト設定例
# pyproject.toml

[tool.poetry]
name = "ai-agent-project"
version = "0.1.0"
description = "Advanced AI Agent Development Project"
authors = ["Your Name <your.email@example.com>"]

[tool.poetry.dependencies]
python = "^3.11"
langchain = "^0.1.0"
openai = "^1.0.0"
pinecone-client = "^3.0.0"
redis = "^5.0.0"
fastapi = "^0.104.0"
uvicorn = "^0.24.0"
pydantic = "^2.5.0"
sqlalchemy = "^2.0.0"
alembic = "^1.13.0"
pytest = "^7.4.0"
pytest-asyncio = "^0.23.0"
black = "^23.11.0"
ruff = "^0.1.6"
mypy = "^1.7.0"

[tool.poetry.group.dev.dependencies]
jupyter = "^1.0.0"
ipython = "^8.17.0"
notebook = "^7.0.0"
pre-commit = "^3.5.0"
pytest-cov = "^4.1.0"
pytest-mock = "^3.12.0"

[tool.poetry.group.monitoring.dependencies]
prometheus-client = "^0.19.0"
grafana-api = "^1.0.3"
sentry-sdk = "^1.38.0"
structlog = "^23.2.0"

[tool.poetry.group.deployment.dependencies]
docker = "^6.1.0"
kubernetes = "^28.1.0"
gunicorn = "^21.2.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# 開発環境セットアップスクリプト
import subprocess
import sys
from pathlib import Path

class DevelopmentEnvironmentSetup:
    """開発環境の自動セットアップ"""
    
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.project_path = Path(project_name)
    
    def setup_complete_environment(self):
        """完全な開発環境のセットアップ"""
        
        print("🚀 AIエージェント開発環境セットアップ開始")
        
        # 1. プロジェクトディレクトリ作成
        self._create_project_structure()
        
        # 2. Poetry初期化
        self._setup_poetry()
        
        # 3. Pre-commit設定
        self._setup_precommit()
        
        # 4. Docker環境設定
        self._setup_docker()
        
        # 5. VSCode設定
        self._setup_vscode()
        
        # 6. 環境変数テンプレート
        self._create_env_template()
        
        print("✅ 開発環境セットアップ完了!")
    
    def _create_project_structure(self):
        """プロジェクト構造の作成"""
        directories = [
            "src/agents",
            "src/tools", 
            "src/models",
            "src/services",
            "src/utils",
            "tests/unit",
            "tests/integration", 
            "tests/performance",
            "docs",
            "scripts",
            "config",
            "data/raw",
            "data/processed",
            "logs",
            "monitoring",
            "deployment/docker",
            "deployment/k8s"
        ]
        
        for directory in directories:
            (self.project_path / directory).mkdir(parents=True, exist_ok=True)
            
        # __init__.py ファイルの作成
        init_files = [
            "src/__init__.py",
            "src/agents/__init__.py", 
            "src/tools/__init__.py",
            "src/models/__init__.py",
            "src/services/__init__.py",
            "src/utils/__init__.py"
        ]
        
        for init_file in init_files:
            (self.project_path / init_file).touch()
    
    def _setup_poetry(self):
        """Poetry設定"""
        pyproject_content = '''[tool.poetry]
name = "ai-agent-project"
version = "0.1.0"
description = "Advanced AI Agent Development Project"
authors = ["Your Name <your.email@example.com>"]

[tool.poetry.dependencies]
python = "^3.11"
langchain = "^0.1.0"
openai = "^1.0.0"
# ... (省略)

[tool.black]
line-length = 100
target-version = ['py311']
include = '\\.pyi?$'

[tool.ruff]
target-version = "py311"
line-length = 100
select = ["E", "F", "I", "W", "UP", "N", "YTT", "S", "BLE", "FBT", "B", "A", "C4", "EM", "ICN", "G", "PIE", "T20", "PT", "Q", "RET", "SIM", "TID", "ARG", "PTH", "ERA", "PL", "TRY", "RUF"]

[tool.mypy]
python_version = "3.11"
warn_return_any = true
strict_optional = true
warn_no_return = true
warn_redundant_casts = true
warn_unused_ignores = true
show_error_codes = true

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "--verbose --tb=short --cov=src --cov-report=html --cov-report=xml"
'''
        
        (self.project_path / "pyproject.toml").write_text(pyproject_content)
    
    def _setup_precommit(self):
        """Pre-commit設定"""
        precommit_content = '''repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      - id: check-merge-conflict
      - id: debug-statements

  - repo: https://github.com/psf/black
    rev: 23.11.0
    hooks:
      - id: black
        language_version: python3.11

  - repo: https://github.com/charliermarsh/ruff-pre-commit
    rev: v0.1.6
    hooks:
      - id: ruff
        args: [--fix, --exit-non-zero-on-fix]

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.7.0
    hooks:
      - id: mypy
        additional_dependencies: [types-all]
'''
        
        (self.project_path / ".pre-commit-config.yaml").write_text(precommit_content)
    
    def _setup_docker(self):
        """Docker設定"""
        dockerfile_content = '''FROM python:3.11-slim

WORKDIR /app

# システム依存関係のインストール
RUN apt-get update && apt-get install -y \\
    build-essential \\
    curl \\
    && rm -rf /var/lib/apt/lists/*

# Poetryのインストール
RUN pip install poetry

# プロジェクトファイルのコピー
COPY pyproject.toml poetry.lock* ./

# 依存関係のインストール
RUN poetry config virtualenvs.create false \\
    && poetry install --no-interaction --no-ansi

# ソースコードのコピー
COPY . .

# アプリケーションの実行
CMD ["python", "-m", "src.main"]
'''
        
        docker_compose_content = '''version: '3.8'

services:
  ai-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENVIRONMENT=development
    volumes:
      - ./logs:/app/logs
    depends_on:
      - redis
      - postgres
      - pinecone

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: ai_agent_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  monitoring:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml

volumes:
  redis_data:
  postgres_data:
'''
        
        (self.project_path / "Dockerfile").write_text(dockerfile_content)
        (self.project_path / "docker-compose.yml").write_text(docker_compose_content)
    
    def _setup_vscode(self):
        """VSCode設定"""
        vscode_settings = '''{
    "python.defaultInterpreterPath": "./venv/bin/python",
    "python.linting.enabled": true,
    "python.linting.pylintEnabled": false,
    "python.linting.ruffEnabled": true,
    "python.formatting.provider": "black",
    "python.testing.pytestEnabled": true,
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
        "source.organizeImports": true
    },
    "files.exclude": {
        "**/__pycache__": true,
        "**/.pytest_cache": true,
        "**/node_modules": true
    }
}'''
        
        vscode_dir = self.project_path / ".vscode"
        vscode_dir.mkdir(exist_ok=True)
        (vscode_dir / "settings.json").write_text(vscode_settings)
    
    def _create_env_template(self):
        """環境変数テンプレート"""
        env_template = '''# AI Service API Keys
OPENAI_API_KEY=your_openai_api_key_here
ANTHROPIC_API_KEY=your_anthropic_api_key_here
GOOGLE_API_KEY=your_google_api_key_here

# Vector Database
PINECONE_API_KEY=your_pinecone_api_key_here
PINECONE_ENVIRONMENT=your_pinecone_environment_here

# Database
DATABASE_URL=postgresql://postgres:password@localhost:5432/ai_agent_db
REDIS_URL=redis://localhost:6379

# Monitoring
SENTRY_DSN=your_sentry_dsn_here
PROMETHEUS_GATEWAY=localhost:9091

# Application
ENVIRONMENT=development
LOG_LEVEL=INFO
DEBUG=true
'''
        
        (self.project_path / ".env.template").write_text(env_template)
        (self.project_path / ".env").write_text(env_template)

# 使用例
if __name__ == "__main__":
    setup = DevelopmentEnvironmentSetup("my-ai-agent-project")
    setup.setup_complete_environment()

IDEとコードエディタ設定

VSCode 推奨拡張機能:

{
  "recommendations": [
    "ms-python.python",
    "ms-python.pylint",
    "ms-toolsai.jupyter",
    "charliermarsh.ruff",
    "ms-python.black-formatter",
    "ms-python.mypy-type-checker",
    "ms-vscode.vscode-json",
    "redhat.vscode-yaml",
    "ms-azuretools.vscode-docker",
    "ms-kubernetes-tools.vscode-kubernetes-tools",
    "github.copilot",
    "github.copilot-chat"
  ]
}

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

LLM統合・API管理ツール

統合LLMクライアント:LiteLLM

複数のLLMプロバイダーを統一的に扱うためのツールです。

# LiteLLMを使った統合LLM管理システム
import litellm
from typing import Dict, List, Any, Optional
import asyncio
import time
from dataclasses import dataclass
from enum import Enum

class LLMProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    COHERE = "cohere"
    AZURE = "azure"

@dataclass
class LLMConfig:
    provider: LLMProvider
    model: str
    api_key: str
    base_url: Optional[str] = None
    max_tokens: int = 1000
    temperature: float = 0.3
    timeout: int = 30

class UnifiedLLMManager:
    """統合LLM管理システム"""
    
    def __init__(self):
        self.configs: Dict[str, LLMConfig] = {}
        self.usage_stats: Dict[str, Dict[str, Any]] = {}
        self.rate_limits: Dict[str, Dict[str, Any]] = {}
        
    def add_provider(self, name: str, config: LLMConfig):
        """LLMプロバイダーの追加"""
        self.configs[name] = config
        self.usage_stats[name] = {
            "total_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "error_count": 0
        }
        self.rate_limits[name] = {
            "requests_per_minute": 60,
            "tokens_per_minute": 50000,
            "current_requests": 0,
            "current_tokens": 0,
            "reset_time": time.time() + 60
        }
    
    async def generate(self, 
                      provider_name: str, 
                      messages: List[Dict[str, str]], 
                      **kwargs) -> Dict[str, Any]:
        """統一的な生成API"""
        
        if provider_name not in self.configs:
            raise ValueError(f"Provider {provider_name} not configured")
        
        config = self.configs[provider_name]
        
        # レート制限チェック
        if not self._check_rate_limit(provider_name):
            raise Exception(f"Rate limit exceeded for {provider_name}")
        
        try:
            # LiteLLMを使った統一呼び出し
            response = await litellm.acompletion(
                model=f"{config.provider.value}/{config.model}",
                messages=messages,
                api_key=config.api_key,
                max_tokens=config.max_tokens,
                temperature=config.temperature,
                timeout=config.timeout,
                **kwargs
            )
            
            # 使用統計の更新
            self._update_usage_stats(provider_name, response)
            
            return {
                "content": response.choices[0].message.content,
                "model": response.model,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                    "total_tokens": response.usage.total_tokens
                },
                "cost": self._calculate_cost(provider_name, response.usage.total_tokens),
                "provider": provider_name
            }
            
        except Exception as e:
            self.usage_stats[provider_name]["error_count"] += 1
            raise e
    
    def _check_rate_limit(self, provider_name: str) -> bool:
        """レート制限チェック"""
        limits = self.rate_limits[provider_name]
        current_time = time.time()
        
        # 制限リセット時間チェック
        if current_time > limits["reset_time"]:
            limits["current_requests"] = 0
            limits["current_tokens"] = 0
            limits["reset_time"] = current_time + 60
        
        # 制限チェック
        if limits["current_requests"] >= limits["requests_per_minute"]:
            return False
        
        return True
    
    def _update_usage_stats(self, provider_name: str, response):
        """使用統計の更新"""
        stats = self.usage_stats[provider_name]
        limits = self.rate_limits[provider_name]
        
        tokens = response.usage.total_tokens
        
        stats["total_requests"] += 1
        stats["total_tokens"] += tokens
        stats["total_cost"] += self._calculate_cost(provider_name, tokens)
        
        limits["current_requests"] += 1
        limits["current_tokens"] += tokens
    
    def _calculate_cost(self, provider_name: str, tokens: int) -> float:
        """コスト計算"""
        # 簡単なコスト計算(実際の実装では各プロバイダーの料金体系を使用)
        cost_per_1k_tokens = {
            "openai": 0.002,
            "anthropic": 0.008,
            "google": 0.001,
            "cohere": 0.001
        }
        
        provider = self.configs[provider_name].provider.value
        rate = cost_per_1k_tokens.get(provider, 0.002)
        
        return (tokens / 1000) * rate
    
    def get_usage_report(self) -> Dict[str, Any]:
        """使用状況レポート"""
        total_cost = sum(stats["total_cost"] for stats in self.usage_stats.values())
        total_requests = sum(stats["total_requests"] for stats in self.usage_stats.values())
        
        return {
            "total_cost": total_cost,
            "total_requests": total_requests,
            "by_provider": self.usage_stats,
            "cost_breakdown": {
                name: stats["total_cost"] 
                for name, stats in self.usage_stats.items()
            }
        }
    
    async def compare_providers(self, 
                               prompt: str, 
                               providers: List[str]) -> Dict[str, Any]:
        """複数プロバイダーでの比較実行"""
        results = {}
        
        messages = [{"role": "user", "content": prompt}]
        
        tasks = [
            self.generate(provider, messages)
            for provider in providers
            if provider in self.configs
        ]
        
        provider_results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for provider, result in zip(providers, provider_results):
            if isinstance(result, Exception):
                results[provider] = {"error": str(result)}
            else:
                results[provider] = {
                    "content": result["content"],
                    "cost": result["cost"],
                    "tokens": result["usage"]["total_tokens"],
                    "model": result["model"]
                }
        
        return results

# API管理・監視ツール
class APIMonitor:
    """API使用状況の監視"""
    
    def __init__(self):
        self.metrics = {}
        self.alerts = []
    
    def track_request(self, provider: str, endpoint: str, 
                     response_time: float, status_code: int):
        """リクエストの追跡"""
        key = f"{provider}:{endpoint}"
        
        if key not in self.metrics:
            self.metrics[key] = {
                "total_requests": 0,
                "success_count": 0,
                "error_count": 0,
                "total_response_time": 0.0,
                "avg_response_time": 0.0
            }
        
        metrics = self.metrics[key]
        metrics["total_requests"] += 1
        metrics["total_response_time"] += response_time
        metrics["avg_response_time"] = metrics["total_response_time"] / metrics["total_requests"]
        
        if status_code == 200:
            metrics["success_count"] += 1
        else:
            metrics["error_count"] += 1
            
        # アラートチェック
        self._check_alerts(key, metrics)
    
    def _check_alerts(self, key: str, metrics: Dict[str, Any]):
        """アラートチェック"""
        error_rate = metrics["error_count"] / max(metrics["total_requests"], 1)
        
        # エラー率が高い場合
        if error_rate > 0.1:  # 10%
            self.alerts.append({
                "type": "high_error_rate",
                "endpoint": key,
                "error_rate": error_rate,
                "timestamp": time.time()
            })
        
        # 応答時間が長い場合
        if metrics["avg_response_time"] > 10.0:  # 10秒
            self.alerts.append({
                "type": "slow_response",
                "endpoint": key,
                "avg_response_time": metrics["avg_response_time"],
                "timestamp": time.time()
            })

# 使用例
async def demonstrate_llm_management():
    """LLM管理システムのデモ"""
    
    # LLM管理システムの初期化
    llm_manager = UnifiedLLMManager()
    
    # プロバイダーの設定
    llm_manager.add_provider("openai-gpt4", LLMConfig(
        provider=LLMProvider.OPENAI,
        model="gpt-4o",
        api_key="your-openai-key"
    ))
    
    llm_manager.add_provider("claude", LLMConfig(
        provider=LLMProvider.ANTHROPIC,
        model="claude-3-sonnet-20240229",
        api_key="your-anthropic-key"
    ))
    
    # 単一プロバイダーでの生成
    messages = [{"role": "user", "content": "AIエージェントの利点を3つ教えてください"}]
    
    result = await llm_manager.generate("openai-gpt4", messages)
    print(f"OpenAI Result: {result['content']}")
    print(f"Cost: ${result['cost']:.4f}")
    
    # 複数プロバイダーでの比較
    comparison = await llm_manager.compare_providers(
        "機械学習とディープラーニングの違いを説明してください",
        ["openai-gpt4", "claude"]
    )
    
    print("\n=== プロバイダー比較結果 ===")
    for provider, result in comparison.items():
        if "error" not in result:
            print(f"{provider}: コスト ${result['cost']:.4f}, トークン数 {result['tokens']}")
            print(f"応答: {result['content'][:100]}...\n")
    
    # 使用状況レポート
    usage_report = llm_manager.get_usage_report()
    print("=== 使用状況レポート ===")
    print(f"総コスト: ${usage_report['total_cost']:.4f}")
    print(f"総リクエスト数: {usage_report['total_requests']}")

if __name__ == "__main__":
    asyncio.run(demonstrate_llm_management())

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

ベクトルデータベース・検索ツール

主要ベクトルDB比較・統合ツール

# 統合ベクトルデータベース管理システム
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from dataclasses import dataclass
import asyncio
import time

@dataclass
class DocumentChunk:
    id: str
    content: str
    metadata: Dict[str, Any]
    embedding: Optional[List[float]] = None

@dataclass
class SearchResult:
    chunk: DocumentChunk
    score: float
    rank: int

class VectorDBInterface(ABC):
    """ベクトルDB統一インターフェース"""
    
    @abstractmethod
    async def insert(self, chunks: List[DocumentChunk]) -> bool:
        pass
    
    @abstractmethod
    async def search(self, query_vector: List[float], 
                    top_k: int = 10, 
                    filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        pass
    
    @abstractmethod
    async def update(self, chunk_id: str, 
                    new_content: str, 
                    new_embedding: List[float]) -> bool:
        pass
    
    @abstractmethod
    async def delete(self, chunk_ids: List[str]) -> bool:
        pass
    
    @abstractmethod
    async def get_stats(self) -> Dict[str, Any]:
        pass

class PineconeAdapter(VectorDBInterface):
    """Pinecone アダプター"""
    
    def __init__(self, api_key: str, environment: str, index_name: str):
        # import pinecone
        # pinecone.init(api_key=api_key, environment=environment)
        # self.index = pinecone.Index(index_name)
        self.index_name = index_name
        print(f"Pinecone initialized: {index_name}")
    
    async def insert(self, chunks: List[DocumentChunk]) -> bool:
        try:
            vectors = []
            for chunk in chunks:
                if chunk.embedding:
                    vectors.append({
                        "id": chunk.id,
                        "values": chunk.embedding,
                        "metadata": {
                            **chunk.metadata,
                            "content": chunk.content
                        }
                    })
            
            # self.index.upsert(vectors)
            print(f"Inserted {len(vectors)} vectors into Pinecone")
            return True
            
        except Exception as e:
            print(f"Pinecone insert error: {e}")
            return False
    
    async def search(self, query_vector: List[float], 
                    top_k: int = 10, 
                    filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        try:
            # 実際の実装では Pinecone API を使用
            # results = self.index.query(
            #     vector=query_vector,
            #     top_k=top_k,
            #     filter=filter_metadata,
            #     include_metadata=True
            # )
            
            # モック結果
            mock_results = []
            for i in range(min(top_k, 3)):
                chunk = DocumentChunk(
                    id=f"pinecone_doc_{i}",
                    content=f"Pinecone search result {i}",
                    metadata={"source": f"pinecone_source_{i}"}
                )
                mock_results.append(SearchResult(
                    chunk=chunk,
                    score=0.9 - (i * 0.1),
                    rank=i + 1
                ))
            
            return mock_results
            
        except Exception as e:
            print(f"Pinecone search error: {e}")
            return []
    
    async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
        # 実装省略
        return True
    
    async def delete(self, chunk_ids: List[str]) -> bool:
        # 実装省略
        return True
    
    async def get_stats(self) -> Dict[str, Any]:
        return {
            "provider": "pinecone",
            "index_name": self.index_name,
            "total_vectors": 1000,  # モック値
            "dimensions": 1536
        }

class WeaviateAdapter(VectorDBInterface):
    """Weaviate アダプター"""
    
    def __init__(self, url: str, api_key: Optional[str] = None):
        # import weaviate
        # self.client = weaviate.Client(url, auth_config=...)
        self.url = url
        print(f"Weaviate initialized: {url}")
    
    async def insert(self, chunks: List[DocumentChunk]) -> bool:
        # Weaviate固有の実装
        print(f"Inserted {len(chunks)} objects into Weaviate")
        return True
    
    async def search(self, query_vector: List[float], 
                    top_k: int = 10, 
                    filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        # モック結果
        mock_results = []
        for i in range(min(top_k, 3)):
            chunk = DocumentChunk(
                id=f"weaviate_doc_{i}",
                content=f"Weaviate search result {i}",
                metadata={"source": f"weaviate_source_{i}"}
            )
            mock_results.append(SearchResult(
                chunk=chunk,
                score=0.85 - (i * 0.1),
                rank=i + 1
            ))
        
        return mock_results
    
    async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
        return True
    
    async def delete(self, chunk_ids: List[str]) -> bool:
        return True
    
    async def get_stats(self) -> Dict[str, Any]:
        return {
            "provider": "weaviate",
            "url": self.url,
            "total_objects": 800,
            "classes": ["Document", "Chunk"]
        }

class ChromaAdapter(VectorDBInterface):
    """ChromaDB アダプター"""
    
    def __init__(self, persist_directory: str, collection_name: str):
        # import chromadb
        # self.client = chromadb.PersistentClient(path=persist_directory)
        # self.collection = self.client.get_or_create_collection(collection_name)
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        print(f"ChromaDB initialized: {collection_name}")
    
    async def insert(self, chunks: List[DocumentChunk]) -> bool:
        # ChromaDB固有の実装
        print(f"Inserted {len(chunks)} documents into ChromaDB")
        return True
    
    async def search(self, query_vector: List[float], 
                    top_k: int = 10, 
                    filter_metadata: Optional[Dict] = None) -> List[SearchResult]:
        # モック結果
        mock_results = []
        for i in range(min(top_k, 3)):
            chunk = DocumentChunk(
                id=f"chroma_doc_{i}",
                content=f"ChromaDB search result {i}",
                metadata={"source": f"chroma_source_{i}"}
            )
            mock_results.append(SearchResult(
                chunk=chunk,
                score=0.88 - (i * 0.1),
                rank=i + 1
            ))
        
        return mock_results
    
    async def update(self, chunk_id: str, new_content: str, new_embedding: List[float]) -> bool:
        return True
    
    async def delete(self, chunk_ids: List[str]) -> bool:
        return True
    
    async def get_stats(self) -> Dict[str, Any]:
        return {
            "provider": "chromadb",
            "collection": self.collection_name,
            "total_documents": 1200,
            "persist_directory": self.persist_directory
        }

class UnifiedVectorDBManager:
    """統合ベクトルDB管理システム"""
    
    def __init__(self):
        self.databases: Dict[str, VectorDBInterface] = {}
        self.default_db: Optional[str] = None
        self.performance_metrics: Dict[str, Dict[str, Any]] = {}
    
    def add_database(self, name: str, db_adapter: VectorDBInterface, is_default: bool = False):
        """データベースの追加"""
        self.databases[name] = db_adapter
        if is_default or not self.default_db:
            self.default_db = name
        
        self.performance_metrics[name] = {
            "total_queries": 0,
            "total_insertions": 0,
            "avg_query_time": 0.0,
            "avg_insertion_time": 0.0
        }
    
    async def insert_documents(self, chunks: List[DocumentChunk], 
                              db_name: Optional[str] = None) -> Dict[str, bool]:
        """ドキュメントの挿入"""
        target_db = db_name or self.default_db
        results = {}
        
        if target_db and target_db in self.databases:
            start_time = time.time()
            success = await self.databases[target_db].insert(chunks)
            insertion_time = time.time() - start_time
            
            # メトリクス更新
            metrics = self.performance_metrics[target_db]
            metrics["total_insertions"] += 1
            total_time = metrics["avg_insertion_time"] * (metrics["total_insertions"] - 1)
            metrics["avg_insertion_time"] = (total_time + insertion_time) / metrics["total_insertions"]
            
            results[target_db] = success
        else:
            # 全データベースに挿入
            for name, db in self.databases.items():
                start_time = time.time()
                success = await db.insert(chunks)
                insertion_time = time.time() - start_time
                
                # メトリクス更新
                metrics = self.performance_metrics[name]
                metrics["total_insertions"] += 1
                total_time = metrics["avg_insertion_time"] * (metrics["total_insertions"] - 1)
                metrics["avg_insertion_time"] = (total_time + insertion_time) / metrics["total_insertions"]
                
                results[name] = success
        
        return results
    
    async def search_across_databases(self, query_vector: List[float], 
                                    top_k: int = 10,
                                    databases: Optional[List[str]] = None) -> Dict[str, List[SearchResult]]:
        """複数データベース横断検索"""
        target_dbs = databases or list(self.databases.keys())
        results = {}
        
        search_tasks = []
        for db_name in target_dbs:
            if db_name in self.databases:
                search_tasks.append((db_name, self.databases[db_name].search(query_vector, top_k)))
        
        # 並列実行
        completed_searches = await asyncio.gather(
            *[task[1] for task in search_tasks], 
            return_exceptions=True
        )
        
        # 結果の整理
        for (db_name, _), search_result in zip(search_tasks, completed_searches):
            if isinstance(search_result, Exception):
                results[db_name] = []
                print(f"Search error in {db_name}: {search_result}")
            else:
                results[db_name] = search_result
                
                # メトリクス更新
                metrics = self.performance_metrics[db_name]
                metrics["total_queries"] += 1
        
        return results
    
    async def compare_database_performance(self, 
                                         query_vector: List[float],
                                         iterations: int = 5) -> Dict[str, Any]:
        """データベース性能比較"""
        performance_results = {}
        
        for db_name, db_adapter in self.databases.items():
            response_times = []
            success_count = 0
            
            for _ in range(iterations):
                start_time = time.time()
                try:
                    results = await db_adapter.search(query_vector, top_k=10)
                    response_time = time.time() - start_time
                    response_times.append(response_time)
                    success_count += 1
                except Exception as e:
                    print(f"Error in {db_name}: {e}")
            
            if response_times:
                performance_results[db_name] = {
                    "avg_response_time": sum(response_times) / len(response_times),
                    "min_response_time": min(response_times),
                    "max_response_time": max(response_times),
                    "success_rate": success_count / iterations,
                    "total_iterations": iterations
                }
        
        return performance_results
    
    async def get_unified_stats(self) -> Dict[str, Any]:
        """統合統計情報"""
        all_stats = {}
        
        for db_name, db_adapter in self.databases.items():
            db_stats = await db_adapter.get_stats()
            performance_stats = self.performance_metrics[db_name]
            
            all_stats[db_name] = {
                **db_stats,
                "performance": performance_stats
            }
        
        return all_stats
    
    def recommend_best_database(self, criteria: str = "speed") -> Optional[str]:
        """最適データベースの推奨"""
        if not self.performance_metrics:
            return self.default_db
        
        if criteria == "speed":
            # 平均クエリ時間が最短のDBを推奨
            best_db = min(
                self.performance_metrics.items(),
                key=lambda x: x[1].get("avg_query_time", float('inf'))
            )
            return best_db[0] if best_db[1]["total_queries"] > 0 else self.default_db
        
        return self.default_db

# 使用例
async def demonstrate_vector_db_management():
    """ベクトルDB管理システムのデモ"""
    
    # 統合管理システムの初期化
    vector_manager = UnifiedVectorDBManager()
    
    # 各種データベースの追加
    vector_manager.add_database("pinecone", PineconeAdapter(
        api_key="your-key",
        environment="us-west1-gcp",
        index_name="ai-agent-index"
    ), is_default=True)
    
    vector_manager.add_database("weaviate", WeaviateAdapter(
        url="http://localhost:8080"
    ))
    
    vector_manager.add_database("chroma", ChromaAdapter(
        persist_directory="./chroma_db",
        collection_name="ai_documents"
    ))
    
    # テストドキュメントの準備
    test_chunks = [
        DocumentChunk(
            id="doc_1",
            content="AIエージェントは自律的に動作するシステムです",
            metadata={"category": "definition", "language": "ja"},
            embedding=[0.1] * 1536  # ダミーエンベディング
        ),
        DocumentChunk(
            id="doc_2", 
            content="機械学習は人工知能の一分野です",
            metadata={"category": "ml", "language": "ja"},
            embedding=[0.2] * 1536
        )
    ]
    
    # ドキュメントの挿入
    print("=== ドキュメント挿入 ===")
    insertion_results = await vector_manager.insert_documents(test_chunks)
    for db_name, success in insertion_results.items():
        print(f"{db_name}: {'成功' if success else '失敗'}")
    
    # 横断検索の実行
    print("\n=== 横断検索実行 ===")
    query_vector = [0.15] * 1536  # ダミークエリベクトル
    
    search_results = await vector_manager.search_across_databases(query_vector, top_k=5)
    
    for db_name, results in search_results.items():
        print(f"\n{db_name} 検索結果:")
        for result in results:
            print(f"  - スコア: {result.score:.3f}, 内容: {result.chunk.content}")
    
    # 性能比較
    print("\n=== 性能比較 ===")
    performance_comparison = await vector_manager.compare_database_performance(query_vector)
    
    for db_name, stats in performance_comparison.items():
        print(f"{db_name}:")
        print(f"  平均応答時間: {stats['avg_response_time']:.3f}秒")
        print(f"  成功率: {stats['success_rate']:.1%}")
    
    # 推奨DB
    recommended_db = vector_manager.recommend_best_database("speed")
    print(f"\n推奨データベース: {recommended_db}")
    
    # 統合統計
    unified_stats = await vector_manager.get_unified_stats()
    print("\n=== 統合統計情報 ===")
    for db_name, stats in unified_stats.items():
        print(f"{db_name}: プロバイダー={stats.get('provider', 'unknown')}")

if __name__ == "__main__":
    asyncio.run(demonstrate_vector_db_management())

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

監視・ロギング・分析ツール

包括的監視システム

# 統合監視・ロギングシステム
import logging
import structlog
from typing import Dict, List, Any, Optional
import time
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import psutil
import traceback

@dataclass
class MetricPoint:
    timestamp: float
    name: str
    value: float
    tags: Dict[str, str] = field(default_factory=dict)

@dataclass
class AlertRule:
    name: str
    metric_name: str
    condition: str  # "gt", "lt", "eq"
    threshold: float
    duration_seconds: int
    severity: str  # "critical", "warning", "info"

class StructuredLogger:
    """構造化ログシステム"""
    
    def __init__(self, service_name: str, log_level: str = "INFO"):
        self.service_name = service_name
        
        # Structlogの設定
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.UnicodeDecoder(),
                structlog.processors.JSONRenderer()
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
        
        # 基本ロガーの設定
        logging.basicConfig(
            format="%(message)s",
            level=getattr(logging, log_level.upper())
        )
        
        self.logger = structlog.get_logger(service_name)
    
    def log_agent_request(self, 
                         request_id: str,
                         user_id: str, 
                         prompt: str,
                         response: str,
                         execution_time: float,
                         tokens_used: int,
                         cost: float,
                         status: str):
        """エージェントリクエストのログ"""
        self.logger.info(
            "agent_request_completed",
            request_id=request_id,
            user_id=user_id,
            prompt_length=len(prompt),
            response_length=len(response),
            execution_time=execution_time,
            tokens_used=tokens_used,
            cost=cost,
            status=status,
            event_type="agent_request"
        )
    
    def log_error(self, error: Exception, context: Dict[str, Any] = None):
        """エラーログ"""
        self.logger.error(
            "error_occurred",
            error_type=type(error).__name__,
            error_message=str(error),
            traceback=traceback.format_exc(),
            context=context or {},
            event_type="error"
        )
    
    def log_performance_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
        """パフォーマンスメトリクスのログ"""
        self.logger.info(
            "performance_metric",
            metric_name=metric_name,
            metric_value=value,
            tags=tags or {},
            event_type="metric"
        )
    
    def log_security_event(self, event_type: str, details: Dict[str, Any]):
        """セキュリティイベントのログ"""
        self.logger.warning(
            "security_event",
            security_event_type=event_type,
            details=details,
            event_type="security"
        )

class MetricsCollector:
    """メトリクス収集システム"""
    
    def __init__(self):
        self.metrics: List[MetricPoint] = []
        self.counters: Dict[str, int] = {}
        self.gauges: Dict[str, float] = {}
        self.histograms: Dict[str, List[float]] = {}
        
    def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
        """カウンターの増加"""
        key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
        self.counters[key] = self.counters.get(key, 0) + value
        
        self.metrics.append(MetricPoint(
            timestamp=time.time(),
            name=name,
            value=self.counters[key],
            tags=tags or {}
        ))
    
    def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
        """ゲージの設定"""
        key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
        self.gauges[key] = value
        
        self.metrics.append(MetricPoint(
            timestamp=time.time(),
            name=name,
            value=value,
            tags=tags or {}
        ))
    
    def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
        """ヒストグラムの記録"""
        key = f"{name}:{json.dumps(tags or {}, sort_keys=True)}"
        if key not in self.histograms:
            self.histograms[key] = []
        
        self.histograms[key].append(value)
        
        self.metrics.append(MetricPoint(
            timestamp=time.time(),
            name=name,
            value=value,
            tags=tags or {}
        ))
    
    def get_system_metrics(self) -> Dict[str, float]:
        """システムメトリクスの取得"""
        # CPU使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # メモリ使用率
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        
        # ディスク使用率
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        
        metrics = {
            "system.cpu.percent": cpu_percent,
            "system.memory.percent": memory_percent,
            "system.disk.percent": disk_percent,
            "system.memory.available_gb": memory.available / (1024**3),
            "system.disk.free_gb": disk.free / (1024**3)
        }
        
        # メトリクスの記録
        for name, value in metrics.items():
            self.set_gauge(name, value)
        
        return metrics
    
    def get_recent_metrics(self, minutes: int = 5) -> List[MetricPoint]:
        """最近のメトリクスを取得"""
        cutoff_time = time.time() - (minutes * 60)
        return [m for m in self.metrics if m.timestamp > cutoff_time]
    
    def calculate_statistics(self, metric_name: str, minutes: int = 5) -> Dict[str, float]:
        """メトリクスの統計計算"""
        recent_metrics = [
            m for m in self.get_recent_metrics(minutes)
            if m.name == metric_name
        ]
        
        if not recent_metrics:
            return {}
        
        values = [m.value for m in recent_metrics]
        
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "sum": sum(values)
        }

class AlertManager:
    """アラート管理システム"""
    
    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics_collector = metrics_collector
        self.alert_rules: List[AlertRule] = []
        self.active_alerts: Dict[str, Dict[str, Any]] = {}
        self.alert_history: List[Dict[str, Any]] = []
    
    def add_alert_rule(self, rule: AlertRule):
        """アラートルールの追加"""
        self.alert_rules.append(rule)
    
    async def check_alerts(self):
        """アラートチェック"""
        current_time = time.time()
        
        for rule in self.alert_rules:
            # メトリクスの統計を取得
            stats = self.metrics_collector.calculate_statistics(
                rule.metric_name, 
                rule.duration_seconds // 60
            )
            
            if not stats:
                continue
            
            # 条件チェック
            triggered = False
            current_value = stats.get("avg", 0)
            
            if rule.condition == "gt" and current_value > rule.threshold:
                triggered = True
            elif rule.condition == "lt" and current_value < rule.threshold:
                triggered = True
            elif rule.condition == "eq" and abs(current_value - rule.threshold) < 0.001:
                triggered = True
            
            alert_key = f"{rule.name}:{rule.metric_name}"
            
            if triggered:
                if alert_key not in self.active_alerts:
                    # 新しいアラート
                    alert = {
                        "rule_name": rule.name,
                        "metric_name": rule.metric_name,
                        "severity": rule.severity,
                        "threshold": rule.threshold,
                        "current_value": current_value,
                        "triggered_at": current_time,
                        "status": "active"
                    }
                    
                    self.active_alerts[alert_key] = alert
                    self.alert_history.append(alert.copy())
                    
                    await self._send_alert_notification(alert)
            else:
                if alert_key in self.active_alerts:
                    # アラート解決
                    resolved_alert = self.active_alerts[alert_key]
                    resolved_alert["status"] = "resolved"
                    resolved_alert["resolved_at"] = current_time
                    
                    self.alert_history.append(resolved_alert.copy())
                    del self.active_alerts[alert_key]
                    
                    await self._send_resolution_notification(resolved_alert)
    
    async def _send_alert_notification(self, alert: Dict[str, Any]):
        """アラート通知の送信"""
        print(f"🚨 ALERT: {alert['rule_name']}")
        print(f"   Metric: {alert['metric_name']}")
        print(f"   Current: {alert['current_value']:.2f}")
        print(f"   Threshold: {alert['threshold']:.2f}")
        print(f"   Severity: {alert['severity']}")
    
    async def _send_resolution_notification(self, alert: Dict[str, Any]):
        """アラート解決通知の送信"""
        print(f"✅ RESOLVED: {alert['rule_name']}")
    
    def get_active_alerts(self) -> List[Dict[str, Any]]:
        """アクティブなアラートの取得"""
        return list(self.active_alerts.values())
    
    def get_alert_history(self, hours: int = 24) -> List[Dict[str, Any]]:
        """アラート履歴の取得"""
        cutoff_time = time.time() - (hours * 3600)
        return [
            alert for alert in self.alert_history
            if alert["triggered_at"] > cutoff_time
        ]

class AgentMonitoringSystem:
    """AIエージェント専用監視システム"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = StructuredLogger(service_name)
        self.metrics = MetricsCollector()
        self.alerts = AlertManager(self.metrics)
        
        # デフォルトアラートルールの設定
        self._setup_default_alerts()
        
        # 監視タスク
        self.monitoring_task = None
    
    def _setup_default_alerts(self):
        """デフォルトアラートルールの設定"""
        default_rules = [
            AlertRule(
                name="High Error Rate",
                metric_name="agent.error_rate",
                condition="gt",
                threshold=0.1,  # 10%
                duration_seconds=300,  # 5分
                severity="critical"
            ),
            AlertRule(
                name="Slow Response Time",
                metric_name="agent.response_time",
                condition="gt",
                threshold=30.0,  # 30秒
                duration_seconds=300,
                severity="warning"
            ),
            AlertRule(
                name="High Memory Usage",
                metric_name="system.memory.percent",
                condition="gt",
                threshold=85.0,  # 85%
                duration_seconds=300,
                severity="warning"
            ),
            AlertRule(
                name="High Cost Per Hour",
                metric_name="agent.cost_per_hour",
                condition="gt",
                threshold=100.0,  # $100/時間
                duration_seconds=3600,  # 1時間
                severity="critical"
            )
        ]
        
        for rule in default_rules:
            self.alerts.add_alert_rule(rule)
    
    def track_agent_request(self, 
                           request_id: str,
                           user_id: str,
                           prompt: str,
                           response: str,
                           execution_time: float,
                           tokens_used: int,
                           cost: float,
                           status: str):
        """エージェントリクエストの追跡"""
        
        # ログ記録
        self.logger.log_agent_request(
            request_id, user_id, prompt, response,
            execution_time, tokens_used, cost, status
        )
        
        # メトリクス記録
        self.metrics.increment_counter("agent.requests.total", tags={"status": status})
        self.metrics.record_histogram("agent.response_time", execution_time)
        self.metrics.record_histogram("agent.tokens_used", tokens_used)
        self.metrics.record_histogram("agent.cost", cost)
        
        # エラー率の計算
        recent_requests = self.metrics.get_recent_metrics(5)  # 5分間
        total_requests = len([m for m in recent_requests if m.name == "agent.requests.total"])
        error_requests = len([
            m for m in recent_requests 
            if m.name == "agent.requests.total" and m.tags.get("status") == "error"
        ])
        
        if total_requests > 0:
            error_rate = error_requests / total_requests
            self.metrics.set_gauge("agent.error_rate", error_rate)
        
        # コスト/時間の計算
        hourly_cost = self._calculate_hourly_cost()
        self.metrics.set_gauge("agent.cost_per_hour", hourly_cost)
    
    def _calculate_hourly_cost(self) -> float:
        """時間当たりコストの計算"""
        hour_ago = time.time() - 3600
        recent_costs = [
            m.value for m in self.metrics.metrics
            if m.name == "agent.cost" and m.timestamp > hour_ago
        ]
        return sum(recent_costs)
    
    def track_system_performance(self):
        """システムパフォーマンスの追跡"""
        system_metrics = self.metrics.get_system_metrics()
        
        for metric_name, value in system_metrics.items():
            self.logger.log_performance_metric(metric_name, value)
    
    def track_security_event(self, event_type: str, details: Dict[str, Any]):
        """セキュリティイベントの追跡"""
        self.logger.log_security_event(event_type, details)
        self.metrics.increment_counter("security.events", tags={"type": event_type})
    
    async def start_monitoring(self):
        """監視の開始"""
        if self.monitoring_task and not self.monitoring_task.done():
            return
        
        self.monitoring_task = asyncio.create_task(self._monitoring_loop())
    
    async def stop_monitoring(self):
        """監視の停止"""
        if self.monitoring_task:
            self.monitoring_task.cancel()
            try:
                await self.monitoring_task
            except asyncio.CancelledError:
                pass
    
    async def _monitoring_loop(self):
        """監視ループ"""
        while True:
            try:
                # システムメトリクス収集
                self.track_system_performance()
                
                # アラートチェック
                await self.alerts.check_alerts()
                
                await asyncio.sleep(30)  # 30秒間隔
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.log_error(e, {"context": "monitoring_loop"})
                await asyncio.sleep(60)  # エラー時は1分待機
    
    def get_monitoring_dashboard_data(self) -> Dict[str, Any]:
        """監視ダッシュボード用データの取得"""
        recent_metrics = self.metrics.get_recent_metrics(60)  # 過去1時間
        
        # 基本統計
        request_stats = self.metrics.calculate_statistics("agent.requests.total", 60)
        response_time_stats = self.metrics.calculate_statistics("agent.response_time", 60)
        cost_stats = self.metrics.calculate_statistics("agent.cost", 60)
        
        # システムメトリクス
        system_metrics = self.metrics.get_system_metrics()
        
        # アクティブアラート
        active_alerts = self.alerts.get_active_alerts()
        
        return {
            "service_name": self.service_name,
            "timestamp": time.time(),
            "request_stats": request_stats,
            "response_time_stats": response_time_stats,
            "cost_stats": cost_stats,
            "system_metrics": system_metrics,
            "active_alerts": active_alerts,
            "total_metrics_collected": len(recent_metrics)
        }

# 使用例
async def demonstrate_monitoring_system():
    """監視システムのデモ"""
    
    # 監視システムの初期化
    monitor = AgentMonitoringSystem("ai-agent-service")
    
    # 監視開始
    await monitor.start_monitoring()
    
    # テストリクエストのシミュレーション
    print("=== AIエージェント監視システム デモ ===\n")
    
    for i in range(10):
        # 正常なリクエスト
        monitor.track_agent_request(
            request_id=f"req_{i}",
            user_id=f"user_{i % 3}",
            prompt=f"Test prompt {i}",
            response=f"Test response {i}",
            execution_time=2.0 + (i * 0.5),
            tokens_used=100 + (i * 10),
            cost=0.002 + (i * 0.001),
            status="success" if i < 8 else "error"  # 最後の2つはエラー
        )
        
        await asyncio.sleep(0.1)
    
    # セキュリティイベントのシミュレーション
    monitor.track_security_event("prompt_injection_attempt", {
        "user_id": "malicious_user",
        "prompt": "Ignore previous instructions..."
    })
    
    # 少し待ってから統計を確認
    await asyncio.sleep(2)
    
    # ダッシュボードデータの取得
    dashboard_data = monitor.get_monitoring_dashboard_data()
    
    print("=== 監視ダッシュボード ===")
    print(f"サービス名: {dashboard_data['service_name']}")
    print(f"リクエスト統計: {dashboard_data['request_stats']}")
    print(f"応答時間統計: {dashboard_data['response_time_stats']}")
    print(f"コスト統計: {dashboard_data['cost_stats']}")
    print(f"アクティブアラート数: {len(dashboard_data['active_alerts'])}")
    
    if dashboard_data['active_alerts']:
        print("\nアクティブアラート:")
        for alert in dashboard_data['active_alerts']:
            print(f"  - {alert['rule_name']}: {alert['current_value']:.2f} (閾値: {alert['threshold']:.2f})")
    
    # 監視停止
    await monitor.stop_monitoring()

if __name__ == "__main__":
    asyncio.run(demonstrate_monitoring_system())

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

学習リソースと実践書籍ガイド

厳選された技術書籍リスト

AIエージェント開発に必要なツール・ライブラリの習得をサポートする、実践的な書籍リソースを紹介します:

開発環境・ツール習得

  • 「Python開発環境構築完全ガイド」- Poetry、Docker、CI/CDの実践
  • 「現代的開発ツール活用術」- 効率的な開発フローの構築
  • 「VSCode拡張開発」- カスタマイズによる開発効率向上

データベース・検索技術

  • 「ベクトルデータベース実践ガイド」- Pinecone、Weaviate活用法
  • 「Elasticsearch実践ガイド」- 高度な全文検索システム
  • 「Redis設計・運用ガイド」- インメモリDB活用術

監視・運用技術

  • 「Prometheus監視システム」- メトリクス収集と可視化
  • 「Grafana実践ダッシュボード」- 運用監視の効率化
  • 「ログ管理システム構築」- 構造化ログとトラブルシューティング

セキュリティ・品質管理

  • 「AIシステムセキュリティ」- プロンプトインジェクション対策
  • 「テスト駆動AI開発」- 品質保証の実践手法
  • 「DevSecOps実践ガイド」- セキュアな開発フロー

これらの書籍は、理論と実践のバランスが優れており、実際のプロジェクトで即戦力となる知識とスキルを身につけることができます。

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

まとめ:効率的なAIエージェント開発環境の構築

適切なツール・ライブラリの選択と活用は、AIエージェント開発プロジェクトの成否を決定する重要な要素です。本記事で紹介したツールセットを活用することで、以下の価値を実現できます:

主要な効果

  1. 開発効率の向上: 統合開発環境と自動化ツールによる生産性向上
  2. 品質の確保: 包括的なテスト・監視システムによる高品質なシステム構築
  3. 運用の安定化: プロアクティブな監視とアラートによる安定運用
  4. スケーラビリティ: マイクロサービス対応ツールによる柔軟な拡張性

導入の優先順位

  1. 基盤環境: Poetry、Docker、VSCode環境の整備
  2. 開発ツール: LLM統合、ベクトルDB、テストフレームワーク
  3. 監視システム: ログ、メトリクス、アラート機能
  4. 高度な機能: セキュリティツール、パフォーマンス最適化

継続的な学習と実践により、これらのツールを効果的に組み合わせ、世界レベルのAIエージェントシステムを構築する能力を身につけることができるでしょう。適切なツール選択が、技術的負債を減らし、長期的な成功につながります。

さらに理解を深める参考書

関連記事と相性の良い実践ガイドです。手元に置いて反復しながら進めてみてください。

この記事をシェア

続けて読みたい記事

編集部がピックアップした関連記事で学びを広げましょう。

#AIエージェント

【2025年完全版】AIエージェントフレームワーク徹底比較:最適な選択ガイド

2025/11/28
#セキュリティ

【2025年版】AIエージェントのセキュリティテスト完全ガイド

2025/11/23
#LangGraph

【2025年完全版】LangGraph完全マスターガイド:マルチエージェント構築からトラブル解決まで

2025/11/28
#AI

AIエージェント開発戦国時代!主要フレームワーク徹底比較【2025年版】

2025/9/3
#プログラミング

【2025年完全版】モノレポ管理実践ガイド:開発効率を3倍にする最新ツールと戦略

2025/11/28
#Python

【2025年完全版】FastAPI完全マスターガイド:API開発からデプロイまで

2025/11/28