Files
executor/strategies/views.py

242 lines
8.6 KiB
Python

from django.shortcuts import render, get_object_or_404
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils import timezone
import json
import threading
import time
import requests
import logging
from .models import QuantStrategy, StrategyVersion, StrategyExecution
from .base import StrategyRegistry
from . import implementations # 구현체들을 로드하여 레지스트리에 등록
logger = logging.getLogger(__name__)
@require_http_methods(["GET"])
def list_strategies(request):
# 레지스트리에서 사용 가능한 전략들을 가져옴
available_strategies = StrategyRegistry.list_strategies()
# DB에서 관리되는 전략들을 가져옴
strategies = QuantStrategy.objects.filter(is_active=True).prefetch_related('versions')
strategy_list = []
for strategy in strategies:
current_version = strategy.versions.filter(is_current=True).first()
strategy_data = {
'id': strategy.id,
'name': strategy.name,
'description': strategy.description,
'created_at': strategy.created_at.isoformat(),
'current_version': {
'id': current_version.id,
'version': current_version.version,
'implementation_key': current_version.implementation_key,
'parameters': current_version.parameters,
'created_at': current_version.created_at.isoformat()
} if current_version else None,
'total_versions': strategy.versions.count(),
'available_implementations': available_strategies.get(strategy.name, {}).get('versions', [])
}
strategy_list.append(strategy_data)
return JsonResponse({
'strategies': strategy_list,
'total_count': len(strategy_list),
'available_implementations': available_strategies
})
@csrf_exempt
@require_http_methods(["POST"])
def execute_strategy(request):
try:
data = json.loads(request.body)
strategy_name = data.get('strategy_name')
version = data.get('version')
execution_parameters = data.get('parameters', {})
callback_url = data.get('callback_url')
if not strategy_name:
return JsonResponse({
'error': 'strategy_name is required'
}, status=400)
strategy = get_object_or_404(QuantStrategy, name=strategy_name, is_active=True)
if version:
strategy_version = get_object_or_404(
StrategyVersion,
strategy=strategy,
version=version
)
else:
strategy_version = strategy.versions.filter(is_current=True).first()
if not strategy_version:
return JsonResponse({
'error': 'No current version found for this strategy'
}, status=404)
execution = StrategyExecution.objects.create(
strategy_version=strategy_version,
execution_parameters=execution_parameters,
status='pending',
callback_url=callback_url
)
def run_strategy():
try:
execution.status = 'running'
execution.save()
# 전략 구현체 인스턴스를 가져와서 실행
strategy_impl = strategy_version.get_strategy_implementation()
# 기본 파라미터와 실행 파라미터를 병합
merged_parameters = strategy_impl.default_parameters.copy()
merged_parameters.update(execution_parameters)
# 전략 실행
result = strategy_impl.execute(merged_parameters)
execution.status = 'completed'
execution.result = result
execution.completed_at = timezone.now()
execution.save()
except Exception as e:
execution.status = 'failed'
execution.error_message = str(e)
execution.completed_at = timezone.now()
execution.save()
finally:
# 콜백 URL이 있으면 결과 전송
if execution.callback_url:
send_callback(execution)
thread = threading.Thread(target=run_strategy)
thread.start()
return JsonResponse({
'execution_id': execution.id,
'status': 'pending',
'message': 'Strategy execution started',
'callback_url': callback_url
})
except json.JSONDecodeError:
return JsonResponse({
'error': 'Invalid JSON in request body'
}, status=400)
except Exception as e:
return JsonResponse({
'error': str(e)
}, status=500)
def send_callback(execution):
"""전략 실행 완료 후 콜백 URL로 결과 전송"""
if not execution.callback_url:
return
try:
# 콜백 데이터 구성
callback_data = {
'execution_id': execution.id,
'strategy': execution.strategy_version.strategy.name,
'version': execution.strategy_version.version,
'status': execution.status,
'started_at': execution.started_at.isoformat(),
'completed_at': execution.completed_at.isoformat() if execution.completed_at else None,
'execution_parameters': execution.execution_parameters
}
if execution.status == 'completed' and execution.result:
callback_data['result'] = execution.result
if execution.status == 'failed' and execution.error_message:
callback_data['error_message'] = execution.error_message
# POST 요청 전송 (타임아웃 10초)
response = requests.post(
execution.callback_url,
json=callback_data,
headers={'Content-Type': 'application/json'},
timeout=10
)
# 콜백 전송 결과 저장
execution.callback_sent = True
execution.callback_sent_at = timezone.now()
execution.callback_response = {
'status_code': response.status_code,
'response_text': response.text[:500], # 처음 500자만 저장
'headers': dict(response.headers)
}
execution.save()
logger.info(f"Callback sent successfully to {execution.callback_url} for execution {execution.id}")
except requests.exceptions.Timeout:
logger.error(f"Callback timeout for execution {execution.id} to {execution.callback_url}")
execution.callback_response = {'error': 'timeout'}
execution.save()
except requests.exceptions.RequestException as e:
logger.error(f"Callback failed for execution {execution.id} to {execution.callback_url}: {str(e)}")
execution.callback_response = {'error': str(e)}
execution.save()
except Exception as e:
logger.error(f"Unexpected error sending callback for execution {execution.id}: {str(e)}")
execution.callback_response = {'error': f'unexpected_error: {str(e)}'}
execution.save()
@require_http_methods(["GET"])
def execution_status(request, execution_id):
execution = get_object_or_404(StrategyExecution, id=execution_id)
response_data = {
'execution_id': execution.id,
'strategy': execution.strategy_version.strategy.name,
'version': execution.strategy_version.version,
'status': execution.status,
'started_at': execution.started_at.isoformat(),
'execution_parameters': execution.execution_parameters
}
if execution.completed_at:
response_data['completed_at'] = execution.completed_at.isoformat()
if execution.status == 'completed' and execution.result:
response_data['result'] = execution.result
if execution.status == 'failed' and execution.error_message:
response_data['error_message'] = execution.error_message
# 콜백 정보 추가
if execution.callback_url:
response_data['callback'] = {
'url': execution.callback_url,
'sent': execution.callback_sent,
'sent_at': execution.callback_sent_at.isoformat() if execution.callback_sent_at else None,
'response': execution.callback_response
}
return JsonResponse(response_data)
@require_http_methods(["GET"])
def list_available_implementations(request):
"""레지스트리에 등록된 모든 전략 구현체 목록을 반환"""
available_strategies = StrategyRegistry.list_strategies()
return JsonResponse({
'available_implementations': available_strategies
})