Files
executor/strategies/views.py
Jongheon Kim 940d9bfe6e feat: add PDF report generation for backtest executions
- Implemented `generate_backtest_report` to create PDF reports for backtest results.
- Added `report_file` field to `StrategyExecution` for storing report paths.
- Introduced `/executions/<execution_id>/report/` endpoint for downloading reports.
- Enhanced backtesting flow to generate and save reports upon completion.
- Updated dependencies to include `matplotlib` for report generation.
2026-02-08 14:39:23 +09:00

377 lines
13 KiB
Python

import json
import logging
import os
import threading
import time

import requests
from django.conf import settings
from django.db import connections
from django.http import FileResponse, Http404, JsonResponse
from django.shortcuts import render, get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils import timezone

from .models import QuantStrategy, StrategyVersion, StrategyExecution
from .base import StrategyRegistry
from .services.backtest import run_backtest
from .services.report import generate_backtest_report
from . import implementations  # imported for its side effect: loads the implementations so they register in the registry
logger = logging.getLogger(__name__)
@require_http_methods(["GET"])
def list_strategies(request):
    """Return all active strategies, with their current version, as JSON.

    Response body:
        strategies: one entry per active ``QuantStrategy`` (id, name,
            description, created_at, current_version or ``None``,
            total_versions, available_implementations).
        total_count: number of entries in ``strategies``.
        available_implementations: the full mapping returned by
            ``StrategyRegistry.list_strategies()``.
    """
    # Implementations currently registered in the in-process registry.
    available_strategies = StrategyRegistry.list_strategies()

    # Strategies managed in the database. Versions are prefetched, so the loop
    # below must only read ``versions.all()``: calling ``.filter()`` on the
    # related manager would bypass the prefetch cache and hit the database
    # once per strategy.
    strategies = QuantStrategy.objects.filter(is_active=True).prefetch_related('versions')

    strategy_list = []
    for strategy in strategies:
        versions = list(strategy.versions.all())
        # NOTE(review): assumes at most one version per strategy is flagged
        # ``is_current``; if several are, the first in prefetch order wins.
        current_version = next((v for v in versions if v.is_current), None)

        if current_version is None:
            current_version_data = None
        else:
            current_version_data = {
                'id': current_version.id,
                'version': current_version.version,
                'implementation_key': current_version.implementation_key,
                'parameters': current_version.parameters,
                'created_at': current_version.created_at.isoformat(),
            }

        strategy_list.append({
            'id': strategy.id,
            'name': strategy.name,
            'description': strategy.description,
            'created_at': strategy.created_at.isoformat(),
            'current_version': current_version_data,
            'total_versions': len(versions),
            'available_implementations': available_strategies.get(strategy.name, {}).get('versions', []),
        })

    return JsonResponse({
        'strategies': strategy_list,
        'total_count': len(strategy_list),
        'available_implementations': available_strategies,
    })
