412 lines
15 KiB
Python
412 lines
15 KiB
Python
from collections import deque
|
||
import re
|
||
|
||
|
||
class KnowledgebasedAgent:
    """Knowledge-based agent for Wumpus World using CNF resolution.

    The knowledge base is ``cnf_clauses``: a list of clauses, each clause a
    ``set`` of string literals.  A literal is a symbol such as ``"W21"`` or
    its negation ``"-W21"``.  Positions are written as two single digits
    ``<x><y>`` with 1-based coordinates.

    Symbols used by the generated rules:
        S<xy>, B<xy>, G<xy>   stench / breeze / glitter perceived at (x, y)
        W<xy>, P<xy>          Wumpus / pit located at (x, y)
        Gold<xy>              gold located at (x, y)
        Sc, WumpusDead, WumpusAlive   scream bookkeeping
    """

    # Gym observation key -> literal prefix, in the order percepts are processed.
    # NOTE(review): TELL() therefore emits "BUMP<xy>" / "SC<xy>" literals, while
    # _handle_bump_sensors / _handle_scream_sensors only recognise "Bump<xy>" and
    # the position-less "Sc".  Those handlers never fire for gym step results.
    # The prefixes are kept as-is because wiring the scream rule in would assert
    # "no Wumpus anywhere", which contradicts stench facts already in the KB and
    # makes every query trivially entailed.  Needs a design decision.
    _SENSOR_CODES = {
        'stench': 'S',
        'breeze': 'B',
        'glitter': 'G',
        'bump': 'BUMP',
        'scream': 'SC',
    }

    # Safety cap on resolution rounds so ASK() always terminates quickly.
    MAX_RESOLUTION_ROUNDS = 20

    def __init__(self, method="CNF", grid_size=4):
        """Initialize the knowledge-based agent.

        Args:
            method: Label of the inference method; stored but not interpreted.
            grid_size: Side length of the square world.  Must be 1..9 because
                positions are encoded as two single digits.

        Raises:
            ValueError: If ``grid_size`` is outside 1..9.
        """
        if not 1 <= grid_size <= 9:
            raise ValueError(f"grid_size must be between 1 and 9, got {grid_size}")
        self.method = method
        self.grid_size = grid_size
        self.cnf_clauses = []  # List of sets (each set is a clause)

    # ==================== MAIN INTERFACE ====================

    def TELL(self, step_result):
        """Add sensor data to the knowledge base.

        Args:
            step_result: Either
                - a legacy sensor string such as "S11" or "-B12", or
                - a tuple from env.step():
                  (observation, reward, terminated, truncated, info), where
                  observation is a dict with 0-based 'x' and 'y' plus optional
                  boolean percept keys (stench, breeze, glitter, bump, scream).

        Any other input is ignored.
        """
        # Handle legacy string input
        if isinstance(step_result, str):
            self._generate_clauses_from_sensor(step_result)
            return

        # Handle gym environment step result
        if isinstance(step_result, tuple) and len(step_result) >= 3:
            observation, reward, terminated = step_result[:3]

            # Convert from 0-based to 1-based coordinates
            x = observation['x'] + 1
            y = observation['y'] + 1
            position = f"{x}{y}"

            print(f"\n Processing step result for position [{x},{y}]:")
            print(f" Terminated: {terminated}, Reward: {reward}")

            if terminated:
                # NOTE(review): every termination is treated as a death here.
                # If the environment also terminates on success, this clause
                # would be wrong - confirm against the environment.
                self._add_clause({f"W{position}", f"P{position}"})
                print(f" Death rule: W{position} ∨ P{position}")

                # No sensor processing when agent dies
                print(f" Agent died - no sensor data processed")
                return

            # Agent survived - field is safe (no Wumpus AND no Pit)
            self._add_clause({f"-W{position}"})
            self._add_clause({f"-P{position}"})
            print(f" Survival rules: ¬W{position} ∧ ¬P{position}")

            # A missing percept key counts as "not perceived".
            for sensor_name, sensor_code in self._SENSOR_CODES.items():
                sign = "" if observation.get(sensor_name, False) else "-"
                # Adds the unit clause once, plus any rules the percept implies.
                self._generate_clauses_from_sensor(f"{sign}{sensor_code}{position}")

        return

    def _add_clause(self, clause):
        """Append ``clause`` to the KB unless an equal clause is already stored.

        Returns:
            True if the clause was added, False if it was a duplicate.
        """
        clause = set(clause)
        if clause in self.cnf_clauses:
            return False
        self.cnf_clauses.append(clause)
        return True

    def _add_sensor_unit_clause(self, sensor_fact):
        """Add a sensor literal as a unit clause, skipping duplicates."""
        self._add_clause({sensor_fact})

    def TELL_SENSOR(self, sensor_string):
        """Legacy method for adding individual sensor strings."""
        self.TELL(sensor_string)

    def TELL_STEP(self, step_result):
        """Convenience method specifically for gym step results."""
        self.TELL(step_result)

    def ASK(self, query):
        """Check whether the KB entails ``query`` using CNF resolution.

        Args:
            query: A single literal, e.g. "W21" or "-P12".

        Returns:
            True if a contradiction was derived from KB ∧ ¬query, else False.
            False also covers "round limit reached", so it means
            "not proven", not "proven false".
        """
        result = self._cnf_resolution(query)
        print(f"ASK({query}): {result}")
        return result

    def print_clauses(self):
        """Print all CNF clauses, numbered from 1, literals sorted."""
        print("\n=== CNF Clauses ===")
        for i, clause in enumerate(self.cnf_clauses):
            literals = " ∨ ".join(sorted(clause))
            print(f" {i + 1}. {{{literals}}}")

    # ==================== CNF RESOLUTION ALGORITHM ====================

    def _cnf_resolution(self, query):
        """Prove KB |= query by refutation: show KB ∧ ¬query is unsatisfiable.

        The stored KB is not modified; resolution works on a frozen copy.
        """
        negated_query = self._negate(query)

        # Frozen, de-duplicated working copy; ``known`` gives O(1) membership.
        clauses = []
        known = set()
        for clause in [*self.cnf_clauses, {negated_query}]:
            frozen = frozenset(clause)
            if frozen not in known:
                known.add(frozen)
                clauses.append(frozen)

        print(f"\nProving: KB ∧ ¬{query} is unsatisfiable")
        print(f"Added negated query: {{{negated_query}}}")

        first_new = 0  # index of the first clause added in the previous round
        iteration = 0

        while True:
            iteration += 1
            derived = []
            total = len(clauses)

            # Only pairs involving at least one clause from the previous round
            # can yield something new; older pairs were already resolved.
            for i in range(total):
                for j in range(max(i + 1, first_new), total):
                    resolvent = self._resolve(clauses[i], clauses[j])
                    if resolvent is None:
                        continue

                    if not resolvent:
                        # Empty clause = contradiction!
                        print(f"Empty clause from {set(clauses[i])} ∩ {set(clauses[j])}")
                        print("⚡ Contradiction found! KB entails query.")
                        return True

                    resolvent = frozenset(resolvent)
                    if resolvent not in known:
                        known.add(resolvent)
                        derived.append(resolvent)

            # Saturated without contradiction
            if not derived:
                print("No new clauses. Query not entailed.")
                return False

            first_new = total
            clauses.extend(derived)

            # Safety limit
            if iteration > self.MAX_RESOLUTION_ROUNDS:
                print("Max iterations reached.")
                return False

    def _resolve(self, clause1, clause2):
        """Resolve two clauses on their complementary literal.

        Returns:
            The resolvent, or None when the clauses share no complementary
            literal or when the resolvent would be a tautology.  Tautologies
            (e.g. from clauses clashing on more than one literal) are always
            true and can never contribute to a refutation.
        """
        pivots = [lit for lit in clause1 if self._negate(lit) in clause2]
        if len(pivots) != 1:
            # 0 pivots: nothing to resolve.  2+ pivots: every resolvent still
            # contains a complementary pair, i.e. is a tautology.
            return None

        literal = pivots[0]
        resolvent = (clause1 - {literal}) | (clause2 - {self._negate(literal)})
        if any(self._negate(lit) in resolvent for lit in resolvent):
            return None
        return resolvent

    # ==================== SENSOR RULE GENERATION ====================

    def _generate_clauses_from_sensor(self, sensor):
        """Add ``sensor`` as a unit clause plus the rules that percept implies."""
        self._add_clause({sensor})

        # Dispatch to specific sensor handlers; the first one that recognises
        # the literal wins.
        handlers = (
            self._handle_stench_sensors,
            self._handle_breeze_sensors,
            self._handle_glitter_sensors,
            self._handle_scream_sensors,
            self._handle_bump_sensors,
        )
        for handler in handlers:
            if handler(sensor):
                return

    def _handle_adjacent_hazard(self, sensor, percept, hazard):
        """Shared rule generator for stench->Wumpus and breeze->pit.

        Positive percept ``<percept><xy>`` adds
            {-percept_xy, -hazard_xy} and {-percept_xy, hazard at each neighbor}.
        Negative percept ``-<percept><xy>`` adds, for every neighbor n,
            {percept_xy, -hazard_n}.

        Returns:
            True if ``sensor`` was a literal for ``percept``, else False.
        """
        match = re.match(rf'^(-?){percept}(\d)(\d)$', sensor)
        if not match:
            return False

        x, y = int(match.group(2)), int(match.group(3))
        pos = f"{x}{y}"
        neighbors = self._get_neighbors(x, y)

        if match.group(1):
            # No percept here -> hazard in none of the neighbors
            for nx, ny in neighbors:
                self._add_clause({f"{percept}{pos}", f"-{hazard}{nx}{ny}"})
            print(f" Generated: ¬{percept}{pos} → ¬{hazard}-neighbors")
            return True

        # Percept here -> hazard is not in this cell ...
        self._add_clause({f"-{percept}{pos}", f"-{hazard}{pos}"})
        # ... but is in at least one neighbor
        if neighbors:
            self._add_clause(
                {f"-{percept}{pos}"} | {f"{hazard}{nx}{ny}" for nx, ny in neighbors}
            )
        print(f" Generated: {percept}{pos} → ¬{hazard}{pos} ∧ ({hazard}-neighbors)")
        return True

    def _handle_stench_sensors(self, sensor):
        """Handle stench literals (S11, -S12, ...); True if recognised."""
        return self._handle_adjacent_hazard(sensor, "S", "W")

    def _handle_breeze_sensors(self, sensor):
        """Handle breeze literals (B11, -B12, ...); True if recognised."""
        return self._handle_adjacent_hazard(sensor, "B", "P")

    def _handle_glitter_sensors(self, sensor):
        """Handle glitter literals (G11, -G11, ...); True if recognised."""
        match = re.match(r'^(-?)G(\d)(\d)$', sensor)
        if not match:
            return False

        x, y = int(match.group(2)), int(match.group(3))
        pos = f"{x}{y}"

        if match.group(1):
            # -G11 → ¬Gold11 becomes {G11, -Gold11}
            self._add_clause({f"G{pos}", f"-Gold{pos}"})
            print(f" Generated: ¬G{pos} → ¬Gold{pos}")
        else:
            # G11 → Gold11 becomes {-G11, Gold11}
            self._add_clause({f"-G{pos}", f"Gold{pos}"})
            print(f" Generated: G{pos} → Gold{pos}")
        return True

    def _handle_scream_sensors(self, sensor):
        """Handle the position-less scream literals "Sc" / "-Sc"."""
        if sensor == "Sc":
            # Sc → WumpusDead becomes {-Sc, WumpusDead}
            self._add_clause({"-Sc", "WumpusDead"})
            # WumpusDead → ¬W at every position of the grid
            for x in range(1, self.grid_size + 1):
                for y in range(1, self.grid_size + 1):
                    self._add_clause({"-WumpusDead", f"-W{x}{y}"})
            print(f" Generated: Sc → WumpusDead → ¬W(everywhere)")
            return True

        if sensor == "-Sc":
            # -Sc → WumpusAlive becomes {Sc, WumpusAlive}
            self._add_clause({"Sc", "WumpusAlive"})
            print(f" Generated: ¬Sc → WumpusAlive")
            return True

        return False

    def _handle_bump_sensors(self, sensor):
        """Handle bump literals (Bump11, -Bump11, ...); prints a note only."""
        match = re.match(r'^(-?)Bump(\d)(\d)$', sensor)
        if not match:
            return False

        x, y = int(match.group(2)), int(match.group(3))
        if match.group(1):
            print(f" Note: ¬Bump{x}{y} indicates successful movement")
        else:
            print(f" Note: Bump{x}{y} indicates wall collision")
        return True

    # ==================== UTILITY METHODS ====================

    def _negate(self, literal):
        """Return the complement of ``literal`` ("W11" <-> "-W11")."""
        if literal.startswith('-'):
            return literal[1:]
        return f"-{literal}"

    def _get_neighbors(self, x, y):
        """Return the orthogonal neighbors of (x, y) that lie inside the grid.

        Order is left, right, down, up (by dx/dy), filtered to 1..grid_size.
        """
        size = self.grid_size
        candidates = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
        return [
            (nx, ny)
            for nx, ny in candidates
            if 1 <= nx <= size and 1 <= ny <= size
        ]
