class PersonaParser:
    """Parse and validate persona content with comprehensive validation.

    Persona content is split into its ``## <Title>`` sections; the parsed
    sections are then fed to the various ``extract_*`` / ``parse_*`` helpers
    and the assembled ``ParsedPersona`` is validated before being returned.
    """

    # (section key, heading title) pairs, in the order sections are looked up
    # and stored in the result of ``parse_persona_sections``.
    _SECTION_TITLES = (
        ("identity", "Identity"),
        ("role", "Role"),
        ("responsibilities", "Responsibilities"),
        ("expertise", "Expertise"),
        ("communication_style", "Communication Style"),
        ("decision_making", "Decision Making"),
        ("collaboration", "Collaboration"),
        ("limitations", "Limitations"),
    )

    def __init__(self):
        self.parsing_rules = PersonaParsingRules()
        self.validation_framework = PersonaValidationFramework()
        self.content_analyzers = PersonaContentAnalyzers()

    def parse_persona_content(self, persona_content):
        """Parse persona content with comprehensive analysis.

        Args:
            persona_content: Raw persona text containing ``## `` sections.

        Returns:
            The ``ParsedPersona`` built from the content.

        Raises:
            PersonaParsingError: If the content format validation fails or a
                required section is missing.
            PersonaValidationError: If the assembled persona fails validation.
        """
        # Reject malformed content before doing any parsing work.
        content_validation = self.validate_content_format(persona_content)
        if not content_validation.passed:
            raise PersonaParsingError(f"Content format validation failed: {content_validation.errors}")

        parsed_sections = self.parse_persona_sections(persona_content)

        # Each aspect of the persona is derived from the same parsed sections.
        characteristics = self.extract_persona_characteristics(parsed_sections)
        behavioral_directives = self.parse_behavioral_directives(parsed_sections)
        knowledge_domains = self.extract_knowledge_domains(parsed_sections)
        communication_style = self.parse_communication_style(parsed_sections)
        capabilities = self.extract_capabilities(parsed_sections)
        limitations = self.extract_limitations(parsed_sections)

        parsed_persona = ParsedPersona(
            raw_content=persona_content,
            sections=parsed_sections,
            characteristics=characteristics,
            behavioral_directives=behavioral_directives,
            knowledge_domains=knowledge_domains,
            communication_style=communication_style,
            capabilities=capabilities,
            limitations=limitations,
        )

        # Validate the assembled object as a whole.
        validation_result = self.validation_framework.validate_parsed_persona(parsed_persona)
        if not validation_result.passed:
            raise PersonaValidationError(f"Parsed persona validation failed: {validation_result.errors}")
        return parsed_persona

    def parse_persona_sections(self, persona_content):
        """Parse persona content into structured sections.

        A section starts at a line of the form ``## <Title>`` (matched
        case-insensitively, optional leading/trailing blanks) and runs until
        the next line that starts with ``##`` or the end of the content.
        Note that deeper headings (``###``) therefore also end a section.

        Headings are anchored to the start of a line, so ``### Role`` or a
        mid-line ``## Role`` is not mistaken for the ``Role`` section, and an
        empty section yields empty content instead of swallowing the section
        that follows it.

        Args:
            persona_content: Raw persona text.

        Returns:
            Dict mapping every known section key to the result of
            ``parse_section_content`` for its stripped body, or ``None`` when
            an optional section is absent.

        Raises:
            PersonaParsingError: If a required section is not found.
        """
        flags = re.DOTALL | re.IGNORECASE | re.MULTILINE
        parsed_sections = {}
        for section_name, title in self._SECTION_TITLES:
            pattern = (
                rf"^[ \t]*##[ \t]+{re.escape(title)}[ \t]*\r?(?:\n|\Z)"
                r"(.*?)(?=^##|\Z)"
            )
            match = re.search(pattern, persona_content, flags)
            if match:
                section_content = match.group(1).strip()
                parsed_sections[section_name] = self.parse_section_content(section_content)
            elif self.is_required_section(section_name):
                raise PersonaParsingError(f"Required section '{section_name}' not found in persona content")
            else:
                # Optional section that is absent: keep the key, mark as missing.
                parsed_sections[section_name] = None
        return parsed_sections

    def extract_persona_characteristics(self, parsed_sections):
        """Extract key persona characteristics from parsed sections.

        Args:
            parsed_sections: Mapping as returned by ``parse_persona_sections``.

        Returns:
            A ``PersonaCharacteristics`` populated from the identity, role and
            expertise sections (each only when present and truthy) plus the
            behavioral patterns extracted from all sections.
        """
        characteristics = PersonaCharacteristics()

        if parsed_sections.get("identity"):
            identity_traits = self.extract_identity_traits(parsed_sections["identity"])
            characteristics.add_traits(identity_traits)

        if parsed_sections.get("role"):
            role_characteristics = self.extract_role_characteristics(parsed_sections["role"])
            characteristics.add_role_characteristics(role_characteristics)

        if parsed_sections.get("expertise"):
            expertise_areas = self.extract_expertise_areas(parsed_sections["expertise"])
            characteristics.add_expertise_areas(expertise_areas)

        # Behavioral patterns are derived from the full set of sections.
        behavioral_patterns = self.extract_behavioral_patterns(parsed_sections)
        characteristics.add_behavioral_patterns(behavioral_patterns)
        return characteristics