@csrf_exempt
@require_http_methods(["POST"])
def execute_strategy(request):
    """Start a strategy execution in a background thread.

    Expects a JSON object in the request body:
        strategy_name: name of an active ``QuantStrategy`` (required).
        version: version to run; defaults to the version flagged current.
        parameters: JSON object merged over the implementation defaults.
        callback_url: URL that receives the outcome when the run finishes.

    Returns a JSON response:
        200: ``execution_id``, ``status`` ('pending'), ``message``,
             ``callback_url``. The run itself continues in the background.
        400: body is not valid JSON, is not a JSON object,
             ``strategy_name`` is missing, or ``parameters`` is not an object.
        404: strategy or version not found, or no current version exists.
        500: any other unexpected error.
    """
    try:
        data = json.loads(request.body)
        if not isinstance(data, dict):
            return JsonResponse({
                'error': 'Request body must be a JSON object'
            }, status=400)

        strategy_name = data.get('strategy_name')
        version = data.get('version')
        callback_url = data.get('callback_url')

        # Treat a missing or null "parameters" as "no overrides"; anything
        # else must be an object because it is merged with dict.update().
        execution_parameters = data.get('parameters')
        if execution_parameters is None:
            execution_parameters = {}
        if not isinstance(execution_parameters, dict):
            return JsonResponse({
                'error': 'parameters must be a JSON object'
            }, status=400)

        if not strategy_name:
            return JsonResponse({
                'error': 'strategy_name is required'
            }, status=400)

        strategy = get_object_or_404(QuantStrategy, name=strategy_name, is_active=True)

        if version:
            strategy_version = get_object_or_404(
                StrategyVersion,
                strategy=strategy,
                version=version
            )
        else:
            strategy_version = strategy.versions.filter(is_current=True).first()
            if not strategy_version:
                return JsonResponse({
                    'error': 'No current version found for this strategy'
                }, status=404)

        execution = StrategyExecution.objects.create(
            strategy_version=strategy_version,
            execution_parameters=execution_parameters,
            status='pending',
            callback_url=callback_url
        )

        def run_strategy():
            """Run the strategy, persist the outcome, then fire the callback."""
            try:
                execution.status = 'running'
                execution.save()

                # Fetch the implementation instance for this version.
                strategy_impl = strategy_version.get_strategy_implementation()

                # Merge the request parameters over the defaults.
                merged_parameters = strategy_impl.default_parameters.copy()
                merged_parameters.update(execution_parameters)

                result = strategy_impl.execute(merged_parameters)

                execution.status = 'completed'
                execution.result = result
                execution.completed_at = timezone.now()
                execution.save()
            except Exception as e:
                logger.exception('Strategy execution failed for execution %s', execution.id)
                execution.status = 'failed'
                execution.error_message = str(e)
                execution.completed_at = timezone.now()
                execution.save()
            finally:
                try:
                    # Send the outcome to the callback URL, if one was given.
                    if execution.callback_url:
                        send_callback(execution)
                finally:
                    # This thread runs outside the request cycle, so Django
                    # never closes the DB connection it opened; do it here.
                    connections.close_all()

        thread = threading.Thread(target=run_strategy)
        thread.start()

        return JsonResponse({
            'execution_id': execution.id,
            'status': 'pending',
            'message': 'Strategy execution started',
            'callback_url': callback_url
        })
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({
            'error': 'Invalid JSON in request body'
        }, status=400)
    except Http404 as e:
        # get_object_or_404 raises Http404, which the generic handler below
        # would otherwise report as a server error.
        return JsonResponse({
            'error': str(e)
        }, status=404)
    except Exception as e:
        logger.exception('Unexpected error while starting strategy execution')
        return JsonResponse({
            'error': str(e)
        }, status=500)
def send_callback(execution):
    """POST the outcome of an execution to its callback URL.

    Does nothing when the execution has no callback URL. On a completed
    request the response status, the first 500 characters of the body and the
    headers are stored on the execution; on failure an ``error`` entry is
    stored instead. Failures are logged and swallowed, never raised from the
    request itself.
    """
    if not execution.callback_url:
        return

    try:
        finished_at = None
        if execution.completed_at:
            finished_at = execution.completed_at.isoformat()

        # Build the callback payload.
        callback_data = {
            'execution_id': execution.id,
            'strategy': execution.strategy_version.strategy.name,
            'version': execution.strategy_version.version,
            'status': execution.status,
            'started_at': execution.started_at.isoformat(),
            'completed_at': finished_at,
            'execution_parameters': execution.execution_parameters
        }

        if execution.status == 'completed' and execution.result:
            callback_data['result'] = execution.result

        if execution.status == 'failed' and execution.error_message:
            callback_data['error_message'] = execution.error_message

        # Send the POST request with a 10-second timeout.
        response = requests.post(
            execution.callback_url,
            json=callback_data,
            headers={'Content-Type': 'application/json'},
            timeout=10
        )

        # Record what the callback endpoint answered.
        outcome = {
            'status_code': response.status_code,
            'response_text': response.text[:500],  # keep only the first 500 characters
            'headers': dict(response.headers)
        }
        execution.callback_sent = True
        execution.callback_sent_at = timezone.now()
        execution.callback_response = outcome
        execution.save()

        logger.info(f"Callback sent successfully to {execution.callback_url} for execution {execution.id}")
    except Exception as e:
        # Most specific failure first: Timeout is a RequestException subclass.
        if isinstance(e, requests.exceptions.Timeout):
            logger.error(f"Callback timeout for execution {execution.id} to {execution.callback_url}")
            execution.callback_response = {'error': 'timeout'}
        elif isinstance(e, requests.exceptions.RequestException):
            logger.error(f"Callback failed for execution {execution.id} to {execution.callback_url}: {str(e)}")
            execution.callback_response = {'error': str(e)}
        else:
            logger.error(f"Unexpected error sending callback for execution {execution.id}: {str(e)}")
            execution.callback_response = {'error': f'unexpected_error: {str(e)}'}
        execution.save()
