+"""Query graph for municipal code data extraction."""
+
+from __future__ import annotations
+from typing import Dict, Any, TypedDict, Annotated
+from langgraph.graph import StateGraph, START, END
+from langchain_core.runnables import RunnableConfig
+import json
+from agent_graphs.queries import (
+ BUILDING_REQUIREMENTS_QUERIES,
+ PARKING_QUERIES,
+ SIGNAGE_QUERIES,
+ LOT_REQUIREMENTS_QUERIES,
+ BUILDING_PLACEMENT_QUERIES,
+ LANDSCAPING_QUERIES,
+ PERMITTED_USES_QUERIES,
+ format_query,
+)
+
+from agent_graphs.models import Answer
+from agent_graphs.configurations import QuerierConfiguration
+from qdrant_wrapper.qdrant_retriever import QdrantRetriever
+
+# Custom reducer function for dictionary merge
+def dict_merge(existing: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
+ """Merge two dictionaries without overwriting the existing one."""
+ result = existing.copy()
+ for key, value in new.items():
+ if key not in result:
+ result[key] = value
+ return result
+
+# Define the state with reducer functions
+class QuerierState(TypedDict):
+ document_id: str
+ zone_code: str
+ results: Annotated[Dict[str, Dict[str, Any]], dict_merge]
+
+def get_config(config: RunnableConfig) -> QuerierConfiguration:
+ """Get the full configuration object."""
+ return QuerierConfiguration.from_runnable_config(config)
+
+def init_state(state: QuerierState, config: RunnableConfig) -> QuerierState:
+ if "document_id" not in state.keys():
+ raise ValueError("Missing required key: document_id")
+ if "zone_code" not in state.keys():
+ raise ValueError("Missing required key: zone_code")
+ return {
+ **state,
+ "results": {}
+ }
+
+async def building_requirements_node(state: QuerierState, config: RunnableConfig) -> QuerierState:
+ """Query building requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ # Create a dictionary mapping query types to their queries
+ queries = {
+ "maximum_building_height": format_query(BUILDING_REQUIREMENTS_QUERIES["maximum_building_height"], zone_code=zone_code),
+ "maximum_lot_coverage": format_query(BUILDING_REQUIREMENTS_QUERIES["maximum_lot_coverage"], zone_code=zone_code)
+ }
+
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+
+ # Get the questions list and keep track of which index corresponds to which query type
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ # Execute queries in parallel as before
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "building_requirements": results
+ }
+ }
+
+async def parking_node(state: QuerierState, config: RunnableConfig) -> Dict[str, Any]:
+ """Query parking requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ queries = {
+ "aisle_width": format_query(PARKING_QUERIES["aisle_width"], zone_code=zone_code),
+ "curbing_requirements": format_query(PARKING_QUERIES["curbing_requirements"], zone_code=zone_code),
+ "striping_requirements": format_query(PARKING_QUERIES["striping_requirements"], zone_code=zone_code),
+ "drainage_requirements": format_query(PARKING_QUERIES["drainage_requirements"], zone_code=zone_code),
+ "parking_stalls": format_query(PARKING_QUERIES["parking_stalls"], zone_code=zone_code)
+ }
+
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+
+ # Get the questions list and keep track of which index corresponds to which query type
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ # Execute queries in parallel as before
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "parking_requirements": results
+ }
+ }
+
+async def signs_node(state: Dict[str, Any], config: RunnableConfig) -> Dict[str, Any]:
+ """Query signage requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ # Create a dictionary mapping query types to their queries
+ queries = {
+ "permitted_signs": format_query(SIGNAGE_QUERIES["permitted_signs"], zone_code=zone_code),
+ "prohibited_signs": format_query(SIGNAGE_QUERIES["prohibited_signs"], zone_code=zone_code),
+ "design_requirements": format_query(SIGNAGE_QUERIES["design_requirements"], zone_code=zone_code)
+ }
+
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+
+ # Get the questions list and keep track of which index corresponds to which query type
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ # Execute queries in parallel as before
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "signage_requirements": results
+ }
+ }
+
+async def lot_requirements_node(state: Dict[str, Any], config: RunnableConfig) -> Dict[str, Any]:
+ """Query lot requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ queries = {
+ "density": format_query(LOT_REQUIREMENTS_QUERIES["density"], zone_code=zone_code),
+ "lot_size": format_query(LOT_REQUIREMENTS_QUERIES["lot_size"], zone_code=zone_code),
+ "lot_width": format_query(LOT_REQUIREMENTS_QUERIES["lot_width"], zone_code=zone_code),
+ "lot_frontage": format_query(LOT_REQUIREMENTS_QUERIES["lot_frontage"], zone_code=zone_code),
+ "living_area": format_query(LOT_REQUIREMENTS_QUERIES["living_area"], zone_code=zone_code)
+ }
+
+ # Create a new retriever instance for this node
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "lot_requirements": results
+ }
+ }
+
+async def building_placement_node(state: Dict[str, Any], config: RunnableConfig) -> Dict[str, Any]:
+ """Query building placement requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ # Create a new retriever instance for this node
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+
+ queries = {
+ "front_setback": format_query(BUILDING_PLACEMENT_QUERIES["front_setback"], zone_code=zone_code),
+ "street_side_setback": format_query(BUILDING_PLACEMENT_QUERIES["street_side_setback"], zone_code=zone_code),
+ "side_yard_setback": format_query(BUILDING_PLACEMENT_QUERIES["side_yard_setback"], zone_code=zone_code),
+ "rear_setback": format_query(BUILDING_PLACEMENT_QUERIES["rear_setback"], zone_code=zone_code),
+ "accessory_building_setback": format_query(BUILDING_PLACEMENT_QUERIES["accessory_building_setback"], zone_code=zone_code)
+ }
+
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "building_placement_requirements": results
+ }
+ }
+
+async def landscaping_requirements_node(state: Dict[str, Any], config: RunnableConfig) -> Dict[str, Any]:
+ """Query landscaping requirements from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+
+ # Create a new retriever instance for this node
+ retriever = QdrantRetriever(document_id=document_id)
+ await retriever.initialize()
+
+ queries = {
+ "plant_sizes": format_query(LANDSCAPING_QUERIES["plant_sizes"], zone_code=zone_code),
+ "landscape_plan_review": format_query(LANDSCAPING_QUERIES["landscape_plan_review"], zone_code=zone_code),
+ "species_variation": format_query(LANDSCAPING_QUERIES["species_variation"], zone_code=zone_code),
+ "performance_guarantee": format_query(LANDSCAPING_QUERIES["performance_guarantee"], zone_code=zone_code)
+ }
+
+ questions = list(queries.values())
+ query_vars = list(queries.keys())
+
+ raw_results = await retriever.execute_queries_in_parallel(questions, Answer)
+ results = {
+ query_var: raw_results[i]
+ for i, query_var in enumerate(query_vars)
+ }
+ return {
+ "results": {
+ "landscaping_requirements": results
+ }
+ }
+
+# Permitted uses query node
+async def permitted_uses_node(state: QuerierState, config: RunnableConfig) -> QuerierState:
+ """Query permitted uses from the municipal code."""
+ document_id = state["document_id"]
+ zone_code = state["zone_code"]
+ configs = get_config(config)