### 5. State Management
#### Transformation State Management
```python
class TransformationStateManager:
    """Manage state throughout persona transformation process.

    Keeps a stack of captured states and a history list, and delegates
    checkpoint creation and state validation to a ``StateCheckpointManager``
    and a ``StateValidationFramework`` respectively.
    """

    def __init__(self):
        self.state_stack = []
        self.state_history = []
        self.checkpoint_manager = StateCheckpointManager()
        self.state_validators = StateValidationFramework()

    def manage_transformation_state(self, transformation_session):
        """Manage state throughout transformation process.

        Captures the initial state and pushes it onto ``state_stack``, then,
        for each checkpoint phase in a fixed order, creates a checkpoint and
        validates state consistency for that phase.

        Args:
            transformation_session: Session passed to the checkpoint manager
                and to ``handle_state_error``.

        Returns:
            The ``StateManagementResult`` holding the recorded checkpoints
            and any non-critical errors.

        Raises:
            CriticalStateError: If ``handle_state_error`` reports a critical
                failure for a phase; the original exception is attached as
                the cause.
        """
        initial_state = self.capture_initial_state()
        self.state_stack.append(initial_state)

        # Phases are processed strictly in this order.
        checkpoint_phases = [
            "pre_transformation",
            "context_preserved",
            "agent_config_loaded",
            "persona_parsed",
            "context_loaded",
            "memory_allocated",
            "persona_activated",
            "post_validation",
            "transformation_complete",
        ]

        state_management_result = StateManagementResult()
        for phase in checkpoint_phases:
            try:
                checkpoint = self.checkpoint_manager.create_checkpoint(
                    phase=phase,
                    transformation_session=transformation_session,
                    current_state=self.get_current_state(),
                )

                state_validation = self.state_validators.validate_state_consistency(
                    current_state=self.get_current_state(),
                    expected_phase=phase,
                )
                if not state_validation.passed:
                    raise StateValidationError(f"State validation failed at {phase}: {state_validation.errors}")

                state_management_result.add_checkpoint(phase, checkpoint)
            except Exception as e:
                # Deliberately broad: every failure in a phase (including the
                # StateValidationError raised above) is routed through
                # handle_state_error, which decides whether it is critical.
                error_recovery = self.handle_state_error(phase, e, transformation_session)
                state_management_result.add_error(phase, error_recovery)
                if error_recovery.is_critical:
                    # Chain explicitly so the original failure is the cause.
                    raise CriticalStateError(f"Critical state error at {phase}: {e}") from e
        return state_management_result

    def capture_initial_state(self):
        """Capture initial system state before transformation.

        Returns:
            A ``SystemState`` built from the current orchestrator mode, active
            workflows, user session, system resources, memory and
            configuration snapshots, stamped with ``time.time()``.
        """
        return SystemState(
            orchestrator_mode=get_current_orchestrator_mode(),
            active_workflows=get_active_workflows(),
            user_session=capture_user_session_state(),
            system_resources=capture_system_resource_state(),
            memory_state=capture_memory_state(),
            configuration_state=capture_configuration_state(),
            timestamp=time.time(),
        )

    def validate_state_transition(self, from_state, to_state, transition_type):
        """Validate state transition is valid and safe.

        Runs three checks in order and stops at the first failure: transition
        validity, resource requirements, and dependencies.

        Args:
            from_state: State the transition starts from.
            to_state: State the transition leads to.
            transition_type: Kind of transition, forwarded to the validator.

        Returns:
            A ``StateTransitionValidationResult`` with ``valid=True`` and the
            three individual validation results.

        Raises:
            InvalidStateTransitionError: If the transition itself is invalid.
            InsufficientResourcesError: If resources are insufficient.
            UnsatisfiedDependenciesError: If dependencies are not satisfied.
        """
        transition_validation = self.state_validators.validate_transition(
            from_state=from_state,
            to_state=to_state,
            transition_type=transition_type,
        )
        if not transition_validation.is_valid:
            raise InvalidStateTransitionError(f"Invalid state transition: {transition_validation.error}")

        resource_validation = self.validate_transition_resources(from_state, to_state)
        if not resource_validation.sufficient:
            raise InsufficientResourcesError(f"Insufficient resources for state transition: {resource_validation.missing}")

        dependency_validation = self.validate_transition_dependencies(from_state, to_state)
        if not dependency_validation.satisfied:
            raise UnsatisfiedDependenciesError(f"Unsatisfied dependencies: {dependency_validation.missing}")

        return StateTransitionValidationResult(
            valid=True,
            transition_validation=transition_validation,
            resource_validation=resource_validation,
            dependency_validation=dependency_validation,
        )