7 examples
Resource exhaustion
System resources fully consumed, causing failures.
[ FAQ1 ]
What is resource exhaustion?
Resource exhaustion occurs when an application or system fully utilizes available system resources—such as memory, CPU, file descriptors, or threads—to the point where no additional resources can be allocated. This typically leads to degraded performance, denial of service conditions, or outright application failures. Common scenarios include running out of memory due to memory leaks, exhausting file descriptors through unclosed connections, or saturating thread pools with too many concurrent operations. Persistent resource exhaustion severely impacts system reliability and can result in downtime or data loss.
[ FAQ2 ]
How to fix resource exhaustion issues
To fix resource exhaustion, first identify and resolve underlying causes such as memory leaks, unclosed file descriptors, or excessive thread creation. Use system monitoring tools to track resource utilization proactively, setting appropriate limits and alarms to alert administrators before critical thresholds are reached. Employ resource management practices like connection pooling, proper cleanup of unused resources, and configuring thread pools or queues with sensible limits. Regularly review and tune application and system configurations, ensuring resource limits are correctly aligned with expected workloads to maintain stability and prevent exhaustion scenarios.
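As a minimal illustration of the "sensible limits" advice above, the Python sketch below bounds a worker pool so bursts of work queue up instead of spawning unbounded threads. The handler and pool size are hypothetical placeholders, not a prescription.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-task handler; stands in for any unit of work.
def handle_request(request_id: int) -> str:
    return f"processed {request_id}"

# A fixed max_workers caps thread count no matter how much work arrives;
# excess submissions wait in the executor's queue rather than allocating
# new threads, avoiding thread and memory exhaustion under load.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(handle_request, range(100)))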
diff block
type: string
push:
properties:
+ generators:
+ items:
+ properties:
+ destinationSecretName:
+ type: string
+ generatorRef:
+ properties:
+ kind:
+ allOf:
+ - enum:
+ - Password
+ - UUID
+ - enum:
+ - Password
+ - UUID
+ description: Specify the Kind of the generator resource
+ type: string
+ name:
+ type: string
+ required:
+ - kind
+ - name
+ type: object
+ required:
+ - destinationSecretName
+ - generatorRef
+ type: object
+ type: array
greptile
style: No maximum limit on number of generators that can be specified. Consider adding maxItems to prevent resource exhaustion.
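A sketch of the suggested change, assuming this is a standard OpenAPI v3 schema as used in Kubernetes CRDs, where maxItems sits at the array level; the ceiling of 50 is illustrative:

generators:
  items:
    # (generator item schema as shown in the diff above)
    type: object
  maxItems: 50  # illustrative ceiling; bounds how many generators one resource may declare
  type: array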
diff block
"stream": stream,
"genius": genius
}
-
if session_id:
payload["sessionId"] = session_id
-
- response = await self.client.post(url, json=payload, headers=self.headers)
- response.raise_for_status()
- return response.json()
-
- async def search_repositories(self, messages, repositories, session_id=None, genius=True):
+
+ client_timeout = timeout if timeout is not None else self.default_timeout
+ async with httpx.AsyncClient(timeout=client_timeout) as client:
+ response = await client.post(url, json=payload, headers=self.headers)
greptile
logic: Creating a new client for each request could lead to resource exhaustion. Consider reusing self.client instead of creating a new one.
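A minimal sketch of the reuse approach, assuming self.client is a long-lived httpx.AsyncClient created in __init__; httpx accepts a per-request timeout, so the shared client can still honor the caller's value:

# Reuse the shared client; override the timeout per request instead of
# constructing and tearing down a new AsyncClient on every call.
client_timeout = timeout if timeout is not None else self.default_timeout
response = await self.client.post(
    url, json=payload, headers=self.headers, timeout=client_timeout
)
response.raise_for_status()
return response.json()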
diff block
@IsOptional()
PG_SSL_ALLOW_SELF_SIGNED = false;
+ @ConfigVariablesMetadata({
+ group: ConfigVariablesGroup.ServerConfig,
+ description: 'Enable pg connection pool sharing across tenants',
+ isEnvOnly: true,
+ type: ConfigVariableType.BOOLEAN,
+ })
+ @IsOptional()
+ PG_ENABLE_POOL_SHARING = true;
+
+ @ConfigVariablesMetadata({
+ group: ConfigVariablesGroup.ServerConfig,
+ description: 'Maximum number of clients in pg connection pool',
+ isEnvOnly: true,
+ type: ConfigVariableType.NUMBER,
+ })
+ @CastToPositiveNumber()
+ @IsOptional()
+ PG_POOL_MAX_CONNECTIONS = 10;
greptile
style: Consider adding a maximum value validation for max connections to prevent resource exhaustion
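One way to act on that, sketched with class-validator's @Max decorator (import { Max } from 'class-validator'); the ceiling of 100 is an assumption, and the project's custom decorators are kept as-is:

@ConfigVariablesMetadata({
  group: ConfigVariablesGroup.ServerConfig,
  description: 'Maximum number of clients in pg connection pool',
  isEnvOnly: true,
  type: ConfigVariableType.NUMBER,
})
@CastToPositiveNumber()
@Max(100) // illustrative upper bound; keeps a misconfigured env var from exhausting Postgres connections
@IsOptional()
PG_POOL_MAX_CONNECTIONS = 10;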
diff block
+use anyhow::{anyhow, Result};
+use axum::{extract::Json, Extension};
+use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
+use diesel_async::RunQueryDsl;
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+use serde_yaml;
+use std::collections::HashMap;
+use uuid::Uuid;
+use regex::Regex;
+use tokio::task::JoinSet;
+
+use crate::{
+ database::{
+ lib::get_pg_pool,
+ models::{Dataset, DataSource, User},
+ schema::{data_sources, datasets},
+ },
+ routes::rest::ApiResponse,
+ utils::{
+ security::checks::is_user_workspace_admin_or_data_admin,
+ user::user_info::get_user_organization_id,
+ query_engine::{
+ credentials::get_data_source_credentials,
+ import_dataset_columns::{retrieve_dataset_columns_batch, DatasetColumnRecord},
+ },
+ clients::ai::{
+ openai::{OpenAiChatModel, OpenAiChatRole, OpenAiChatContent, OpenAiChatMessage},
+ llm_router::{llm_chat, LlmModel, LlmMessage},
+ },
+ },
+};
+
+#[derive(Debug, Deserialize)]
+pub struct GenerateDatasetRequest {
+ pub data_source_name: String,
+ pub schema: String,
+ pub database: Option<String>,
+ pub model_names: Vec<String>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct GenerateDatasetResponse {
+ pub yml_contents: HashMap<String, String>,
+ pub errors: HashMap<String, String>,
+}
+
+#[derive(Debug, Serialize)]
+struct ModelConfig {
+ models: Vec<Model>,
+}
+
+#[derive(Debug, Serialize)]
+struct Model {
+ name: String,
+ description: String,
+ dimensions: Vec<Dimension>,
+ measures: Vec<Measure>,
+}
+
+#[derive(Debug, Serialize)]
+struct Dimension {
+ name: String,
+ expr: String,
+ #[serde(rename = "type")]
+ type_: String,
+ description: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ searchable: Option<bool>,
+}
+
+#[derive(Debug, Serialize)]
+struct Measure {
+ name: String,
+ expr: String,
+ #[serde(rename = "type")]
+ type_: String,
+ agg: Option<String>,
+ description: String,
+}
+
+// Add type mapping enum
+#[derive(Debug)]
+enum ColumnMappingType {
+ Dimension(String), // String holds the semantic type
+ Measure(String), // String holds the measure type (e.g., "number")
+ Unsupported,
+}
+
+fn map_snowflake_type(type_str: &str) -> ColumnMappingType {
+ // Convert to uppercase for consistent matching
+ let type_upper = type_str.to_uppercase();
+
+ match type_upper.as_str() {
+ // Numeric types that should be measures
+ "NUMBER" | "DECIMAL" | "NUMERIC" | "FLOAT" | "REAL" | "DOUBLE" | "INT" | "INTEGER" |
+ "BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure("number".to_string()),
+
+ // Date/Time types
+ "DATE" | "DATETIME" | "TIME" | "TIMESTAMP" | "TIMESTAMP_LTZ" |
+ "TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension("timestamp".to_string()),
+
+ // String types
+ "TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension("string".to_string()),
+
+ // Boolean type
+ "BOOLEAN" | "BOOL" => ColumnMappingType::Dimension("boolean".to_string()),
+
+ // Unsupported types
+ "ARRAY" | "OBJECT" | "VARIANT" => ColumnMappingType::Unsupported,
+
+ // Default to dimension for unknown types
+ _ => {
+ tracing::warn!("Unknown Snowflake type: {}, defaulting to string dimension", type_str);
+ ColumnMappingType::Dimension("string".to_string())
+ }
+ }
+}
+
+pub async fn generate_datasets(
+ Extension(user): Extension<User>,
+ Json(request): Json<GenerateDatasetRequest>,
+) -> Result<ApiResponse<GenerateDatasetResponse>, (StatusCode, String)> {
+ // Check if user is workspace admin or data admin
+ let organization_id = match get_user_organization_id(&user.id).await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("Error getting user organization id: {:?}", e);
+ return Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error getting user organization id".to_string(),
+ ));
+ }
+ };
+
+ match is_user_workspace_admin_or_data_admin(&user, &organization_id).await {
+ Ok(true) => (),
+ Ok(false) => {
+ return Err((
+ StatusCode::FORBIDDEN,
+ "Insufficient permissions".to_string(),
+ ))
+ }
+ Err(e) => {
+ tracing::error!("Error checking user permissions: {:?}", e);
+ return Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error checking user permissions".to_string(),
+ ));
+ }
+ }
+
+ match generate_datasets_handler(&request, &organization_id).await {
+ Ok(response) => Ok(ApiResponse::JsonData(response)),
+ Err(e) => {
+ tracing::error!("Error generating datasets: {:?}", e);
+ Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error generating datasets".to_string(),
+ ))
+ }
+ }
+}
+
+async fn enhance_yaml_with_descriptions(yaml: String) -> Result<String> {
+ const DESCRIPTION_PLACEHOLDER: &str = "{NEED DESCRIPTION HERE}";
+
+ // Skip OpenAI call if no placeholders exist
+ if !yaml.contains(DESCRIPTION_PLACEHOLDER) {
+ return Ok(yaml);
+ }
+
+ let messages = vec![
+ LlmMessage::new(
+ "developer".to_string(),
+ "You are a YAML description enhancer. Your output must be wrapped in markdown code blocks using ```yml format.
+ Your task is to ONLY replace text matching exactly \"{NEED DESCRIPTION HERE}\" with appropriate descriptions. Do not modify any other parts of the YAML or other descriptions without the placeholder. You should still return the entire YAML in your output.
+ DO NOT modify any other part of the YAML.
+ DO NOT add any explanations or text outside the ```yml block.
+ Return the complete YAML wrapped in markdown, with only the placeholders replaced.".to_string(),
+ ),
+ LlmMessage::new(
+ "user".to_string(),
+ yaml,
+ ),
+ ];
+
+ let response = llm_chat(
+ LlmModel::OpenAi(OpenAiChatModel::O3Mini),
+ &messages,
+ 0.1,
+ 2048,
+ 120,
+ None,
+ false,
+ None,
+ &Uuid::new_v4(),
+ &Uuid::new_v4(),
+ crate::utils::clients::ai::langfuse::PromptName::CustomPrompt("enhance_yaml_descriptions".to_string()),
+ )
+ .await?;
+
+ // Extract YAML from markdown code blocks
+ let re = Regex::new(r"```yml\n([\s\S]*?)\n```").unwrap();
+ let yaml = match re.captures(&response) {
+ Some(caps) => caps.get(1).unwrap().as_str().to_string(),
+ None => return Err(anyhow!("Failed to extract YAML from response")),
+ };
+
+ Ok(yaml)
+}
+
+async fn generate_model_yaml(
+ model_name: &str,
+ ds_columns: &[DatasetColumnRecord],
+ schema: &str,
+) -> Result<String> {
+ // Filter columns for this model
+ let model_columns: Vec<_> = ds_columns
+ .iter()
+ .filter(|col| {
+ col.dataset_name.to_lowercase() == model_name.to_lowercase()
+ && col.schema_name.to_lowercase() == schema.to_lowercase()
+ })
+ .collect();
+
+ if model_columns.is_empty() {
+ return Err(anyhow!("No columns found for model"));
+ }
+
+ let mut dimensions = Vec::new();
+ let mut measures = Vec::new();
+
+ // Process each column and categorize as dimension or measure
+ for col in model_columns {
+ match map_snowflake_type(&col.type_) {
+ ColumnMappingType::Dimension(semantic_type) => {
+ dimensions.push(Dimension {
+ name: col.name.clone(),
+ expr: col.name.clone(),
+ type_: semantic_type,
+ description: "{NEED DESCRIPTION HERE}".to_string(),
+ searchable: Some(false),
+ });
+ }
+ ColumnMappingType::Measure(measure_type) => {
+ measures.push(Measure {
+ name: col.name.clone(),
+ expr: col.name.clone(),
+ type_: measure_type,
+ agg: Some("sum".to_string()),
+ description: "{NEED DESCRIPTION HERE}".to_string(),
+ });
+ }
+ ColumnMappingType::Unsupported => {
+ tracing::warn!(
+ "Skipping unsupported column type: {} for column: {}",
+ col.type_,
+ col.name
+ );
+ }
+ }
+ }
+
+ let model = Model {
+ name: model_name.to_string(),
+ description: format!("Generated model for {}", model_name),
+ dimensions,
+ measures,
+ };
+
+ let config = ModelConfig {
+ models: vec![model],
+ };
+
+ let yaml = serde_yaml::to_string(&config)?;
+
+ // Enhance descriptions using OpenAI
+ let enhanced_yaml = enhance_yaml_with_descriptions(yaml).await?;
+
+ Ok(enhanced_yaml)
+}
+
+async fn generate_datasets_handler(
+ request: &GenerateDatasetRequest,
+ organization_id: &Uuid,
+) -> Result<GenerateDatasetResponse> {
+ let mut conn = get_pg_pool().get().await?;
+
+ // Get data source
+ let data_source = match data_sources::table
+ .filter(data_sources::name.eq(&request.data_source_name))
+ .filter(data_sources::organization_id.eq(organization_id))
+ .filter(data_sources::deleted_at.is_null())
+ .first::<DataSource>(&mut conn)
+ .await
+ {
+ Ok(ds) => ds,
+ Err(e) => return Err(anyhow!("Data source not found: {}", e)),
+ };
+
+ // Get credentials
+ let credentials = get_data_source_credentials(&data_source.secret_id, &data_source.type_, false).await?;
+
+ // Prepare tables for batch validation
+ let tables_to_validate: Vec<(String, String)> = request
+ .model_names
+ .iter()
+ .map(|name| (name.clone(), request.schema.clone()))
+ .collect();
+
+ // Get all columns in one batch
+ let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, request.database.clone()).await {
+ Ok(cols) => cols,
+ Err(e) => return Err(anyhow!("Failed to get columns from data source: {}", e)),
+ };
+
+ // Process models concurrently
+ let mut join_set = JoinSet::new();
+
+ for model_name in &request.model_names {
+ let model_name = model_name.clone();
+ let schema = request.schema.clone();
+ let ds_columns = ds_columns.clone();
+
+ join_set.spawn(async move {
+ let result = generate_model_yaml(&model_name, &ds_columns, &schema).await;
+ (model_name, result)
+ });
+ }
greptile
style: No limit on concurrent tasks - could lead to resource exhaustion with large model_names lists
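A hedged sketch of one mitigation: drain the JoinSet before spawning once it reaches a cap, so at most N tasks are in flight at a time (the cap of 10 is illustrative):

for model_name in &request.model_names {
    let model_name = model_name.clone();
    let schema = request.schema.clone();
    let ds_columns = ds_columns.clone();

    // Illustrative cap: wait for one task to finish before spawning another.
    if join_set.len() >= 10 {
        join_set.join_next().await;
    }

    join_set.spawn(async move {
        let result = generate_model_yaml(&model_name, &ds_columns, &schema).await;
        (model_name, result)
    });
}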
diff block
+"""
+TalkHier: A hierarchical multi-agent framework for content generation and refinement.
+Implements structured communication and evaluation protocols.
+"""
+
+import json
+import logging
+from dataclasses import dataclass, asdict
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Union
+from concurrent.futures import ThreadPoolExecutor
+
+from swarms import Agent
+from swarms.structs.conversation import Conversation
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class AgentRole(Enum):
+ """Defines the possible roles for agents in the system."""
+
+ SUPERVISOR = "supervisor"
+ GENERATOR = "generator"
+ EVALUATOR = "evaluator"
+ REVISOR = "revisor"
+
+
+@dataclass
+class CommunicationEvent:
+ """Represents a structured communication event between agents."""
+ message: str
+ background: Optional[str] = None
+ intermediate_output: Optional[Dict[str, Any]] = None
+ sender: str = ""
+ receiver: str = ""
+ timestamp: str = str(datetime.now())
+
+
+class TalkHier:
+ """
+ A hierarchical multi-agent system for content generation and refinement.
+
+ Implements the TalkHier framework with structured communication protocols
+ and hierarchical refinement processes.
+
+ Attributes:
+ max_iterations: Maximum number of refinement iterations
+ quality_threshold: Minimum score required for content acceptance
+ model_name: Name of the LLM model to use
+ base_path: Path for saving agent states
+ """
+
+ def __init__(
+ self,
+ max_iterations: int = 3,
+ quality_threshold: float = 0.8,
+ model_name: str = "gpt-4",
+ base_path: Optional[str] = None,
+ return_string: bool = False,
+ ):
+ """Initialize the TalkHier system."""
+ self.max_iterations = max_iterations
+ self.quality_threshold = quality_threshold
+ self.model_name = model_name
+ self.return_string = return_string
+ self.base_path = (
+ Path(base_path) if base_path else Path("./agent_states")
+ )
+ self.base_path.mkdir(exist_ok=True)
+
+ # Initialize agents
+ self._init_agents()
+
+ # Create conversation
+ self.conversation = Conversation()
+
+ def _safely_parse_json(self, json_str: str) -> Dict[str, Any]:
+ """
+ Safely parse JSON string, handling various formats and potential errors.
+
+ Args:
+ json_str: String containing JSON data
+
+ Returns:
+ Parsed dictionary
+ """
+ try:
+ # Try direct JSON parsing
+ return json.loads(json_str)
+ except json.JSONDecodeError:
+ try:
+ # Try extracting JSON from potential text wrapper
+ import re
+
+ json_match = re.search(r"\{.*\}", json_str, re.DOTALL)
+ if (json_match):
+ return json.loads(json_match.group())
+ # Try extracting from markdown code blocks
+ code_block_match = re.search(
+ r"```(?:json)?\s*(\{.*?\})\s*```",
+ json_str,
+ re.DOTALL,
+ )
+ if (code_block_match):
+ return json.loads(code_block_match.group(1))
+ except Exception as e:
+ logger.warning(f"Failed to extract JSON: {str(e)}")
+
+ # Fallback: create structured dict from text
+ return {
+ "content": json_str,
+ "metadata": {
+ "parsed": False,
+ "timestamp": str(datetime.now()),
+ },
+ }
+
+ def _get_criteria_generator_prompt(self) -> str:
+ """Get the prompt for the criteria generator agent."""
+ return """You are a Criteria Generator agent responsible for creating task-specific evaluation criteria.
+Analyze the task and generate appropriate evaluation criteria based on:
+- Task type and complexity
+- Required domain knowledge
+- Target audience expectations
+- Quality requirements
+
+Output all responses in strict JSON format:
+{
+ "criteria": {
+ "criterion_name": {
+ "description": "Detailed description of what this criterion measures",
+ "importance": "Weight from 0.0-1.0 indicating importance",
+ "evaluation_guide": "Guidelines for how to evaluate this criterion"
+ }
+ },
+ "metadata": {
+ "task_type": "Classification of the task type",
+ "complexity_level": "Assessment of task complexity",
+ "domain_focus": "Primary domain or field of the task"
+ }
+}"""
+
+ def _init_agents(self) -> None:
+ """Initialize all agents with their specific roles and prompts."""
+ # Main supervisor agent
+ self.main_supervisor = Agent(
+ agent_name="Main-Supervisor",
+ system_prompt=self._get_supervisor_prompt(),
+ model_name=self.model_name,
+ max_loops=1,
+ saved_state_path=str(
+ self.base_path / "main_supervisor.json"
+ ),
+ verbose=True,
+ )
+
+ # Generator agent
+ self.generator = Agent(
+ agent_name="Content-Generator",
+ system_prompt=self._get_generator_prompt(),
+ model_name=self.model_name,
+ max_loops=1,
+ saved_state_path=str(self.base_path / "generator.json"),
+ verbose=True,
+ )
+
+ # Criteria Generator agent
+ self.criteria_generator = Agent(
+ agent_name="Criteria-Generator",
+ system_prompt=self._get_criteria_generator_prompt(),
+ model_name=self.model_name,
+ max_loops=1,
+ saved_state_path=str(self.base_path / "criteria_generator.json"),
+ verbose=True,
+ )
+
+ # Evaluators without criteria (will be set during run)
+ self.evaluators = []
+ for i in range(3):
+ self.evaluators.append(
+ Agent(
+ agent_name=f"Evaluator-{i}",
+ system_prompt=self._get_evaluator_prompt(i),
+ model_name=self.model_name,
+ max_loops=1,
+ saved_state_path=str(
+ self.base_path / f"evaluator_{i}.json"
+ ),
+ verbose=True,
+ )
+ )
+
+ # Revisor agent
+ self.revisor = Agent(
+ agent_name="Content-Revisor",
+ system_prompt=self._get_revisor_prompt(),
+ model_name=self.model_name,
+ max_loops=1,
+ saved_state_path=str(self.base_path / "revisor.json"),
+ verbose=True,
+ )
+
+ def _generate_dynamic_criteria(self, task: str) -> Dict[str, str]:
+ """
+ Generate dynamic evaluation criteria based on the task.
+
+ Args:
+ task: Content generation task description
+
+ Returns:
+ Dictionary containing dynamic evaluation criteria
+ """
+ # Example dynamic criteria generation logic
+ if "technical" in task.lower():
+ return {
+ "accuracy": "Technical correctness and source reliability",
+ "clarity": "Readability and logical structure",
+ "depth": "Comprehensive coverage of technical details",
+ "engagement": "Interest level and relevance to the audience",
+ "technical_quality": "Grammar, spelling, and formatting",
+ }
+ else:
+ return {
+ "accuracy": "Factual correctness and source reliability",
+ "clarity": "Readability and logical structure",
+ "coherence": "Logical consistency and argument structure",
+ "engagement": "Interest level and relevance to the audience",
+ "completeness": "Coverage of the topic and depth",
+ "technical_quality": "Grammar, spelling, and formatting",
+ }
+
+ def _get_supervisor_prompt(self) -> str:
+ """Get the prompt for the supervisor agent."""
+ return """You are a Supervisor agent responsible for orchestrating the content generation process and selecting the best evaluation criteria.
+
+You must:
+1. Analyze tasks and develop strategies
+2. Review multiple evaluator feedback
+3. Select the most appropriate evaluation based on:
+ - Completeness of criteria
+ - Relevance to task
+ - Quality of feedback
+4. Provide clear instructions for content revision
+
+Output all responses in strict JSON format:
+{
+ "thoughts": {
+ "task_analysis": "Analysis of requirements, audience, scope",
+ "strategy": "Step-by-step plan and success metrics",
+ "evaluation_selection": {
+ "chosen_evaluator": "ID of selected evaluator",
+ "reasoning": "Why this evaluation was chosen",
+ "key_criteria": ["List of most important criteria"]
+ }
+ },
+ "next_action": {
+ "agent": "Next agent to engage",
+ "instruction": "Detailed instructions with context"
+ }
+}"""
+
+ def _get_generator_prompt(self) -> str:
+ """Get the prompt for the generator agent."""
+ return """You are a Generator agent responsible for creating high-quality, original content. Your role is to produce content that is engaging, informative, and tailored to the target audience.
+
+When generating content:
+- Thoroughly research and fact-check all information
+- Structure content logically with clear flow
+- Use appropriate tone and language for the target audience
+- Include relevant examples and explanations
+- Ensure content is original and plagiarism-free
+- Consider SEO best practices where applicable
+
+Output all responses in strict JSON format:
+{
+ "content": {
+ "main_body": "The complete generated content with proper formatting and structure",
+ "metadata": {
+ "word_count": "Accurate word count of main body",
+ "target_audience": "Detailed audience description",
+ "key_points": ["List of main points covered"],
+ "sources": ["List of reference sources if applicable"],
+ "readability_level": "Estimated reading level",
+ "tone": "Description of content tone"
+ }
+ }
+}"""
+
+ def _get_evaluator_prompt(self, evaluator_id: int) -> str:
+ """Get the base prompt for an evaluator agent."""
+ return f"""You are Evaluator {evaluator_id}, responsible for critically assessing content quality. Your evaluation must be thorough, objective, and constructive.
+
+When receiving content to evaluate:
+1. First analyze the task description to determine appropriate evaluation criteria
+2. Generate specific criteria based on task requirements
+3. Evaluate content against these criteria
+4. Provide detailed feedback for each criterion
+
+Output all responses in strict JSON format:
+{{
+ "generated_criteria": {{
+ "criteria_name": "description of what this criterion measures",
+ // Add more criteria based on task analysis
+ }},
+ "scores": {{
+ "overall": "0.0-1.0 composite score",
+ "categories": {{
+ // Scores for each generated criterion
+ "criterion_name": "0.0-1.0 score with evidence"
+ }}
+ }},
+ "feedback": [
+ "Specific, actionable improvement suggestions per criterion"
+ ],
+ "strengths": ["Notable positive aspects"],
+ "weaknesses": ["Areas needing improvement"]
+}}"""
+
+ def _get_revisor_prompt(self) -> str:
+ """Get the prompt for the revisor agent."""
+ return """You are a Revisor agent responsible for improving content based on evaluator feedback. Your role is to enhance content while maintaining its core message and purpose.
+
+When revising content:
+- Address all evaluator feedback systematically
+- Maintain consistency in tone and style
+- Preserve accurate information
+- Enhance clarity and flow
+- Fix technical issues
+- Optimize for target audience
+- Track all changes made
+
+Output all responses in strict JSON format:
+{
+ "revised_content": {
+ "main_body": "Complete revised content incorporating all improvements",
+ "metadata": {
+ "word_count": "Updated word count",
+ "changes_made": [
+ "Detailed list of specific changes and improvements",
+ "Reasoning for each major revision",
+ "Feedback points addressed"
+ ],
+ "improvement_summary": "Overview of main enhancements",
+ "preserved_elements": ["Key elements maintained from original"],
+ "revision_approach": "Strategy used for revisions"
+ }
+ }
+}"""
+
+ def _generate_criteria_for_task(self, task: str) -> Dict[str, Any]:
+ """Generate evaluation criteria for the given task."""
+ try:
+ criteria_input = {
+ "task": task,
+ "instruction": "Generate specific evaluation criteria for this task."
+ }
+
+ criteria_response = self.criteria_generator.run(json.dumps(criteria_input))
+ self.conversation.add(
+ role="Criteria-Generator",
+ content=criteria_response
+ )
+
+ return self._safely_parse_json(criteria_response)
+ except Exception as e:
+ logger.error(f"Error generating criteria: {str(e)}")
+ return {"criteria": {}}
+
+ def _create_comm_event(self, sender: Agent, receiver: Agent, response: Dict) -> CommunicationEvent:
+ """Create a structured communication event between agents."""
+ return CommunicationEvent(
+ message=response.get("message", ""),
+ background=response.get("background", ""),
+ intermediate_output=response.get("intermediate_output", {}),
+ sender=sender.agent_name,
+ receiver=receiver.agent_name,
+ )
+
+ def _evaluate_content(self, content: Union[str, Dict], task: str) -> Dict[str, Any]:
+ """Coordinate evaluation process with parallel evaluator execution."""
+ try:
+ content_dict = self._safely_parse_json(content) if isinstance(content, str) else content
+ criteria_data = self._generate_criteria_for_task(task)
+
+ def run_evaluator(evaluator, eval_input):
+ response = evaluator.run(json.dumps(eval_input))
+ return {
+ "evaluator_id": evaluator.agent_name,
+ "evaluation": self._safely_parse_json(response)
+ }
+
+ eval_inputs = [{
+ "task": task,
+ "content": content_dict,
+ "criteria": criteria_data.get("criteria", {})
+ } for _ in self.evaluators]
+
+ with ThreadPoolExecutor() as executor:
+ evaluations = list(executor.map(
+ lambda x: run_evaluator(*x),
+ zip(self.evaluators, eval_inputs)
+ ))
greptile
style: ThreadPoolExecutor should specify max_workers to prevent resource exhaustion with many evaluators
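A minimal sketch of the fix; the pool size is illustrative and simply needs to be bounded rather than scaling with the evaluator count:

# Cap the pool rather than letting it default to one thread per item.
max_workers = max(1, min(4, len(self.evaluators)))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    evaluations = list(executor.map(
        lambda x: run_evaluator(*x),
        zip(self.evaluators, eval_inputs)
    ))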
diff block
+PUBLIC_IP=$(ip -4 route get 1.0.0.0 | awk '{print $7; exit}')
+
+# MARK: Rivet server config
+cat << 'EOF' > /etc/rivet-server/config.json
+__RIVET_EDGE_CONFIG__
+EOF
+
+# Systemd service
+cat << 'EOF' > /etc/systemd/system/rivet-edge-server.service
+
+[Unit]
+Description=Rivet Edge Server
+Wants=network-online.target
+After=network-online.target
+ConditionPathExists=/etc/rivet-server/
+
+[Service]
+ExecStart=/usr/local/bin/rivet-edge-server
+Restart=always
+RestartSec=2
+
+# Real time service
+CPUSchedulingPolicy=fifo
+# High CPU priority
+CPUSchedulingPriority=90
+# Prevent killing from system OOM
+OOMScoreAdjust=-1000
+# Kill main process, not children
+KillMode=process
+# Increase limit of file watches
+LimitNOFILE=65536
+# Increase max process limits
+LimitNPROC=infinity
+TasksMax=infinity
greptile
style: Setting both LimitNPROC and TasksMax to infinity could lead to resource exhaustion. Consider setting reasonable limits.
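A sketch of bounded alternatives for the same unit; the numbers are illustrative and should be sized to the host:

[Service]
# Generous but finite caps in place of infinity.
LimitNOFILE=65536
LimitNPROC=16384
TasksMax=16384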
diff block
+use anyhow::{anyhow, Result};
+use axum::{extract::Json, Extension};
+use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl};
+use diesel_async::RunQueryDsl;
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+use serde_yaml;
+use std::collections::HashMap;
+use uuid::Uuid;
+use regex::Regex;
+use tokio::task::JoinSet;
+
+use crate::{
+ database::{
+ lib::get_pg_pool,
+ models::{Dataset, DataSource, User},
+ schema::{data_sources, datasets},
+ },
+ routes::rest::ApiResponse,
+ utils::{
+ security::checks::is_user_workspace_admin_or_data_admin,
+ user::user_info::get_user_organization_id,
+ query_engine::{
+ credentials::get_data_source_credentials,
+ import_dataset_columns::{retrieve_dataset_columns_batch, DatasetColumnRecord},
+ },
+ clients::ai::{
+ openai::{OpenAiChatModel, OpenAiChatRole, OpenAiChatContent, OpenAiChatMessage},
+ llm_router::{llm_chat, LlmModel, LlmMessage},
+ },
+ },
+};
+
+#[derive(Debug, Deserialize)]
+pub struct GenerateDatasetRequest {
+ pub data_source_name: String,
+ pub schema: String,
+ pub database: Option<String>,
+ pub model_names: Vec<String>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct GenerateDatasetResponse {
+ pub yml_contents: HashMap<String, String>,
+ pub errors: HashMap<String, String>,
+}
+
+#[derive(Debug, Serialize)]
+struct ModelConfig {
+ models: Vec<Model>,
+}
+
+#[derive(Debug, Serialize)]
+struct Model {
+ name: String,
+ description: String,
+ dimensions: Vec<Dimension>,
+ measures: Vec<Measure>,
+}
+
+#[derive(Debug, Serialize)]
+struct Dimension {
+ name: String,
+ expr: String,
+ #[serde(rename = "type")]
+ type_: String,
+ description: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ searchable: Option<bool>,
+}
+
+#[derive(Debug, Serialize)]
+struct Measure {
+ name: String,
+ expr: String,
+ #[serde(rename = "type")]
+ type_: String,
+ agg: Option<String>,
+ description: String,
+}
+
+// Add type mapping enum
+#[derive(Debug)]
+enum ColumnMappingType {
+ Dimension(String), // String holds the semantic type
+ Measure(String), // String holds the measure type (e.g., "number")
+ Unsupported,
+}
+
+fn map_snowflake_type(type_str: &str) -> ColumnMappingType {
+ // Convert to uppercase for consistent matching
+ let type_upper = type_str.to_uppercase();
+
+ match type_upper.as_str() {
+ // Numeric types that should be measures
+ "NUMBER" | "DECIMAL" | "NUMERIC" | "FLOAT" | "REAL" | "DOUBLE" | "INT" | "INTEGER" |
+ "BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure("number".to_string()),
+
+ // Date/Time types
+ "DATE" | "DATETIME" | "TIME" | "TIMESTAMP" | "TIMESTAMP_LTZ" |
+ "TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension("timestamp".to_string()),
+
+ // String types
+ "TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension("string".to_string()),
+
+ // Boolean type
+ "BOOLEAN" | "BOOL" => ColumnMappingType::Dimension("boolean".to_string()),
+
+ // Unsupported types
+ "ARRAY" | "OBJECT" | "VARIANT" => ColumnMappingType::Unsupported,
+
+ // Default to dimension for unknown types
+ _ => {
+ tracing::warn!("Unknown Snowflake type: {}, defaulting to string dimension", type_str);
+ ColumnMappingType::Dimension("string".to_string())
+ }
+ }
+}
+
+pub async fn generate_datasets(
+ Extension(user): Extension<User>,
+ Json(request): Json<GenerateDatasetRequest>,
+) -> Result<ApiResponse<GenerateDatasetResponse>, (StatusCode, String)> {
+ // Check if user is workspace admin or data admin
+ let organization_id = match get_user_organization_id(&user.id).await {
+ Ok(id) => id,
+ Err(e) => {
+ tracing::error!("Error getting user organization id: {:?}", e);
+ return Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error getting user organization id".to_string(),
+ ));
+ }
+ };
+
+ match is_user_workspace_admin_or_data_admin(&user, &organization_id).await {
+ Ok(true) => (),
+ Ok(false) => {
+ return Err((
+ StatusCode::FORBIDDEN,
+ "Insufficient permissions".to_string(),
+ ))
+ }
+ Err(e) => {
+ tracing::error!("Error checking user permissions: {:?}", e);
+ return Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error checking user permissions".to_string(),
+ ));
+ }
+ }
+
+ match generate_datasets_handler(&request, &organization_id).await {
+ Ok(response) => Ok(ApiResponse::JsonData(response)),
+ Err(e) => {
+ tracing::error!("Error generating datasets: {:?}", e);
+ Err((
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Error generating datasets".to_string(),
+ ))
+ }
+ }
+}
+
+async fn enhance_yaml_with_descriptions(yaml: String) -> Result<String> {
+ const DESCRIPTION_PLACEHOLDER: &str = "{NEED DESCRIPTION HERE}";
+
+ // Skip OpenAI call if no placeholders exist
+ if !yaml.contains(DESCRIPTION_PLACEHOLDER) {
+ return Ok(yaml);
+ }
+
+ let messages = vec![
+ LlmMessage::new(
+ "developer".to_string(),
+ "You are a YAML description enhancer. Your output must be wrapped in markdown code blocks using ```yml format.
+ Your task is to ONLY replace text matching exactly \"{NEED DESCRIPTION HERE}\" with appropriate descriptions. Do not modify any other parts of the YAML or other descriptions without the placeholder. You should still return the entire YAML in your output.
+ DO NOT modify any other part of the YAML.
+ DO NOT add any explanations or text outside the ```yml block.
+ Return the complete YAML wrapped in markdown, with only the placeholders replaced.".to_string(),
+ ),
+ LlmMessage::new(
+ "user".to_string(),
+ yaml,
+ ),
+ ];
+
+ let response = llm_chat(
+ LlmModel::OpenAi(OpenAiChatModel::O3Mini),
+ &messages,
+ 0.1,
+ 2048,
+ 120,
+ None,
+ false,
+ None,
+ &Uuid::new_v4(),
+ &Uuid::new_v4(),
+ crate::utils::clients::ai::langfuse::PromptName::CustomPrompt("enhance_yaml_descriptions".to_string()),
+ )
+ .await?;
+
+ // Extract YAML from markdown code blocks
+ let re = Regex::new(r"```yml\n([\s\S]*?)\n```").unwrap();
+ let yaml = match re.captures(&response) {
+ Some(caps) => caps.get(1).unwrap().as_str().to_string(),
+ None => return Err(anyhow!("Failed to extract YAML from response")),
+ };
+
+ Ok(yaml)
+}
+
+async fn generate_model_yaml(
+ model_name: &str,
+ ds_columns: &[DatasetColumnRecord],
+ schema: &str,
+) -> Result<String> {
+ // Filter columns for this model
+ let model_columns: Vec<_> = ds_columns
+ .iter()
+ .filter(|col| {
+ col.dataset_name.to_lowercase() == model_name.to_lowercase()
+ && col.schema_name.to_lowercase() == schema.to_lowercase()
+ })
+ .collect();
+
+ if model_columns.is_empty() {
+ return Err(anyhow!("No columns found for model"));
+ }
+
+ let mut dimensions = Vec::new();
+ let mut measures = Vec::new();
+
+ // Process each column and categorize as dimension or measure
+ for col in model_columns {
+ match map_snowflake_type(&col.type_) {
+ ColumnMappingType::Dimension(semantic_type) => {
+ dimensions.push(Dimension {
+ name: col.name.clone(),
+ expr: col.name.clone(),
+ type_: semantic_type,
+ description: "{NEED DESCRIPTION HERE}".to_string(),
+ searchable: Some(false),
+ });
+ }
+ ColumnMappingType::Measure(measure_type) => {
+ measures.push(Measure {
+ name: col.name.clone(),
+ expr: col.name.clone(),
+ type_: measure_type,
+ agg: Some("sum".to_string()),
+ description: "{NEED DESCRIPTION HERE}".to_string(),
+ });
+ }
+ ColumnMappingType::Unsupported => {
+ tracing::warn!(
+ "Skipping unsupported column type: {} for column: {}",
+ col.type_,
+ col.name
+ );
+ }
+ }
+ }
+
+ let model = Model {
+ name: model_name.to_string(),
+ description: format!("Generated model for {}", model_name),
+ dimensions,
+ measures,
+ };
+
+ let config = ModelConfig {
+ models: vec![model],
+ };
+
+ let yaml = serde_yaml::to_string(&config)?;
+
+ // Enhance descriptions using OpenAI
+ let enhanced_yaml = enhance_yaml_with_descriptions(yaml).await?;
+
+ Ok(enhanced_yaml)
+}
+
+async fn generate_datasets_handler(
+ request: &GenerateDatasetRequest,
+ organization_id: &Uuid,
+) -> Result<GenerateDatasetResponse> {
+ let mut conn = get_pg_pool().get().await?;
+
+ // Get data source
+ let data_source = match data_sources::table
+ .filter(data_sources::name.eq(&request.data_source_name))
+ .filter(data_sources::organization_id.eq(organization_id))
+ .filter(data_sources::deleted_at.is_null())
+ .first::<DataSource>(&mut conn)
+ .await
+ {
+ Ok(ds) => ds,
+ Err(e) => return Err(anyhow!("Data source not found: {}", e)),
+ };
+
+ // Get credentials
+ let credentials = get_data_source_credentials(&data_source.secret_id, &data_source.type_, false).await?;
+
+ // Prepare tables for batch validation
+ let tables_to_validate: Vec<(String, String)> = request
+ .model_names
+ .iter()
+ .map(|name| (name.clone(), request.schema.clone()))
+ .collect();
+
+ // Get all columns in one batch
+ let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, request.database.clone()).await {
+ Ok(cols) => cols,
+ Err(e) => return Err(anyhow!("Failed to get columns from data source: {}", e)),
+ };
+
+ // Process models concurrently
+ let mut join_set = JoinSet::new();
+
+ for model_name in &request.model_names {
+ let model_name = model_name.clone();
+ let schema = request.schema.clone();
+ let ds_columns = ds_columns.clone();
+
+ join_set.spawn(async move {
+ let result = generate_model_yaml(&model_name, &ds_columns, &schema).await;
+ (model_name, result)
+ });
greptile
style: No limit on concurrent tasks could lead to resource exhaustion with large model_names lists
suggested fix
+        // Limit concurrent tasks to avoid resource exhaustion
+        if join_set.len() >= 10 {
+            join_set.join_next().await;
+        }
        join_set.spawn(async move {
            let result = generate_model_yaml(&model_name, &ds_columns, &schema).await;
            (model_name, result)
        });
Want to avoid this bug in your codebase? Try Greptile.