@require_http_methods(["GET"])
def execution_status(request, execution_id):
    """Return the current state of one execution as JSON.

    Always includes the execution id, strategy name, version, status, start
    time and parameters. Completion time, result, report URL, error message
    and callback details are added only when they are present.
    Responds with 404 when no execution has the given id.
    """
    execution = get_object_or_404(StrategyExecution, id=execution_id)
    strategy_version = execution.strategy_version

    payload = {
        'execution_id': execution.id,
        'strategy': strategy_version.strategy.name,
        'version': strategy_version.version,
        'status': execution.status,
        'started_at': execution.started_at.isoformat(),
        'execution_parameters': execution.execution_parameters
    }

    if execution.completed_at:
        payload['completed_at'] = execution.completed_at.isoformat()

    status = execution.status
    if status == 'completed' and execution.result:
        payload['result'] = execution.result
        # The report link is only advertised alongside a completed result.
        if execution.report_file:
            payload['report_url'] = f'/api/executions/{execution.id}/report/'

    if status == 'failed' and execution.error_message:
        payload['error_message'] = execution.error_message

    # Add callback details when a callback URL was registered.
    if execution.callback_url:
        sent_at = execution.callback_sent_at
        payload['callback'] = {
            'url': execution.callback_url,
            'sent': execution.callback_sent,
            'sent_at': sent_at.isoformat() if sent_at else None,
            'response': execution.callback_response
        }

    return JsonResponse(payload)
@require_http_methods(["GET"])
def list_available_implementations(request):
    """Return every strategy implementation registered in the registry."""
    payload = {'available_implementations': StrategyRegistry.list_strategies()}
    return JsonResponse(payload)