|
||
|
||
|
||
# ==================== TEST SCENARIOS ====================
|
||
|
||
|
||
def test_gym_integration():
    """Feed two simulated gym step results to the agent and check entailment.

    Scenario: the agent survives at [1,1] with a stench and no breeze, then
    the episode terminates at [2,1].  The expected answers follow from the
    clauses those two steps generate and are asserted, not just printed.

    Raises:
        AssertionError: If any ASK() result differs from the expected value.
    """
    print("\n🎮 Test 3: Direct Gym Environment Integration")
    print("="*60)

    agent = KnowledgebasedAgent("CNF")

    print("📡 Simulating gym environment step results...")

    # Step 1: agent survives at [1,1] (0-based [0,0]); stench, no breeze.
    step_result_1 = (
        {'x': 0, 'y': 0, 'gold': False, 'direction': 1, 'arrow': True,
         'stench': True, 'breeze': False, 'glitter': False, 'bump': False, 'scream': False},
        [-1],  # reward
        False,  # terminated (agent survives)
        False,  # truncated
        {'info': 'step 1'}
    )
    agent.TELL(step_result_1)

    # Step 2: agent dies at [2,1] (0-based [1,0]); percepts are ignored on death.
    step_result_2 = (
        {'x': 1, 'y': 0, 'gold': False, 'direction': 1, 'arrow': True,
         'stench': False, 'breeze': False, 'glitter': False, 'bump': False, 'scream': False},
        [-1000],  # death reward
        True,  # terminated (agent dies)
        False,  # truncated
        {'info': 'death'}
    )
    agent.TELL(step_result_2)

    print("\n🔍 Testing queries with survival logic...")
    expected = {
        "W11": False,  # agent survived [1,1], so no Wumpus there
        "P11": False,  # agent survived [1,1], so no pit there
        "W21": True,   # death at [2,1] is W21 ∨ P21, and no breeze at [1,1] rules out P21
        "P21": False,  # no breeze at [1,1] rules out a pit at [2,1]
    }
    for query, want in expected.items():
        got = agent.ASK(query)
        assert got == want, f"ASK({query!r}) returned {got}, expected {want}"

    print("\n📋 Generated clauses:")
    agent.print_clauses()
|
||
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: print a banner, run the demo scenario, then a
    # closing line.
    banner_rule = "=" * 70
    print("🏰 CNF Resolution for Wumpus World - Complete Implementation")
    print(banner_rule)

    test_gym_integration()

    print("\n✨ CNF Resolution correctly handles all Wumpus World logic!")
|