@csrf_exempt
@require_http_methods(["POST"])
def backtest_strategy(request):
    """Start a strategy backtest in a background thread.

    Expects a JSON object in the request body:
        strategy_name: name of an active ``QuantStrategy`` (required).
        version: version to run; defaults to the version flagged current.
        parameters: JSON object that must contain ``backtest_start_date``
            and ``backtest_end_date``; merged over the implementation
            defaults.
        callback_url: URL that receives the outcome when the run finishes.

    Returns a JSON response:
        200: ``execution_id``, ``status`` ('pending'), ``message``,
             ``callback_url``. The backtest continues in the background.
        400: body is not valid JSON, is not a JSON object, ``strategy_name``
             is missing, ``parameters`` is not an object, or a required
             backtest date is missing.
        404: strategy or version not found, or no current version exists.
        500: any other unexpected error.
    """
    try:
        data = json.loads(request.body)
        if not isinstance(data, dict):
            return JsonResponse({
                'error': 'Request body must be a JSON object'
            }, status=400)

        strategy_name = data.get('strategy_name')
        version = data.get('version')
        callback_url = data.get('callback_url')

        # Treat a missing or null "parameters" as empty; anything else must
        # be an object because it is searched with ``in`` and merged with
        # dict.update().
        execution_parameters = data.get('parameters')
        if execution_parameters is None:
            execution_parameters = {}
        if not isinstance(execution_parameters, dict):
            return JsonResponse({
                'error': 'parameters must be a JSON object'
            }, status=400)

        if not strategy_name:
            return JsonResponse({
                'error': 'strategy_name is required'
            }, status=400)

        # Validate the parameters every backtest requires.
        if 'backtest_start_date' not in execution_parameters:
            return JsonResponse({
                'error': 'parameters.backtest_start_date is required'
            }, status=400)
        if 'backtest_end_date' not in execution_parameters:
            return JsonResponse({
                'error': 'parameters.backtest_end_date is required'
            }, status=400)

        strategy = get_object_or_404(QuantStrategy, name=strategy_name, is_active=True)

        if version:
            strategy_version = get_object_or_404(
                StrategyVersion,
                strategy=strategy,
                version=version
            )
        else:
            strategy_version = strategy.versions.filter(is_current=True).first()
            if not strategy_version:
                return JsonResponse({
                    'error': 'No current version found for this strategy'
                }, status=404)

        # Mark the stored record as a backtest run.
        backtest_params = execution_parameters.copy()
        backtest_params['execution_mode'] = 'backtest'

        execution = StrategyExecution.objects.create(
            strategy_version=strategy_version,
            execution_parameters=backtest_params,
            status='pending',
            callback_url=callback_url
        )

        def run_backtest_task():
            """Run the backtest, persist result and report, then fire the callback."""
            try:
                execution.status = 'running'
                execution.save()

                strategy_impl = strategy_version.get_strategy_implementation()

                # Merge the request parameters over the defaults.
                # NOTE(review): this merges the caller's parameters, not
                # ``backtest_params``, so the 'execution_mode' marker is only
                # on the stored record - confirm that is intended.
                merged_parameters = strategy_impl.default_parameters.copy()
                merged_parameters.update(execution_parameters)

                result = run_backtest(strategy_impl, merged_parameters)

                execution.status = 'completed'
                execution.result = result
                execution.completed_at = timezone.now()

                # A failed report must not fail the backtest itself: the
                # result is still saved, just without a report path.
                try:
                    report_path = generate_backtest_report(result, execution.id)
                    execution.report_file = report_path
                except Exception as report_err:
                    logger.exception(f'PDF report generation failed for execution {execution.id}: {report_err}')

                execution.save()
            except Exception as e:
                logger.exception(f'Backtest failed for execution {execution.id}')
                execution.status = 'failed'
                execution.error_message = str(e)
                execution.completed_at = timezone.now()
                execution.save()
            finally:
                try:
                    if execution.callback_url:
                        send_callback(execution)
                finally:
                    # This thread runs outside the request cycle, so Django
                    # never closes the DB connection it opened; do it here.
                    connections.close_all()

        thread = threading.Thread(target=run_backtest_task)
        thread.start()

        return JsonResponse({
            'execution_id': execution.id,
            'status': 'pending',
            'message': 'Backtest execution started',
            'callback_url': callback_url
        })
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({
            'error': 'Invalid JSON in request body'
        }, status=400)
    except Http404 as e:
        # get_object_or_404 raises Http404, which the generic handler below
        # would otherwise report as a server error.
        return JsonResponse({
            'error': str(e)
        }, status=404)
    except Exception as e:
        logger.exception('Unexpected error while starting backtest execution')
        return JsonResponse({
            'error': str(e)
        }, status=500)
@require_http_methods(["GET"])
def download_report(request, execution_id):
    """Download the backtest PDF report of an execution.

    Returns the PDF as an attachment named ``backtest_<execution_id>.pdf``.
    Responds with 404 when the execution does not exist, when it has no
    report recorded, or when the recorded file cannot be found on disk.
    """
    execution = get_object_or_404(StrategyExecution, id=execution_id)

    if not execution.report_file:
        return JsonResponse({'error': 'Report not available'}, status=404)

    file_path = os.path.join(settings.MEDIA_ROOT, execution.report_file)

    # Open directly instead of checking os.path.exists() first: the file could
    # vanish between the check and the open, and a directory passes the check
    # but cannot be opened.
    try:
        report = open(file_path, 'rb')
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return JsonResponse({'error': 'Report file not found'}, status=404)

    # FileResponse closes the file once the response has been streamed.
    return FileResponse(
        report,
        as_attachment=True,
        filename=f'backtest_{execution_id}.pdf',
        content_type='application/pdf',
    )