26 examples

Resource leak

Failure to release resources after use.

[ FAQ1 ]

What is a resource leak?

A resource leak occurs when a program acquires system resources such as file handles, sockets, database connections, or memory, but fails to release them once they are no longer needed. A memory leak is the special case that ties up RAM; resource leaks more generally involve any finite system resource and can significantly degrade application and system performance over time. Common symptoms include steadily increasing resource usage, application slowdowns, failed resource allocations, or system instability as the available resources are exhausted. Resource leaks frequently arise from programming oversights, missing cleanup logic, or poorly handled exceptions.
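For instance, here is a minimal TypeScript sketch using Node's fs module (the readHeader function is purely illustrative, not taken from any example below): the early return leaks a file descriptor because the descriptor is only closed on the happy path.

import { openSync, readSync, closeSync } from "fs";

// Illustrative only: readHeader acquires a file descriptor but releases it
// only on the happy path, so the early return leaks the descriptor.
function readHeader(path: string): Buffer | null {
  const fd = openSync(path, "r");            // resource acquired
  const header = Buffer.alloc(4);
  const bytesRead = readSync(fd, header, 0, 4, 0);
  if (bytesRead < 4) {
    return null;                             // leak: fd is never closed here
  }
  closeSync(fd);                             // released only when we get here
  return header;
}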
[ FAQ2 ]

How to fix a resource leak

To fix resource leaks, ensure resources like file handles, sockets, and database connections are always explicitly closed or released after use, ideally within structured cleanup blocks (finally, try-with-resources, or equivalent constructs). Use resource management tools and libraries that automatically handle resource lifecycles to minimize manual errors. Regularly employ profiling and monitoring tools to detect and pinpoint resource leaks in running applications, proactively addressing issues before resource exhaustion occurs. Implement thorough testing and code reviews to ensure consistent resource cleanup, significantly reducing the risk of leaks in your applications.
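Applied to the earlier sketch, moving the cleanup into a finally block removes the leak. Again this is an illustrative TypeScript sketch, not code from any of the examples below:

import { openSync, readSync, closeSync } from "fs";

// Same function with structured cleanup: the finally block runs on every
// path, including the early return and any thrown error, so the file
// descriptor cannot leak.
function readHeader(path: string): Buffer | null {
  const fd = openSync(path, "r");
  try {
    const header = Buffer.alloc(4);
    const bytesRead = readSync(fd, header, 0, 4, 0);
    return bytesRead < 4 ? null : header;
  } finally {
    closeSync(fd);
  }
}

Languages with dedicated constructs, such as Java's try-with-resources, Python's with statement, C#'s using, and Go's defer, express the same guarantee more directly.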
diff block
}
// Then delete all actors
- for (let i = 0; i < context.actorIds.length; i++) {
- const actorId = context.actorIds[i];
- console.log(`Destroying actor ${i + 1}:`, actorId);
- try {
- await client.actors.destroy(actorId, {
- project: RIVET_PROJECT,
- environment: RIVET_ENVIRONMENT,
- });
- } catch (err) {
- console.error(`Error destroying actor ${i + 1}:`, err);
- }
- }
+ // for (let i = 0; i < context.actorIds.length; i++) {
+ // const actorId = context.actorIds[i];
+ // console.log(`Destroying actor ${i + 1}:`, actorId);
+ // try {
+ // await client.actors.destroy(actorId, {
+ // project: RIVET_PROJECT,
+ // environment: RIVET_ENVIRONMENT,
+ // });
+ // } catch (err) {
+ // console.error(`Error destroying actor ${i + 1}:`, err);
+ // }
+ // }
Greptile
greptile
logic: Commenting out actor cleanup could lead to resource leakage in test environments. Consider implementing a separate cleanup mechanism or documenting why actors should persist after tests.
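One way to follow that suggestion, sketched here as a hypothetical TypeScript snippet rather than the project's actual code: leave the per-test deletes commented out but destroy the accumulated actors in a single afterAll hook. This assumes a Jest/Vitest-style runner and reuses the client, context.actorIds, RIVET_PROJECT, and RIVET_ENVIRONMENT names from the diff above.

// Hypothetical cleanup hook, not the project's code: assumes a Jest/Vitest-style
// runner and the client, context.actorIds, RIVET_PROJECT, and RIVET_ENVIRONMENT
// declared in the test file shown in the diff.
afterAll(async () => {
  for (const actorId of context.actorIds) {
    try {
      await client.actors.destroy(actorId, {
        project: RIVET_PROJECT,
        environment: RIVET_ENVIRONMENT,
      });
    } catch (err) {
      console.error(`Error destroying actor ${actorId}:`, err);
    }
  }
});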
diff block
+package org.owasp.webgoat.lessons.pathtraversal;
+
+import static org.owasp.webgoat.container.assignments.AttackResultBuilder.failed;
+import static org.owasp.webgoat.container.assignments.AttackResultBuilder.success;
+import static org.springframework.http.MediaType.ALL_VALUE;
+import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.owasp.webgoat.container.CurrentUsername;
+import org.owasp.webgoat.container.assignments.AssignmentHints;
+import org.owasp.webgoat.container.assignments.AttackResult;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.FileCopyUtils;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+@RestController
+@AssignmentHints({
+ "path-traversal-zip-slip.hint1",
+ "path-traversal-zip-slip.hint2",
+ "path-traversal-zip-slip.hint3",
+ "path-traversal-zip-slip.hint4"
+})
+@Slf4j
+public class ProfileZipSlip extends ProfileUploadBase {
+
+ public ProfileZipSlip(@Value("${webgoat.server.directory}") String webGoatHomeDirectory) {
+ super(webGoatHomeDirectory);
+ }
+
+ @PostMapping(
+ value = "/PathTraversal/zip-slip",
+ consumes = ALL_VALUE,
+ produces = APPLICATION_JSON_VALUE)
+ @ResponseBody
+ public AttackResult uploadFileHandler(
+ @RequestParam("uploadedFileZipSlip") MultipartFile file, @CurrentUsername String username) {
+ if (!file.getOriginalFilename().toLowerCase().endsWith(".zip")) {
+ return failed(this).feedback("path-traversal-zip-slip.no-zip").build();
+ } else {
+ return processZipUpload(file, username);
+ }
+ }
+
+ @SneakyThrows
+ private AttackResult processZipUpload(MultipartFile file, String username) {
+ var tmpZipDirectory = Files.createTempDirectory(username);
+ cleanupAndCreateDirectoryForUser(username);
+ var currentImage = getProfilePictureAsBase64(username);
+
+ try {
+ var uploadedZipFile = tmpZipDirectory.resolve(file.getOriginalFilename());
+ FileCopyUtils.copy(file.getBytes(), uploadedZipFile.toFile());
+
+ ZipFile zip = new ZipFile(uploadedZipFile.toFile());
Greptile
greptile
style: ZipFile is not closed in a try-with-resources block, which could lead to resource leaks
suggested fix
+ try (ZipFile zip = new ZipFile(uploadedZipFile.toFile())) {
diff block
+package fetchers
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+)
+
+// ASNFetcher implements the IPRangeFetcher interface for specific ASNs.
+type ASNFetcher struct {
+ ASNs []string // List of ASNs in AS#### format
+}
+
+func (f ASNFetcher) Name() string {
+ return "asn"
+}
+
+func (f ASNFetcher) Description() string {
+ return "Fetches IP ranges for specific Autonomous System Numbers (ASNs)."
+}
+
+func (f ASNFetcher) FetchIPRanges() ([]string, error) {
+ if len(f.ASNs) == 0 {
+ return nil, fmt.Errorf("no ASNs provided to fetch")
+ }
+
+ ipRanges := make([]string, 0)
+
+ for _, asn := range f.ASNs {
+ url := fmt.Sprintf("https://api.hackertarget.com/aslookup/?q=%s", asn)
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch IP ranges for ASN %s: %v", asn, err)
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ resp.Body.Close()
+ if err != nil {
Greptile
greptile
style: Move resp.Body.Close() into a defer statement immediately after the response check to prevent resource leaks
suggested fix
body, err := io.ReadAll(resp.Body)
+ defer resp.Body.Close()
if err != nil {
diff block
+use std::{io::Stdout, thread::JoinHandle, time::Duration};
+
+use anyhow::Error;
+use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyModifiers};
+use ratatui::{
+ layout::{Alignment, Constraint, Direction, Layout, Rect},
+ prelude::CrosstermBackend,
+ style::{Color, Style, Stylize},
+ widgets::{Block, BorderType, Paragraph, Row, Table, TableState},
+ Frame, Terminal,
+};
+
+use serde::{Deserialize, Serialize};
+use tui_textarea::TextArea;
+
+use crate::utils::{
+ auth::Token,
+ homedir::posthog_home_dir,
+ query::{self, HogQLQueryErrorResponse, HogQLQueryResponse, HogQLQueryResult},
+};
+
+pub struct QueryTui {
+ host: String,
+ creds: Token,
+ current_result: Option<HogQLQueryResult>,
+ lower_panel_state: Option<LowerPanelState>,
+ bg_query_handle: Option<JoinHandle<Result<HogQLQueryResult, Error>>>,
+ focus: Focus,
+ debug: bool,
+}
+
+enum LowerPanelState {
+ TableState(TableState),
+ DebugState(TextArea<'static>),
+}
+
+#[derive(Clone, Copy)]
+enum Focus {
+ Editor,
+ Output,
+}
+
+#[derive(Serialize, Deserialize)]
+struct PersistedEditorState {
+ lines: Vec<String>,
+ current_result: Option<HogQLQueryResult>,
+}
+
+impl QueryTui {
+ pub fn new(creds: Token, host: String, debug: bool) -> Self {
+ Self {
+ current_result: None,
+ lower_panel_state: None,
+ creds,
+ host,
+ focus: Focus::Editor,
+ debug,
+ bg_query_handle: None,
+ }
+ }
+
+ fn draw_outer(&mut self, frame: &mut Frame) -> Rect {
+ let area = frame.area();
+
+ let outer = Layout::default()
+ .direction(Direction::Vertical)
+ .constraints([Constraint::Fill(1)].as_ref())
+ .split(area);
+
+ let mut top_title = "Posthog Query Editor".to_string();
+ if self.bg_query_handle.is_some() {
+ top_title.push_str(" (Running query, Ctrl+C to cancel)");
+ }
+
+ let outer_block = Block::bordered()
+ .title_top(top_title)
+ .title_bottom("Ctrl+R to run query, ESC to quit, Ctrl+F to switch focus")
+ .border_type(BorderType::Rounded)
+ .title_alignment(Alignment::Center);
+
+ let inner_area = outer_block.inner(outer[0]);
+
+ frame.render_widget(outer_block, outer[0]);
+
+ inner_area
+ }
+
+ fn save_editor_state(&self, lines: Vec<String>) -> Result<(), Error> {
+ let home_dir = posthog_home_dir();
+ let editor_state_path = home_dir.join("editor_state.json");
+ let state = PersistedEditorState {
+ lines,
+ current_result: self.current_result.clone(),
+ };
+
+ let state_str = serde_json::to_string(&state)?;
+ std::fs::write(editor_state_path, state_str)?;
+ Ok(())
+ }
+
+ fn load_editor_state(&mut self) -> Result<Vec<String>, Error> {
+ let home_dir = posthog_home_dir();
+ let editor_state_path = home_dir.join("editor_state.json");
+ if !editor_state_path.exists() {
+ return Ok(vec![]);
+ }
+
+ let state_str = std::fs::read_to_string(editor_state_path)?;
+ let Ok(state): Result<PersistedEditorState, _> = serde_json::from_str(&state_str) else {
+ return Ok(vec![]);
+ };
+ self.current_result = state.current_result;
+ Ok(state.lines)
+ }
+
+ fn draw_lower_panel(&mut self, frame: &mut Frame, area: Rect) {
+ let is_focus = matches!(self.focus, Focus::Output);
+ match (&self.current_result, self.debug) {
+ (Some(Ok(res)), false) => {
+ let table = get_response_table(res, is_focus);
+ let mut ts =
+ if let Some(LowerPanelState::TableState(ts)) = self.lower_panel_state.take() {
+ ts
+ } else {
+ TableState::default()
+ };
+
+ frame.render_stateful_widget(table, area, &mut ts);
+ self.lower_panel_state = Some(LowerPanelState::TableState(ts));
+ }
+ (Some(Ok(res)), true) => {
+ let debug_display =
+ if let Some(LowerPanelState::DebugState(ta)) = self.lower_panel_state.take() {
+ ta
+ } else {
+ get_debug_display(res)
+ };
+
+ let debug_display = style_debug_display(debug_display, is_focus);
+ frame.render_widget(&debug_display, area);
+ self.lower_panel_state = Some(LowerPanelState::DebugState(debug_display));
+ }
+ (Some(Err(err)), _) => {
+ let paragraph = get_error_display(err, is_focus);
+ frame.render_widget(paragraph, area);
+ }
+ (None, _) => {}
+ }
+ }
+
+ fn draw(&mut self, frame: &mut Frame, text_area: &TextArea) {
+ let inner_area = self.draw_outer(frame);
+
+ let mut panel_count: usize = 1;
+ if self.current_result.is_some() {
+ panel_count += 1;
+ }
+
+ // TODO - figure out nicer dynamic constraints?
+ let mut constraints = vec![];
+ constraints.extend(vec![Constraint::Fill(1); panel_count]);
+
+ let inner_panels = Layout::default()
+ .direction(Direction::Vertical)
+ .constraints(constraints)
+ .split(inner_area);
+
+ frame.render_widget(text_area, inner_panels[0]);
+ if inner_panels.len() > 1 {
+ self.draw_lower_panel(frame, inner_panels[1]);
+ }
+ }
+
+ fn handle_bg_query(&mut self) -> Result<bool, Error> {
+ let Some(handle) = self.bg_query_handle.take() else {
+ return Ok(false);
+ };
+
+ if !handle.is_finished() {
+ self.bg_query_handle = Some(handle);
+ return Ok(false);
+ }
+
+ let res = handle.join().expect("Task did not panic")?;
+
+ self.current_result = Some(res);
+ Ok(true)
+ }
+
+ fn handle_events(&mut self, text_area: &mut TextArea) -> Result<Option<String>, Error> {
+ self.handle_bg_query()?;
+ self.save_editor_state(text_area.lines().to_vec())?;
+ if event::poll(Duration::from_millis(17))? == false {
+ return Ok(None);
+ }
+ if let Event::Key(key) = event::read()? {
+ // Your own key mapping to break the event loop
+ if key.code == KeyCode::Esc {
+ let last_query = text_area.lines().join("\n");
+ return Ok(Some(last_query));
+ }
+
+ if key.code == KeyCode::Char('r') && key.modifiers == KeyModifiers::CONTROL {
+ let lines = text_area.lines().to_vec();
+ self.spawn_bg_query(lines);
+ return Ok(None);
+ }
+
+ if key.code == KeyCode::Char('c') && key.modifiers == KeyModifiers::CONTROL {
+ // TODO - we don't have proper task cancellation here, but this "cancels" the query from the
+ // user's perspective - they will never see the results
+ self.bg_query_handle = None;
+ return Ok(None);
+ }
+
+ if key.code == KeyCode::Char('f') && key.modifiers == KeyModifiers::CONTROL {
+ self.focus = match self.focus {
+ Focus::Editor => Focus::Output,
+ Focus::Output => Focus::Editor,
+ };
+ return Ok(None);
+ }
+
+ if key.code == KeyCode::Char('q') && key.modifiers == KeyModifiers::CONTROL {
+ self.current_result = None;
+ self.lower_panel_state = None;
+ return Ok(None);
+ }
+
+ match self.focus {
+ Focus::Editor => {
+ text_area.input(key);
+ }
+ Focus::Output => {
+ self.handle_output_event(key);
+ }
+ }
+ }
+
+ Ok(None)
+ }
+
+ fn handle_output_event(&mut self, key: KeyEvent) {
+ match &mut self.lower_panel_state {
+ Some(LowerPanelState::TableState(ref mut ts)) => {
+ if key.code == KeyCode::Down {
+ ts.select(Some(ts.selected().unwrap_or(0) + 1));
+ } else if key.code == KeyCode::Up {
+ ts.select(Some(ts.selected().unwrap_or(0).saturating_sub(1)));
+ }
+ }
+ Some(LowerPanelState::DebugState(ta)) => {
+ ta.input(key);
+ }
+ _ => {}
+ }
+ }
+
+ fn enter_draw_loop(
+ &mut self,
+ mut terminal: Terminal<CrosstermBackend<Stdout>>,
+ ) -> Result<String, Error> {
+ let lines = self.load_editor_state()?;
+ let mut text_area = TextArea::new(lines);
+ loop {
+ terminal.draw(|frame| self.draw(frame, &text_area))?;
+ if let Some(query) = self.handle_events(&mut text_area)? {
+ return Ok(query);
+ }
+ }
+ }
+
+ fn spawn_bg_query(&mut self, lines: Vec<String>) {
+ let query = lines.join("\n");
+ let query_endpoint = format!("{}/api/environments/{}/query", self.host, self.creds.env_id);
+ let m_token = self.creds.token.clone();
+ let handle =
+ std::thread::spawn(move || query::run_query(&query_endpoint, &m_token, &query));
+
+ // We drop any previously running thread handle here, but don't kill the thread... this is fine,
+ // I think. The alternative is to switch to tokio and get true task cancellation, but :shrug:,
+ // TODO later I guess
+ self.bg_query_handle = Some(handle);
Greptile
greptile
style: Dropping thread handle without cancellation could lead to resource leaks
diff block
+package main
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+)
+
+const userEndpoint = "https://jsonplaceholder.typicode.com/users"
+
+func main() {
+ client := &http.Client{
+ Timeout: time.Second * 10,
+ }
+ req, err := http.NewRequest("GET", userEndpoint, nil)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ res, err := client.Do(req)
+ if err != nil {
+ log.Fatal(err)
+ }
Greptile
greptile
Connection reuse is an HTTP/1.1 feature where the same TCP connection can be used for multiple HTTP requests to improve performance. When you don't properly read or close the response body, the underlying TCP connection can't be reused for future requests, which can lead to:
1. Resource leaks
2. Running out of available connections
3. Degraded performance, since new connections need to be established
In your code, you should either read the response body completely using `io.ReadAll` or `io.Copy`, or ensure you're properly closing it with `defer res.Body.Close()`. The best practice is to do both - read the body and close it. Here's how to properly handle it:
suggested fix
res, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
+ defer res.Body.Close()
diff block
+// Copyright 2025 OpenObserve Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::time::Duration;
+
+use actix_http::ws::{CloseCode, CloseReason};
+use actix_ws::{MessageStream, Session};
+use config::{
+ get_config,
+ meta::websocket::{SearchEventReq, SearchResultType},
+};
+use futures::StreamExt;
+use infra::errors::{self, Error};
+#[cfg(feature = "enterprise")]
+use o2_enterprise::enterprise::common::{
+ auditor::{AuditMessage, Protocol, WsMeta},
+ infra::config::get_config as get_o2_config,
+};
+use rand::prelude::SliceRandom;
+use tokio::sync::mpsc;
+
+#[cfg(feature = "enterprise")]
+use crate::service::self_reporting::audit;
+#[cfg(feature = "enterprise")]
+use crate::service::websocket_events::handle_cancel;
+use crate::{
+ common::infra::config::WS_SEARCH_REGISTRY,
+ // router::http::ws_v2::types::StreamMessage,
+ service::websocket_events::{
+ WsClientEvents, WsServerEvents, handle_search_request,
+ search_registry_utils::{self, SearchState},
+ sessions_cache_utils,
+ },
+};
+
+// Do not clone the session, instead use a reference to the session
+pub struct WsSession {
+ inner: Option<Session>,
+ // Utc timestamp in microseconds
+ last_activity_ts: i64,
+ // Utc timestamp in microseconds
+ created_ts: i64,
+}
+
+impl WsSession {
+ pub fn new(inner: Session) -> Self {
+ let now = chrono::Utc::now().timestamp_micros();
+ Self {
+ inner: Some(inner),
+ last_activity_ts: now,
+ created_ts: now,
+ }
+ }
+
+ pub fn update_activity(&mut self) {
+ self.last_activity_ts = chrono::Utc::now().timestamp_micros();
+ }
+
+ pub fn is_expired(&self) -> bool {
+ let cfg = get_config();
+ let now = chrono::Utc::now().timestamp_micros();
+ let idle_timeout_micros = cfg.websocket.session_idle_timeout_secs * 1_000_000;
+ let max_lifetime_micros = cfg.websocket.session_max_lifetime_secs * 1_000_000;
+
+ // 1. if the session has been idle for too long
+ // 2. if the session has exceeded the max lifetime
+ (now - self.last_activity_ts) > idle_timeout_micros
+ || (now - self.created_ts) > max_lifetime_micros
+ }
+
+ /// Send a text message to the client
+ pub async fn text(&mut self, msg: String) -> Result<(), actix_ws::Closed> {
+ self.update_activity();
+ if let Some(ref mut session) = self.inner {
+ session.text(msg).await
+ } else {
+ Err(actix_ws::Closed)
+ }
+ }
+
+ /// Close the session with a reason
+ pub async fn close(&mut self, reason: Option<CloseReason>) -> Result<(), actix_ws::Closed> {
+ self.update_activity();
+ if let Some(session) = self.inner.take() {
+ session.close(reason).await
+ } else {
+ Err(actix_ws::Closed)
+ }
+ }
+
+ /// Send a pong response
+ pub async fn pong(&mut self, payload: &[u8]) -> Result<(), actix_ws::Closed> {
+ self.update_activity();
+ if let Some(ref mut session) = self.inner {
+ session.pong(payload).await
+ } else {
+ Err(actix_ws::Closed)
+ }
+ }
+
+ /// Send a ping request
+ pub async fn ping(&mut self, payload: &[u8]) -> Result<(), actix_ws::Closed> {
+ self.update_activity();
+ if let Some(ref mut session) = self.inner {
+ session.ping(payload).await
+ } else {
+ Err(actix_ws::Closed)
+ }
+ }
+}
+
+pub async fn run(
+ mut msg_stream: MessageStream,
+ user_id: String,
+ req_id: String,
+ org_id: String,
+ path: String,
+) {
+ let cfg = get_config();
+ let mut ping_interval =
+ tokio::time::interval(Duration::from_secs(cfg.websocket.ping_interval_secs as u64));
+ let mut close_reason: Option<CloseReason> = None;
+
+ loop {
+ tokio::select! {
+ Some(msg) = msg_stream.next() => {
+ // Update activity on any message
+ if let Some(mut session) = sessions_cache_utils::get_mut_session(&req_id) {
+ session.update_activity();
+ }
+
+ match msg {
+ Ok(actix_ws::Message::Ping(bytes)) => {
+ if let Some(mut session) = sessions_cache_utils::get_mut_session(&req_id) {
+ if let Err(e) = session.pong(&bytes).await {
+ log::error!("[WS_HANDLER]: Failed to send pong: {}", e);
+ break;
+ }
+ } else {
+ log::error!("[WS_HANDLER]: Session not found for ping response");
+ break;
+ }
+ }
+ Ok(actix_ws::Message::Pong(_)) => {
+ log::debug!("[WS_HANDLER] Received pong from {}", req_id);
+ }
+ Ok(actix_ws::Message::Text(msg)) => {
+ log::info!("[WS_HANDLER]: Request Id: {} Node Role: {} Received message: {}",
+ req_id,
+ get_config().common.node_role,
+ msg
+ );
+ handle_text_message(&org_id, &user_id, &req_id, msg.to_string(), path.clone()).await;
+ }
+ Ok(actix_ws::Message::Close(reason)) => {
+ if let Some(reason) = reason.as_ref() {
+ match reason.code {
+ CloseCode::Normal | CloseCode::Error => {
+ log::info!("[WS_HANDLER]: Request Id: {} Node Role: {} Closing connection with reason: {:?}",
+ req_id,
+ get_config().common.node_role,
+ reason
+ );
+ },
+ _ => {
+ log::error!("[WS_HANDLER]: Request Id: {} Node Role: {} Abnormal closure with reason: {:?}",req_id,get_config().common.node_role,reason);
+ },
+ }
+ }
+ close_reason = reason;
+ break;
+ }
+ _ => ()
+ }
+ }
+ // Heartbeat to keep the connection alive
+ _ = ping_interval.tick() => {
+ if let Some(mut session) = sessions_cache_utils::get_mut_session(&req_id) {
+ if let Err(e) = session.ping(&[]).await {
+ log::error!("[WS_HANDLER] Failed to send ping: {}", e);
+ break;
+ }
+ }
+ }
+ }
+ }
+ cleanup_and_close_session(&req_id, close_reason).await;
+}
+
+/// Handle the incoming text message
+/// Text message is parsed into `WsClientEvents` and processed accordingly
+/// Depending on each event type, audit must be done
+/// Currently audit is done only for the search event
+pub async fn handle_text_message(
+ org_id: &str,
+ user_id: &str,
+ req_id: &str,
+ msg: String,
+ path: String,
+) {
+ match serde_json::from_str::<WsClientEvents>(&msg) {
+ Ok(client_msg) => {
+ // Validate the events
+ if !client_msg.is_valid() {
+ log::error!("[WS_HANDLER]: Invalid event: {:?}", client_msg);
+ let err_res = WsServerEvents::error_response(
+ errors::Error::Message("Invalid event".to_string()),
+ Some(req_id.to_string()),
+ None,
+ );
+ let _ = send_message(req_id, err_res.to_json()).await;
+ return;
+ }
+
+ match client_msg {
+ WsClientEvents::Search(ref search_req) => {
+ handle_search_event(search_req, org_id, user_id, req_id, path.clone()).await;
+ }
+ #[cfg(feature = "enterprise")]
+ WsClientEvents::Cancel { trace_id, org_id } => {
+ if org_id.is_empty() {
+ log::error!(
+ "[WS_HANDLER]: Request Id: {} Node Role: {} Org id not found",
+ req_id,
+ get_config().common.node_role
+ );
+ return;
+ };
+
+ // First handle the cancel event
+ // send a cancel flag to the search task
+ if let Err(e) = handle_cancel_event(&trace_id).await {
+ log::warn!("[WS_HANDLER]: Error in cancelling : {}", e);
+ return;
+ }
+
+ log::info!(
+ "[WS_HANDLER]: trace_id: {}, Cancellation flag set to: {:?}",
+ trace_id,
+ search_registry_utils::is_cancelled(&trace_id)
+ );
+
+ let res = handle_cancel(&trace_id, &org_id).await;
+ let _ = send_message(req_id, res.to_json()).await;
+
+ #[cfg(feature = "enterprise")]
+ let client_msg = WsClientEvents::Cancel {
+ trace_id,
+ org_id: org_id.to_string(),
+ };
+
+ // Add audit before closing
+ #[cfg(feature = "enterprise")]
+ let is_audit_enabled = get_o2_config().common.audit_enabled;
+
+ #[cfg(feature = "enterprise")]
+ if is_audit_enabled {
+ audit(AuditMessage {
+ user_email: user_id.to_string(),
+ org_id: org_id.to_string(),
+ _timestamp: chrono::Utc::now().timestamp(),
+ protocol: Protocol::Ws(WsMeta {
+ path: path.clone(),
+ message_type: client_msg.get_type(),
+ content: client_msg.to_json(),
+ close_reason: "".to_string(),
+ }),
+ })
+ .await;
+ }
+ }
+ WsClientEvents::Benchmark { id } => {
+ // simulate random delay for benchmarking by sleep for 10/20/30/60/90
+ // seconds
+ let delay: Vec<u64> = vec![10, 20, 30, 60, 90];
+ let delay = delay.choose(&mut rand::thread_rng()).unwrap();
+ log::info!(
+ "[WS_HANDLER]: Sleeping for benchmark, id: {}, delay: {}",
+ id,
+ delay
+ );
+ tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await;
+
+ let response = serde_json::json!({
+ "id": id,
+ "took": delay,
+ });
+ let _ = send_message(req_id, response.to_string()).await;
+ let close_reason = Some(CloseReason {
+ code: CloseCode::Normal,
+ description: None,
+ });
+ cleanup_and_close_session(req_id, close_reason).await;
+ }
+ }
+ }
+ Err(e) => {
+ log::error!(
+ "[WS_HANDLER]: Request id: {} Failed to parse message: {:?}, error: {:?}",
+ req_id,
+ msg,
+ e
+ );
+ let err_res = WsServerEvents::error_response(e.into(), Some(req_id.to_string()), None);
+ let _ = send_message(req_id, err_res.to_json()).await;
+ let close_reason = Some(CloseReason {
+ code: CloseCode::Error,
+ description: None,
+ });
+ let mut session = if let Some(session) = sessions_cache_utils::get_mut_session(req_id) {
+ session
+ } else {
+ log::error!("[WS_HANDLER]: req_id: {} session not found", req_id);
+ return;
+ };
+ let _ = session.close(close_reason).await;
+ }
+ }
+}
+
+pub async fn send_message(req_id: &str, msg: String) -> Result<(), Error> {
+ let mut session = if let Some(session) = sessions_cache_utils::get_mut_session(req_id) {
+ session
+ } else {
+ return Err(Error::Message(format!(
+ "[req_id {}] session not found",
+ req_id
+ )));
+ };
+
+ log::debug!("[WS_HANDLER]: req_id: {} sending message: {}", req_id, msg);
+
+ session.text(msg).await.map_err(|e| {
+ log::error!("[WS_HANDLER]: Failed to send message: {:?}", e);
+ Error::Message(e.to_string())
+ })
+}
+
+async fn cleanup_and_close_session(req_id: &str, close_reason: Option<CloseReason>) {
+ if let Some(mut session) = sessions_cache_utils::get_mut_session(req_id) {
+ if let Some(reason) = close_reason.as_ref() {
+ log::info!(
+ "[WS_HANDLER]: req_id: {} Closing session with reason: {:?}",
+ req_id,
+ reason
+ );
+ }
+
+ // Attempt to close the session
+ if let Err(e) = session.close(close_reason).await {
+ log::error!(
+ "[WS_HANDLER]: req_id: {} Failed to close session gracefully: {:?}",
+ req_id,
+ e
+ );
+ }
+ }
+
+ // Remove the session from the cache
+ sessions_cache_utils::remove_session(req_id);
+ log::info!(
+ "[WS_HANDLER]: req_id: {} Session removed from cache. Remaining sessions: {}",
+ req_id,
+ sessions_cache_utils::len_sessions()
+ );
+}
+
+// Main search handler
+async fn handle_search_event(
+ search_req: &SearchEventReq,
+ org_id: &str,
+ user_id: &str,
+ req_id: &str,
+ #[allow(unused_variables)] path: String,
+) {
+ let (cancel_tx, mut cancel_rx) = mpsc::channel(1);
+ let mut accumulated_results: Vec<SearchResultType> = Vec::new();
+
+ let org_id = org_id.to_string();
+ let user_id = user_id.to_string();
+ let req_id = req_id.to_string();
+ let trace_id = search_req.trace_id.clone();
+ let trace_id_for_task = trace_id.clone();
+ let search_req = search_req.clone();
+
+ #[cfg(feature = "enterprise")]
+ let is_audit_enabled = get_o2_config().common.audit_enabled;
+
+ #[cfg(feature = "enterprise")]
+ let client_msg = WsClientEvents::Search(Box::new(search_req.clone()));
+
+ // Register running search BEFORE spawning the search task
+ WS_SEARCH_REGISTRY.insert(
+ trace_id.clone(),
+ SearchState::Running {
+ cancel_tx,
+ req_id: req_id.to_string(),
+ },
+ );
+
+ // Spawn the search task
+ tokio::spawn(async move {
+ // Handle the search request
+ // If search is cancelled, the task will exit
+ // Otherwise, the task will complete and the results will be sent to the client
+ // The task will also update the search state to completed
+ // The task will also close the session
+ // The task will also cleanup the search resources
+ tokio::select! {
+ search_result = handle_search_request(
+ &req_id,
+ &mut accumulated_results,
+ &org_id,
+ &user_id,
+ search_req.clone(),
+ ) => {
+ match search_result {
+ Ok(_) => {
+ if let Some(mut state) = WS_SEARCH_REGISTRY.get_mut(&trace_id_for_task) {
+ *state = SearchState::Completed {
+ req_id: req_id.to_string(),
+ };
+ }
+
+ // Add audit before closing
+ #[cfg(feature = "enterprise")]
+ if is_audit_enabled {
+ audit(AuditMessage {
+ user_email: user_id,
+ org_id,
+ _timestamp: chrono::Utc::now().timestamp(),
+ protocol: Protocol::Ws(WsMeta {
+ path: path.clone(),
+ message_type: client_msg.get_type(),
+ content: client_msg.to_json(),
+ close_reason: "".to_string(),
+ }),
+ })
+ .await;
+ }
+
+ cleanup_search_resources(&trace_id_for_task).await;
+ }
+ Err(e) => {
+ let _ = handle_search_error(e, &req_id, &trace_id_for_task).await;
+ // Add audit before closing
+ #[cfg(feature = "enterprise")]
+ if is_audit_enabled {
+ audit(AuditMessage {
+ user_email: user_id,
+ org_id,
+ _timestamp: chrono::Utc::now().timestamp(),
+ protocol: Protocol::Ws(WsMeta {
+ path: path.clone(),
+ message_type: client_msg.get_type(),
+ content: client_msg.to_json(),
+ close_reason: "".to_string(),
+ }),
+ })
+ .await;
+ }
+
+ // Even if the search is cancelled, we need to cleanup the resources
+ cleanup_search_resources(&trace_id_for_task).await;
+ }
+ }
+ }
+ _ = cancel_rx.recv() => {
+ // if search is cancelled, update the state
+ // the cancel handler will close the session
+
+ // Just cleanup resources when cancelled
+ cleanup_search_resources(&trace_id_for_task).await;
+ }
+ }
+ });
+}
+
+// Cancel handler
+#[cfg(feature = "enterprise")]
+async fn handle_cancel_event(trace_id: &str) -> Result<(), anyhow::Error> {
+ if let Some(mut entry) = WS_SEARCH_REGISTRY.get_mut(trace_id) {
+ let state = entry.value_mut();
+ let cancel_tx = match state {
+ SearchState::Running { cancel_tx, .. } => cancel_tx.clone(),
+ state => {
+ let err_msg = format!("Cannot cancel search in state: {:?}", state);
+ log::warn!("[WS_HANDLER]: {}", err_msg);
+ return Err(anyhow::anyhow!(err_msg));
+ }
+ };
+
+ *entry.value_mut() = SearchState::Cancelled {
+ req_id: trace_id.to_string(),
+ };
+
+ if let Err(e) = cancel_tx.send(()).await {
+ log::error!("[WS_HANDLER]: Failed to send cancel signal: {}", e);
+ }
+
+ log::info!("[WS_HANDLER]: Search cancelled for trace_id: {}", trace_id);
+ }
+ Ok(())
+}
+
+async fn handle_search_error(e: Error, req_id: &str, trace_id: &str) -> Option<CloseReason> {
+ // if the error is due to search cancellation, return.
+ // the cancel handler will close the session
+ if let errors::Error::ErrorCode(errors::ErrorCodes::SearchCancelQuery(_)) = e {
+ log::info!(
+ "[WS_HANDLER]: trace_id: {}, Return from search handler, search canceled",
+ trace_id
+ );
+ // Update state to cancelled before returning
+ if let Some(mut state) = WS_SEARCH_REGISTRY.get_mut(trace_id) {
+ *state = SearchState::Cancelled {
+ req_id: trace_id.to_string(),
+ };
+ }
+ return None;
+ }
+
+ log::error!("[WS_HANDLER]: trace_id: {} Search error: {}", trace_id, e);
+ // Send error response
+ let err_res =
+ WsServerEvents::error_response(e, Some(trace_id.to_string()), Some(req_id.to_string()));
+ let _ = send_message(req_id, err_res.to_json()).await;
+
+ // Close with error
+ let close_reason = CloseReason {
+ code: CloseCode::Error,
+ description: None,
+ };
+
+ // Update registry state
+ if let Some(mut state) = WS_SEARCH_REGISTRY.get_mut(trace_id) {
+ *state = SearchState::Completed {
+ req_id: trace_id.to_string(),
+ };
+ }
+
+ Some(close_reason)
+}
+
+// Add cleanup function
+async fn cleanup_search_resources(trace_id: &str) {
+ WS_SEARCH_REGISTRY.remove(trace_id);
+ log::debug!("[WS_HANDLER]: trace_id: {}, Resources cleaned up", trace_id);
+}
Greptile
greptile
style: cleanup_search_resources should verify removal was successful to ensure no resource leaks
diff block
+package main
+
+import (
+ "bufio"
+ "fmt"
+ "log"
+ "net"
+)
+
+func main() {
+ // Connects over TCP to golang.org:80
+ conn, err := net.Dial("tcp", "golang.org:80")
Greptile
greptile
logic: Connection is never closed. Add `defer conn.Close()` after error check to prevent resource leaks
suggested fix
conn, err := net.Dial("tcp", "golang.org:80")
+ if err == nil {
+ defer conn.Close()
+ }
diff block
+package tickettango;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Manages user authentication and user data
+ */
+public class UserManager {
+ private Map<String, User> users;
+ private User currentUser;
+
+ /**
+ * Constructor loads users from file or uses defaults
+ */
+ public UserManager() {
+ users = new HashMap<>();
+ loadUsers();
+ }
+
+ /**
+ * Load users from the UserDatabase.txt file or create defaults
+ */
+ private void loadUsers() {
+ // Path to the user database file
+ String userDatabasePath = "/Users/innovation/Documents/CPS406TicketTango/Jar Application/UserDatabase.txt";
+ File file = new File(userDatabasePath);
+
+ try {
+ // Attempt to read from file
+ Scanner scanner = new Scanner(file);
+ while (scanner.hasNextLine()) {
Greptile
greptile
style: Scanner not wrapped in try-with-resources. Could lead to resource leak if exception occurs
suggested fix
// Attempt to read from file
+ try (Scanner scanner = new Scanner(file)) {
while (scanner.hasNextLine()) {
diff block
+import { ActionPanel, Action, Icon, showToast, Toast, List, open as raycastOpen, showInFinder } from "@raycast/api";
+import { useEffect, useState } from "react";
+import { BlobReader, BlobWriter, ZipReader } from "@zip.js/zip.js";
+import fs from "fs";
+import { Blob } from "buffer";
+import path from "path";
+import { writeFile } from "fs/promises";
+import os from "os";
+import { ZipEntry, ZipFile } from "./common/types";
+import { formatFileSize, getBreadcrumb, getFileIcon, getParentDirectory } from "./common/utils";
+
+export default function ZipDetailsView(props: { filePath: string; password?: string | undefined }) {
+ const { filePath, password } = props;
+
+ const [isLoading, setIsLoading] = useState<boolean>(true);
+ const [zipFile, setZipFile] = useState<ZipFile | null>(null);
+
+ useEffect(() => {
+ loadZipContents(filePath);
+ }, [filePath]);
+
+ const entries = getCurrentEntries();
+
+ async function loadZipContents(filePath: string) {
+ try {
+ setIsLoading(true);
+ const fileName = path.basename(filePath);
+ const blob = new Blob([fs.readFileSync(filePath)]);
+ const reader = new ZipReader(new BlobReader(blob as globalThis.Blob), { password });
+ const zipEntries = await reader.getEntries();
+
+ // Process entries to create a hierarchical structure
+ const entries: ZipEntry[] = zipEntries.map((entry) => ({
+ name: path.basename(entry.filename),
+ isDirectory: entry.directory,
+ size: entry.uncompressedSize,
+ path: entry.filename,
+ lastModDate: new Date(entry.lastModDate),
+ }));
+
+ setZipFile({
+ fileName,
+ fullPath: filePath,
+ entries,
+ currentDir: "",
+ });
+
+ setIsLoading(false);
+ await reader.close();
+ } catch (error) {
+ setIsLoading(false);
+ throw new Error(`Failed to read ZIP file: ${error instanceof Error ? error.message : String(error)}`);
+ }
+ }
+
+ // Navigate to a directory inside the ZIP
+ function navigateToDirectory(dirPath: string) {
+ if (zipFile) {
+ setZipFile({
+ ...zipFile,
+ currentDir: dirPath,
+ });
+ }
+ }
+
+ // Get entries for the current directory
+ function getCurrentEntries(): ZipEntry[] {
+ if (!zipFile) return [];
+
+ const currentDir = zipFile.currentDir;
+ // Get direct children of the current directory
+ const entries = zipFile.entries.filter((entry) => {
+ const relativePath = entry.path.replace(currentDir, "");
+ // Exclude entry if it's the current directory
+ if (relativePath === "") return false;
+
+ // Check if it's a direct child (no additional slashes) or a directory with one trailing slash
+ const parts = relativePath.split("/").filter((p) => p !== "");
+ return parts.length === 1 || (parts.length === 0 && entry.isDirectory);
+ });
+
+ // Create directory entries for subdirectories
+ const directorySet = new Set<string>();
+
+ zipFile.entries.forEach((entry) => {
+ if (entry.path.startsWith(currentDir) && entry.path !== currentDir) {
+ const remainingPath = entry.path.slice(currentDir.length);
+ const parts = remainingPath.split("/").filter((p) => p !== "");
+
+ if (parts.length > 1) {
+ // This is a nested file/directory, create a directory entry for its parent
+ directorySet.add(parts[0]);
+ }
+ }
+ });
+
+ // Add directory entries that aren't already in the entries list
+ directorySet.forEach((dirName) => {
+ const dirPath = `${currentDir}${dirName}/`;
+ if (!entries.find((e) => e.path === dirPath)) {
+ entries.push({
+ name: dirName,
+ isDirectory: true,
+ size: 0, // Directories don't have a size themselves
+ path: dirPath,
+ lastModDate: new Date(),
+ });
+ }
+ });
+
+ return entries;
+ }
+
+ async function previewFile(entry: ZipEntry) {
+ try {
+ if (!zipFile) {
+ await showToast({ style: Toast.Style.Failure, title: "Error", message: "No ZIP file loaded" });
+ return;
+ }
+ await showToast({ style: Toast.Style.Animated, title: "Extracting to Temp..." });
+ const tempDir = path.join(os.tmpdir(), "archiver-rc");
+ if (!fs.existsSync(tempDir)) {
+ fs.mkdirSync(tempDir, { recursive: true });
+ }
+ const outputPath = path.join(tempDir, entry.name);
+ await extractZipFile(zipFile, entry, outputPath);
+ await showToast({ style: Toast.Style.Success, title: "File Extracted", message: outputPath });
+ await raycastOpen(outputPath);
+ } catch (error) {
+ console.error("Extraction error:", error);
+ await showToast({
+ style: Toast.Style.Failure,
+ title: "Extraction Failed",
+ message: error instanceof Error ? error.message : String(error),
+ });
+ }
+ }
+
+ async function extractFile(entry: ZipEntry) {
+ try {
+ if (!zipFile) {
+ await showToast({ style: Toast.Style.Failure, title: "Error", message: "No ZIP file loaded" });
+ return;
+ }
+ await showToast({ style: Toast.Style.Animated, title: "Extracting File..." });
+ const tempDir = path.join(os.tmpdir(), "archiver-rc");
+ if (!fs.existsSync(tempDir)) {
+ fs.mkdirSync(tempDir, { recursive: true });
+ }
+ const downloadsDir = path.join(os.homedir(), "Downloads");
+ const baseName = entry.name;
+ const outputPath = getOutputFilePath(downloadsDir, baseName);
+ await extractZipFile(zipFile, entry, outputPath);
+ await showToast({ style: Toast.Style.Success, title: "File Extracted", message: outputPath });
+ await showInFinder(outputPath);
+ } catch (error) {
+ console.error("Extraction error:", error);
+ await showToast({
+ style: Toast.Style.Failure,
+ title: "Extraction Failed",
+ message: error instanceof Error ? error.message : String(error),
+ });
+ }
+ }
+
+ // Generate a unique output file path
+ // by appending a number if the file already exists
+ // e.g. "file (1).txt", "file (2).txt", etc.
+ function getOutputFilePath(dir: string, baseName: string): string {
+ let outputPath = path.join(dir, baseName);
+ let counter = 1;
+ const extName = path.extname(baseName);
+ const nameWithoutExt = path.basename(baseName, extName);
+
+ // Check if file already exists and generate new filename if needed
+ while (fs.existsSync(outputPath)) {
+ const newFileName = `${nameWithoutExt} (${counter})${extName}`;
+ outputPath = path.join(dir, newFileName);
+ counter++;
+ }
+ return outputPath;
+ }
+
+ async function extractZipFile(zipFile: ZipFile, entry: ZipEntry, outputPath: string) {
+ // Re-open the zip file to read this specific entry
+ const blob = new Blob([fs.readFileSync(zipFile.fullPath)]);
+ const reader = new ZipReader(new BlobReader(blob as globalThis.Blob), { password });
+
+ const entries = await reader.getEntries();
+
+ // Find matching entry
+ const zipEntry = entries.find((e) => e.filename === entry.path);
+
+ if (!zipEntry || !zipEntry.getData) {
+ showToast({ style: Toast.Style.Failure, title: "Error", message: "File not found in ZIP archive" });
+ return;
+ }
+
+ const fileBlob: Blob | undefined = await zipEntry.getData?.(new BlobWriter());
+ if (!fileBlob) {
+ showToast({ style: Toast.Style.Failure, title: "Error", message: "Failed to get file blob" });
+ return;
+ }
+
+ const arrayBuffer = await fileBlob.arrayBuffer();
+ const buffer = Buffer.from(arrayBuffer);
+ await writeFile(outputPath, buffer);
+
+ await showToast({
+ style: Toast.Style.Success,
+ title: "File Extracted",
+ message: outputPath,
+ primaryAction: {
+ title: "Open File",
+ onAction: () => {
+ raycastOpen(outputPath);
+ },
+ },
+ });
+
+ await reader.close();
+ }
Greptile
greptile
style: ZipReader is not closed in error cases, which could lead to resource leaks. Consider using try-finally.
suggested fix
async function extractZipFile(zipFile: ZipFile, entry: ZipEntry, outputPath: string) {
// Re-open the zip file to read this specific entry
const blob = new Blob([fs.readFileSync(zipFile.fullPath)]);
const reader = new ZipReader(new BlobReader(blob as globalThis.Blob), { password });
try {
const entries = await reader.getEntries();
// Find matching entry
const zipEntry = entries.find((e) => e.filename === entry.path);
if (!zipEntry || !zipEntry.getData) {
showToast({ style: Toast.Style.Failure, title: "Error", message: "File not found in ZIP archive" });
return;
}
const fileBlob: Blob | undefined = await zipEntry.getData?.(new BlobWriter());
if (!fileBlob) {
showToast({ style: Toast.Style.Failure, title: "Error", message: "Failed to get file blob" });
return;
}
const arrayBuffer = await fileBlob.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);
await writeFile(outputPath, buffer);
await showToast({
style: Toast.Style.Success,
title: "File Extracted",
message: outputPath,
primaryAction: {
title: "Open File",
onAction: () => {
raycastOpen(outputPath);
},
},
});
+ } finally {
await reader.close();
}
}
diff block
+"""Datasets API Client."""
+
+import json
+import logging
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+
+import httpx
+
+from autoblocks._impl.config.constants import API_ENDPOINT_V2
+from autoblocks._impl.datasets.exceptions import APIError
+from autoblocks._impl.datasets.exceptions import ValidationError
+from autoblocks._impl.datasets.exceptions import parse_error_response
+from autoblocks._impl.datasets.models.dataset import Dataset
+from autoblocks._impl.datasets.models.dataset import DatasetItem
+from autoblocks._impl.datasets.models.dataset import DatasetItemsSuccessResponse
+from autoblocks._impl.datasets.models.dataset import DatasetListItem
+from autoblocks._impl.datasets.models.dataset import DatasetSchema
+from autoblocks._impl.datasets.models.dataset import SuccessResponse
+from autoblocks._impl.datasets.models.schema import create_schema_property
+from autoblocks._impl.datasets.utils.helpers import build_path
+from autoblocks._impl.datasets.utils.serialization import deserialize_model
+from autoblocks._impl.datasets.utils.serialization import deserialize_model_list
+from autoblocks._impl.datasets.utils.serialization import serialize_model
+from autoblocks._impl.util import cuid_generator
+
+log = logging.getLogger(__name__)
+
+
+class DatasetsClient:
+ """Datasets API Client"""
+
+ def __init__(self, config: Dict[str, Any]) -> None:
+ """
+ Initialize the client with configuration
+
+ Args:
+ config: Dict with:
+ - api_key: Autoblocks API key
+ - app_slug: Application slug
+ - timeout_ms: Optional timeout in milliseconds (default: 60000)
+ """
+ if not config.get("api_key"):
+ raise ValidationError("API key is required")
+
+ if not config.get("app_slug"):
+ raise ValidationError("App slug is required")
+
+ self.api_key = config["api_key"]
+ self.app_slug = config["app_slug"]
+ self.timeout_sec = config.get("timeout_ms", 60000) / 1000 # Convert to seconds
+ self.base_url = API_ENDPOINT_V2
+
+ self._headers = {
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {self.api_key}",
+ }
+
+ # Create a shared client for connection pooling
+ self._client = httpx.Client(headers=self._headers, timeout=self.timeout_sec)
Greptile
greptile
logic: httpx.Client should be used in a context manager or explicitly closed to prevent resource leaks
diff block
"stream": stream,
"genius": genius
}
-
if session_id:
payload["sessionId"] = session_id
-
- response = await self.client.post(url, json=payload, headers=self.headers)
- response.raise_for_status()
- return response.json()
-
- async def search_repositories(self, messages, repositories, session_id=None, genius=True):
+
+ client_timeout = timeout if timeout is not None else self.default_timeout
+ async with httpx.AsyncClient(timeout=client_timeout) as client:
+ response = await client.post(url, json=payload, headers=self.headers)
+ response.raise_for_status()
+ return response.json()
+
+ async def stream_query_repositories(
+ self,
+ messages: List[Dict[str, Any]],
+ repositories: List[Dict[str, Any]],
+ session_id: Optional[str] = None,
+ genius: bool = True,
+ timeout: Optional[float] = None
+ ) -> AsyncGenerator[Dict[str, Any], None]:
+ """
+ Streams the Greptile /query endpoint using chunked responses.
+
+ Args:
+ messages: List of message objects with role and content
+ repositories: List of repository objects
+ session_id: Optional session ID for continuing a conversation
+ genius: Whether to use enhanced query capabilities
+ timeout: Optional request timeout in seconds
+
+ Yields:
+ Parsed chunked message objects as dictionaries.
+ """
+ url = f"{self.base_url}/query"
+ payload = {
+ "messages": messages,
+ "repositories": repositories,
+ "stream": True,
+ "genius": genius
+ }
+ if session_id:
+ payload["sessionId"] = session_id
+
+ req_timeout = timeout if timeout is not None else self.default_timeout
+ async with httpx.AsyncClient(timeout=req_timeout) as client:
+ async with client.stream("POST", url, json=payload, headers=self.headers) as response:
Greptile
greptile
logic: Same issue with client creation in streaming method. Should reuse self.client to avoid resource leaks.
diff block
+package guardiansigner
+
+import (
+ "context"
+ "crypto/ecdsa"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ ethcrypto "github.com/ethereum/go-ethereum/crypto"
+ "google.golang.org/protobuf/proto"
+
+ nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
+ "golang.org/x/crypto/openpgp/armor" //nolint:staticcheck // Package is deprecated but we need it in the codebase still.
+)
+
+// FileSigner is a signer that loads a guardian key from a file. The URI is expected to be
+// in the format file://<path-to-file>.
+type FileSigner struct {
+ keyPath string
+ privateKey *ecdsa.PrivateKey
+}
+
+const (
+ GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
+)
+
+// The FileSigner is a signer that reads a guardian key from a file (signerKeyPath). The key is
+// expected to be armored with an OpenPGP armor block, and the key itself is expected to be a
+// protobuf-encoded GuardianKey message.
+func NewFileSigner(_ context.Context, unsafeDevMode bool, signerKeyPath string) (*FileSigner, error) {
+ fileSigner := &FileSigner{
+ keyPath: signerKeyPath,
+ }
+
+ f, err := os.Open(signerKeyPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open file: %w", err)
+ }
Greptile
greptile
logic: File handle is never closed after opening, which could lead to resource leaks
suggested fix
f, err := os.Open(signerKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
+ defer f.Close()
diff block
client_id
);
- // Spawn a task to handle health check
- let response_tx_clone = response_tx.clone();
- let disconnect_tx_clone1 = disconnect_tx.clone();
- tokio::spawn(async move {
- let mut ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(
- get_ping_interval_secs_with_jitter() as _,
- ));
- ping_interval.tick().await;
-
- loop {
- ping_interval.tick().await;
- if let Err(e) = response_tx_clone
- .send(WsServerEvents::Ping(SERVER_HEALTH_CHECK_PING_MSG.to_vec()))
- .await
- {
- log::error!(
- "[WS::Router::Handler] error sending ping to outgoing thread via response channel for client_id: {}, error: {}",
- client_id_clone1,
- e
- );
- if let Err(e) = disconnect_tx_clone1.send(None).await {
- log::error!(
- "[WS::Router::Handler] Error informing handle_outgoing to stop: {e}"
- );
- }
- break;
- }
- }
- });
-
// Spawn task to handle idle timeout
- let session_manager_clone = session_manager.clone();
- let cfg_clone = cfg.clone();
- let disconnect_tx_clone2 = disconnect_tx.clone();
- tokio::spawn(async move {
- let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
- cfg_clone.websocket.session_idle_timeout_secs as _,
- ));
- interval.tick().await;
- loop {
- interval.tick().await;
- if session_manager_clone
- .reached_max_idle_time(&client_id_clone)
- .await
- {
- log::info!(
- "[WS::Router::Handler]: MAX_IDLE_TIME reached. Normal shutdown client_id: {}",
- client_id_clone
- );
- if let Err(e) = disconnect_tx_clone2.send(None).await {
- log::error!(
- "[WS::Router::Handler] Error informing handle_outgoing to stop: {e}"
- );
- }
- break;
- }
- }
- });
+ let _session_manager_clone = session_manager.clone();
+ let _cfg_clone = cfg.clone();
+ let _disconnect_tx_clone2 = disconnect_tx.clone();
+ // tokio::spawn(async move {
+ // let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
+ // cfg_clone.websocket.session_idle_timeout_secs as _,
+ // ));
+ // interval.tick().await;
+ // loop {
+ // interval.tick().await;
+ // if session_manager_clone
+ // .reached_max_idle_time(&client_id_clone)
+ // .await
+ // {
+ // log::info!(
+ // "[WS::Router::Handler]: MAX_IDLE_TIME reached. Normal shutdown client_id:
+ // {}", client_id_clone
+ // );
+ // if let Err(e) = disconnect_tx_clone2.send(None).await {
+ // log::error!(
+ // "[WS::Router::Handler] Error informing handle_outgoing to stop: {e}"
+ // );
+ // }
+ // break;
+ // }
+ // }
+ // });
Greptile
greptile
style: Consider keeping idle timeout functionality enabled to prevent resource leaks from abandoned connections
diff block
auth
}
+
+#[command]
+pub fn delete_legacy_auth() -> Result<(), String> {
+ let mut db = get_leveldb_connection();
+
+ db.delete(b"auth").unwrap();
+
+ Ok(())
Greptile
greptile
logic: Missing db.close() call after delete operation, which could lead to resource leaks
suggested fix
db.delete(b"auth").unwrap();
+ db.close().unwrap();
Ok(())
diff block
+/*
+ * This file is part of WebGoat, an Open Web Application Security Project utility. For details, please see http://www.owasp.org/
+ *
+ * Copyright (c) 2002 - 2019 Bruce Mayhew
+ *
+ * This program is free software; you can redistribute it and/or modify it under the terms of the
+ * GNU General Public License as published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
+ * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with this program; if
+ * not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
+ * 02111-1307, USA.
+ *
+ * Getting Source ==============
+ *
+ * Source for this application is maintained at https://github.com/WebGoat/WebGoat, a repository for free software projects.
+ */
+
+package org.owasp.webgoat.lessons.sqlinjection.introduction;
+
+import static org.hsqldb.jdbc.JDBCResultSet.CONCUR_UPDATABLE;
+import static org.hsqldb.jdbc.JDBCResultSet.TYPE_SCROLL_SENSITIVE;
+import static org.owasp.webgoat.container.assignments.AttackResultBuilder.failed;
+import static org.owasp.webgoat.container.assignments.AttackResultBuilder.success;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.owasp.webgoat.container.LessonDataSource;
+import org.owasp.webgoat.container.assignments.AssignmentEndpoint;
+import org.owasp.webgoat.container.assignments.AssignmentHints;
+import org.owasp.webgoat.container.assignments.AttackResult;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@AssignmentHints(
+ value = {
+ "SqlStringInjectionHint.9.1",
+ "SqlStringInjectionHint.9.2",
+ "SqlStringInjectionHint.9.3",
+ "SqlStringInjectionHint.9.4",
+ "SqlStringInjectionHint.9.5"
+ })
+public class SqlInjectionLesson9 implements AssignmentEndpoint {
+
+ private final LessonDataSource dataSource;
+
+ public SqlInjectionLesson9(LessonDataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @PostMapping("/SqlInjection/attack9")
+ @ResponseBody
+ public AttackResult completed(@RequestParam String name, @RequestParam String auth_tan) {
+ return injectableQueryIntegrity(name, auth_tan);
+ }
+
+ protected AttackResult injectableQueryIntegrity(String name, String auth_tan) {
+ StringBuilder output = new StringBuilder();
+ String queryInjection =
+ "SELECT * FROM employees WHERE last_name = '"
+ + name
+ + "' AND auth_tan = '"
+ + auth_tan
+ + "'";
+ try (Connection connection = dataSource.getConnection()) {
+ // V2019_09_26_7__employees.sql
+ int oldMaxSalary = this.getMaxSalary(connection);
+ int oldSumSalariesOfOtherEmployees = this.getSumSalariesOfOtherEmployees(connection);
+ // begin transaction
+ connection.setAutoCommit(false);
+ // do injectable query
+ Statement statement = connection.createStatement(TYPE_SCROLL_SENSITIVE, CONCUR_UPDATABLE);
+ SqlInjectionLesson8.log(connection, queryInjection);
+ statement.execute(queryInjection);
Greptile
greptile
style: Statement is not closed in a finally block or try-with-resources, which could lead to resource leaks
suggested fix
+ try (Statement statement = connection.createStatement(TYPE_SCROLL_SENSITIVE, CONCUR_UPDATABLE)) {
SqlInjectionLesson8.log(connection, queryInjection);
statement.execute(queryInjection);
}
diff block
+from time import time_ns
+import argparse
+
+from playwright.sync_api import Playwright, sync_playwright
+
+proxy_server = {
+ "server": "http://squid:3128",
+}
+
+def log_note(message: str) -> None:
+ timestamp = str(time_ns())[:16]
+ print(f"{timestamp} {message}")
+
+def run(p_sync: Playwright, browser_name: str, fifo_path: str) -> None:
+ log_note(f"Launch browser {browser_name}")
+ if browser_name == "firefox":
+ browser = p_sync.firefox.launch(headless=True, proxy=proxy_server)
+ else:
+ # this leverages new headless mode by Chromium: https://developer.chrome.com/articles/new-headless/
+ # The mode is however ~40% slower: https://github.com/microsoft/playwright/issues/21216
+ browser = p_sync.chromium.launch(headless=False,args=["--headless=new"], proxy=proxy_server)
+ context = browser.new_context(ignore_https_errors=True)
+ page = context.new_page()
+
+ try:
+ for _ in range(1,10):
+ with open(fifo_path, 'r', encoding='utf-8') as fifo:
+ # Read data from the named pipe
+ url = fifo.read()
+ log_note(f"Opening URL {url}")
+ context = browser.new_context(ignore_https_errors=True)
+ page = context.new_page()
+ page.goto(url)
+ page.wait_for_load_state('load')
+ log_note(f"Finished loading URL {url}")
+ page.close()
+ context.close()
+
+
+ except Exception as e:
+ if hasattr(e, 'message'): # only Playwright error class has this member
+ log_note(f"Exception occurred: {e.message}")
+ log_note("Page content was:")
+ log_note(page.content())
+ raise e
Greptile
greptile
logic: Potential resource leak: page.content() may fail if page is already closed, and original exception could be lost. Consider logging the original exception first
suggested fix
except Exception as e:
if hasattr(e, 'message'): # only Playwright error class has this member
log_note(f"Exception occurred: {e.message}")
try:
log_note("Page content was:")
log_note(page.content())
+ except Exception as content_error:
+ log_note(f"Failed to get page content: {content_error}")
raise e
diff block
+// Cross-platform UUID generation function
+import { AbstractAgent, EventType } from '@ag-ui/client';
+import type {
+ BaseEvent,
+ RunAgentInput,
+ AgentConfig,
+ RunStartedEvent,
+ RunFinishedEvent,
+ TextMessageStartEvent,
+ TextMessageContentEvent,
+ TextMessageEndEvent,
+ Message,
+ ToolCallStartEvent,
+ ToolCallArgsEvent,
+ ToolCallEndEvent,
+} from '@ag-ui/client';
+import type { CoreMessage } from '@mastra/core';
+import { Observable } from 'rxjs';
+import type { Agent } from '../resources/agent';
+
+interface MastraAgentConfig extends AgentConfig {
+ agent: Agent;
+ agentId: string;
+ resourceId?: string;
+}
+
+export class AGUIAdapter extends AbstractAgent {
+ agent: Agent;
+ resourceId?: string;
+ constructor({ agent, agentId, resourceId, ...rest }: MastraAgentConfig) {
+ super({
+ agentId,
+ ...rest,
+ });
+ this.agent = agent;
+ this.resourceId = resourceId;
+ }
+
+ protected run(input: RunAgentInput): Observable<BaseEvent> {
+ return new Observable<BaseEvent>(subscriber => {
+ const convertedMessages = convertMessagesToMastraMessages(input.messages);
+
+ subscriber.next({
+ type: EventType.RUN_STARTED,
+ threadId: input.threadId,
+ runId: input.runId,
+ } as RunStartedEvent);
+
+ this.agent
+ .stream({
+ threadId: input.threadId,
+ resourceId: this.resourceId ?? '',
+ runId: input.runId,
+ messages: convertedMessages,
+ clientTools: input.tools.reduce(
+ (acc, tool) => {
+ acc[tool.name as string] = {
+ id: tool.name,
+ description: tool.description,
+ inputSchema: tool.parameters,
+ };
+ return acc;
+ },
+ {} as Record<string, any>,
+ ),
+ })
+ .then(response => {
+ let currentMessageId: string | undefined = undefined;
+ return response.processDataStream({
+ onTextPart: text => {
+ if (currentMessageId === undefined) {
+ currentMessageId = generateUUID();
+
+ const message: TextMessageStartEvent = {
+ type: EventType.TEXT_MESSAGE_START,
+ messageId: currentMessageId,
+ role: 'assistant',
+ };
+ subscriber.next(message);
+ }
+
+ const message: TextMessageContentEvent = {
+ type: EventType.TEXT_MESSAGE_CONTENT,
+ messageId: currentMessageId,
+ delta: text,
+ };
+ subscriber.next(message);
+ },
+ onFinishMessagePart: message => {
+ console.log('onFinishMessagePart', message);
+ if (currentMessageId !== undefined) {
+ const message: TextMessageEndEvent = {
+ type: EventType.TEXT_MESSAGE_END,
+ messageId: currentMessageId,
+ };
+ subscriber.next(message);
+ }
+ // Emit run finished event
+ subscriber.next({
+ type: EventType.RUN_FINISHED,
+ threadId: input.threadId,
+ runId: input.runId,
+ } as RunFinishedEvent);
+
+ // Complete the observable
+ subscriber.complete();
+ },
+ onToolCallPart(streamPart) {
+ const parentMessageId = currentMessageId || generateUUID();
+ subscriber.next({
+ type: EventType.TOOL_CALL_START,
+ toolCallId: streamPart.toolCallId,
+ toolCallName: streamPart.toolName,
+ parentMessageId,
+ } as ToolCallStartEvent);
+
+ subscriber.next({
+ type: EventType.TOOL_CALL_ARGS,
+ toolCallId: streamPart.toolCallId,
+ delta: JSON.stringify(streamPart.args),
+ parentMessageId,
+ } as ToolCallArgsEvent);
+
+ subscriber.next({
+ type: EventType.TOOL_CALL_END,
+ toolCallId: streamPart.toolCallId,
+ parentMessageId,
+ } as ToolCallEndEvent);
+ },
+ });
+ })
+ .catch(error => {
+ console.log('error', error);
+ // Handle error
+ subscriber.error(error);
+ });
+
+ return () => {};
Greptile
greptile
logic: Empty teardown function could lead to resource leaks; it should clean up any remaining subscriptions or in-flight stream work when the consumer unsubscribes
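Since no fix is attached here, a minimal TypeScript sketch of the teardown pattern follows. It assumes the underlying stream exposes no documented cancel API, so a local cancelled flag, flipped in the returned teardown, keeps late callbacks from emitting after the consumer unsubscribes. runWithCleanup and start are illustrative names, not part of the adapter above.

import { Observable } from 'rxjs';

// Sketch only: `runWithCleanup` and `start` are hypothetical names used to
// demonstrate the teardown pattern, not part of the adapter above.
function runWithCleanup(
  start: (emit: (chunk: string) => void) => Promise<void>,
): Observable<string> {
  return new Observable<string>(subscriber => {
    let cancelled = false; // flipped by the teardown below

    start(chunk => {
      if (!cancelled) subscriber.next(chunk); // drop events after unsubscribe
    })
      .then(() => {
        if (!cancelled) subscriber.complete();
      })
      .catch(err => {
        if (!cancelled) subscriber.error(err);
      });

    // Teardown: runs when the consumer unsubscribes or the stream completes,
    // so pending callbacks stop touching a dead subscriber.
    return () => {
      cancelled = true;
    };
  });
}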
diff block
+import { err, ok } from "../../packages/common/result";
+import { Kafka } from "@upstash/kafka";
+import { PromiseGenericResult } from "../../packages/common/result";
+
+import { QueuePayload } from "./types";
+import { MessageProducer } from "./types";
+import { HeliconeScoresMessage } from "../handlers/HandlerContext";
+
+const KAFKA_CREDS = JSON.parse(process.env.KAFKA_CREDS ?? "{}");
+export const KAFKA_ENABLED = (KAFKA_CREDS?.KAFKA_ENABLED ?? "false") === "true";
+const KAFKA_URL = KAFKA_CREDS?.UPSTASH_KAFKA_URL;
+const KAFKA_USERNAME = KAFKA_CREDS?.UPSTASH_KAFKA_USERNAME;
+const KAFKA_PASSWORD = KAFKA_CREDS?.UPSTASH_KAFKA_PASSWORD;
+
+export class KafkaProducer implements MessageProducer {
+ private producer: Kafka | null = null;
+
+ constructor() {
+ if (!KAFKA_ENABLED || !KAFKA_URL || !KAFKA_USERNAME || !KAFKA_PASSWORD) {
+ console.log(
+ "Required Kafka environment variables are not set, KafkaProducer will not be initialized."
+ );
+ return;
+ }
+
+ this.producer = new Kafka({
+ url: KAFKA_URL,
+ username: KAFKA_USERNAME,
+ password: KAFKA_PASSWORD,
+ });
+ }
+
+ async sendMessages({
+ msgs,
+ topic,
+ }: QueuePayload): PromiseGenericResult<string> {
+ if (!this.producer) {
+ return err("Kafka is not initialized");
+ }
+
+ const producer = this.producer.producer();
Greptile
greptile
style: producer instance should be closed after use to prevent resource leaks
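No fix is attached here, and the REST-based @upstash/kafka client does not, to my knowledge, document an explicit close() on the producer, so a practical alternative to per-call cleanup is to create the producer once and reuse it for every send. A minimal sketch under that assumption, with illustrative class and method names:

import { Kafka } from "@upstash/kafka";

// Sketch: cache a single producer instead of calling kafka.producer() on
// every send, so repeated sends do not allocate fresh client state.
export class ReusableKafkaProducer {
  private cachedProducer: ReturnType<Kafka["producer"]> | null = null;

  constructor(private readonly kafka: Kafka | null) {}

  protected getProducer(): ReturnType<Kafka["producer"]> | null {
    if (!this.kafka) {
      return null; // Kafka disabled or credentials missing
    }
    // Lazily create the producer once and reuse it for all later sends.
    if (!this.cachedProducer) {
      this.cachedProducer = this.kafka.producer();
    }
    return this.cachedProducer;
  }
}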
diff block
+"""Tracer implementation for Langfuse OpenTelemetry integration.
+
+This module provides the LangfuseTracer class, a thread-safe singleton that manages OpenTelemetry
+tracing infrastructure for Langfuse. It handles tracer initialization, span processors,
+API clients, and coordinates background tasks for efficient data processing and media handling.
+
+Key features:
+- Thread-safe OpenTelemetry tracer with Langfuse-specific span processors and sampling
+- Configurable batch processing of spans and scores with intelligent flushing behavior
+- Asynchronous background media upload processing with dedicated worker threads
+- Concurrent score ingestion with batching and retry mechanisms
+- Automatic project ID discovery and caching
+- Graceful shutdown handling with proper resource cleanup
+- Fault tolerance with detailed error logging and recovery mechanisms
+"""
+
+import atexit
+import os
+import threading
+from queue import Full, Queue
+from typing import Dict, Optional, cast
+
+import httpx
+from opentelemetry import trace as otel_trace_api
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.sampling import Decision, TraceIdRatioBased
+
+from langfuse._client.attributes import LangfuseOtelSpanAttributes
+from langfuse._client.constants import LANGFUSE_TRACER_NAME
+from langfuse._client.environment_variables import (
+ LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT,
+ LANGFUSE_RELEASE,
+ LANGFUSE_TRACING_ENVIRONMENT,
+)
+from langfuse._client.span_processor import LangfuseSpanProcessor
+from langfuse._task_manager.media_manager import MediaManager
+from langfuse._task_manager.media_upload_consumer import MediaUploadConsumer
+from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer
+from langfuse._utils.environment import get_common_release_envs
+from langfuse._utils.prompt_cache import PromptCache
+from langfuse._utils.request import LangfuseClient
+from langfuse.api.client import AsyncFernLangfuse, FernLangfuse
+from langfuse.logger import langfuse_logger
+
+from ..version import __version__ as langfuse_version
+
+
+class LangfuseResourceManager:
+ """Thread-safe singleton that provides access to the OpenTelemetry tracer and processors.
+
+ This class implements a thread-safe singleton pattern keyed by the public API key,
+ ensuring that only one tracer instance exists per API key combination. It manages
+ the lifecycle of the OpenTelemetry tracer provider, span processors, and resource
+ attributes, as well as background threads for media uploads and score ingestion.
+
+ The tracer is responsible for:
+ 1. Setting up the OpenTelemetry tracer with appropriate sampling and configuration
+ 2. Managing the span processor for exporting spans to the Langfuse API
+ 3. Creating and managing Langfuse API clients (both synchronous and asynchronous)
+ 4. Handling background media upload processing via dedicated worker threads
+ 5. Processing and batching score ingestion events with configurable flush settings
+ 6. Retrieving and caching project information for URL generation and media handling
+ 7. Coordinating graceful shutdown of all background processes with proper resource cleanup
+
+ This implementation follows best practices for resource management in long-running
+ applications, including thread-safe singleton pattern, bounded queues to prevent memory
+ exhaustion, proper resource cleanup on shutdown, and fault-tolerant error handling with
+ detailed logging.
+
+ Thread safety is ensured through the use of locks, thread-safe queues, and atomic operations,
+ making this implementation suitable for multi-threaded and asyncio applications.
+ """
+
+ _instances: Dict[str, "LangfuseResourceManager"] = {}
+ _lock = threading.RLock()
+
+ def __new__(
+ cls,
+ *,
+ public_key: str,
+ secret_key: str,
+ host: str,
+ environment: Optional[str] = None,
+ release: Optional[str] = None,
+ timeout: Optional[int] = None,
+ flush_at: Optional[int] = None,
+ flush_interval: Optional[float] = None,
+ httpx_client: Optional[httpx.Client] = None,
+ media_upload_thread_count: Optional[int] = None,
+ sample_rate: Optional[float] = None,
+ ) -> "LangfuseResourceManager":
+ if public_key in cls._instances:
+ return cls._instances[public_key]
+
+ with cls._lock:
+ if public_key not in cls._instances:
+ instance = super(LangfuseResourceManager, cls).__new__(cls)
+ instance._otel_tracer = None
+ instance._initialize_instance(
+ public_key=public_key,
+ secret_key=secret_key,
+ host=host,
+ timeout=timeout,
+ environment=environment,
+ release=release,
+ flush_at=flush_at,
+ flush_interval=flush_interval,
+ httpx_client=httpx_client,
+ media_upload_thread_count=media_upload_thread_count,
+ sample_rate=sample_rate,
+ )
+
+ cls._instances[public_key] = instance
+
+ return cls._instances[public_key]
+
+ def _initialize_instance(
+ self,
+ *,
+ public_key: str,
+ secret_key: str,
+ host: str,
+ environment: Optional[str] = None,
+ release: Optional[str] = None,
+ timeout: Optional[int] = None,
+ flush_at: Optional[int] = None,
+ flush_interval: Optional[float] = None,
+ media_upload_thread_count: Optional[int] = None,
+ httpx_client: Optional[httpx.Client] = None,
+ sample_rate: Optional[float] = None,
+ ):
+ self.public_key = public_key
+
+ # OTEL Tracer
+ tracer_provider = _init_tracer_provider(
+ environment=environment, release=release, sample_rate=sample_rate
+ )
+
+ langfuse_processor = LangfuseSpanProcessor(
+ public_key=self.public_key,
+ secret_key=secret_key,
+ host=host,
+ timeout=timeout,
+ flush_at=flush_at,
+ flush_interval=flush_interval,
+ )
+ tracer_provider.add_span_processor(langfuse_processor)
+
+ tracer_provider = otel_trace_api.get_tracer_provider()
+ self._otel_tracer = tracer_provider.get_tracer(
+ LANGFUSE_TRACER_NAME,
+ langfuse_version,
+ attributes={"public_key": self.public_key},
+ )
+
+ # API Clients
+
+ ## API clients must be singletons because the underlying HTTPX clients
+ ## use connection pools with limited capacity. Creating multiple instances
+ ## could exhaust the OS's maximum number of available TCP sockets (file descriptors),
+ ## leading to connection errors.
+ self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
+ self.api = FernLangfuse(
+ base_url=host,
+ username=self.public_key,
+ password=secret_key,
+ x_langfuse_sdk_name="python",
+ x_langfuse_sdk_version=langfuse_version,
+ x_langfuse_public_key=self.public_key,
+ httpx_client=self.httpx_client,
+ timeout=timeout,
+ )
+ self.async_api = AsyncFernLangfuse(
+ base_url=host,
+ username=self.public_key,
+ password=secret_key,
+ x_langfuse_sdk_name="python",
+ x_langfuse_sdk_version=langfuse_version,
+ x_langfuse_public_key=self.public_key,
+ timeout=timeout,
+ )
+ score_ingestion_client = LangfuseClient(
+ public_key=self.public_key,
+ secret_key=secret_key,
+ base_url=host,
+ version=langfuse_version,
+ timeout=timeout or 20,
+ session=self.httpx_client,
+ )
+
+ # Media
+ self._media_upload_queue = Queue(100_000)
+ self._media_manager = MediaManager(
+ api_client=self.api,
+ media_upload_queue=self._media_upload_queue,
+ max_retries=3,
+ )
+ self._media_upload_consumers = []
+
+ media_upload_thread_count = media_upload_thread_count or max(
+ int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
+ )
+
+ for i in range(media_upload_thread_count):
+ media_upload_consumer = MediaUploadConsumer(
+ identifier=i,
+ media_manager=self._media_manager,
+ )
+ media_upload_consumer.start()
+ self._media_upload_consumers.append(media_upload_consumer)
+
+ # Prompt cache
+ self.prompt_cache = PromptCache()
+
+ # Score ingestion
+ self._score_ingestion_queue = Queue(100_000)
+ self._ingestion_consumers = []
+
+ ingestion_consumer = ScoreIngestionConsumer(
+ ingestion_queue=self._score_ingestion_queue,
+ identifier=0,
+ client=score_ingestion_client,
+ flush_at=flush_at,
+ flush_interval=flush_interval,
+ max_retries=3,
+ public_key=self.public_key,
+ )
+ ingestion_consumer.start()
+ self._ingestion_consumers.append(ingestion_consumer)
+
+ # Register shutdown handler
+ atexit.register(self.shutdown)
+
+ langfuse_logger.info(
+ f"Startup: Langfuse tracer successfully initialized | "
+ f"public_key={self.public_key} | "
+ f"host={host} | "
+ f"environment={environment or 'default'} | "
+ f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
+ f"media_threads={media_upload_thread_count or 1}"
+ )
+
+ def add_score_task(self, event: dict):
+ try:
+ # Sample scores with the same sampler that is used for tracing
+ tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
+ should_sample = (
+ tracer_provider.sampler.should_sample(
+ parent_context=None,
+ trace_id=int(event["body"].trace_id, 16),
+ name="score",
+ ).decision
+ == Decision.RECORD_AND_SAMPLE
+ if hasattr(event["body"], "trace_id")
+ else True
+ )
+
+ if should_sample:
+ langfuse_logger.debug(
+ f"Score: Enqueuing event type={event['type']} for trace_id={event['body'].trace_id} name={event['body'].name} value={event['body'].value}"
+ )
+ self._score_ingestion_queue.put(event, block=False)
+
+ except Full:
+ langfuse_logger.warning(
+ "System overload: Score ingestion queue has reached capacity (100,000 items). Score will be dropped. Consider increasing flush frequency or decreasing event volume."
+ )
+
+ return
+ except Exception as e:
+ langfuse_logger.error(
+ f"Unexpected error: Failed to process score event. The score will be dropped. Error details: {e}"
+ )
+
+ return
+
+ @property
+ def tracer(self):
+ return self._otel_tracer
+
+ @staticmethod
+ def get_current_span():
+ return otel_trace_api.get_current_span()
+
+ def _join_consumer_threads(self):
+ """End the consumer threads once the queue is empty.
+
+ Blocks execution until finished
+ """
+ langfuse_logger.debug(
+ f"Shutdown: Waiting for {len(self._media_upload_consumers)} media upload thread(s) to complete processing"
+ )
+ for media_upload_consumer in self._media_upload_consumers:
+ media_upload_consumer.pause()
+
+ for media_upload_consumer in self._media_upload_consumers:
+ try:
+ media_upload_consumer.join()
+ except RuntimeError:
+ # consumer thread has not started
+ pass
+
+ langfuse_logger.debug(
+ f"Shutdown: Media upload thread #{media_upload_consumer._identifier} successfully terminated"
+ )
+
+ langfuse_logger.debug(
+ f"Shutdown: Waiting for {len(self._ingestion_consumers)} score ingestion thread(s) to complete processing"
+ )
+ for score_ingestion_consumer in self._ingestion_consumers:
+ score_ingestion_consumer.pause()
+
+ for score_ingestion_consumer in self._ingestion_consumers:
+ try:
+ score_ingestion_consumer.join()
+ except RuntimeError:
+ # consumer thread has not started
+ pass
+
+ langfuse_logger.debug(
+ f"Shutdown: Score ingestion thread #{score_ingestion_consumer._identifier} successfully terminated"
+ )
+
+ def flush(self):
+ tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
+ if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
+ return
+
+ tracer_provider.force_flush()
+
+ def shutdown(self):
+ # Unregister the atexit handler first
+ atexit.unregister(self.shutdown)
Greptile
greptile
logic: unregistering the atexit handler before cleanup could lead to resource leaks if shutdown fails; it should be unregistered only after cleanup completes successfully
suggested fix
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
return
tracer_provider.force_flush()
+ self._join_consumer_threads()
+ # Unregister the atexit handler after successful cleanup
atexit.unregister(self.shutdown)
diff block
+import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
+
+export interface S3Config {
+ region: string;
+ credentials: {
+ accessKeyId: string;
+ secretAccessKey: string;
+ };
+ endpoint?: string;
+}
+
+export interface S3UploadParams {
+ Bucket: string;
+ Key: string;
+ Body: Buffer;
+}
+
+async function upload(uploadParams: S3UploadParams, config: S3Config): Promise<void> {
+ const s3Client = new S3Client(config);
Greptile
greptile
style: S3Client should be reused rather than created for each upload to avoid resource leaks
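A minimal sketch of the reuse the comment suggests: cache one S3Client per distinct configuration (keyed here by region and endpoint, an illustrative choice) and reuse it across uploads. It relies on the S3Config and S3UploadParams interfaces from the snippet above; everything else uses the documented @aws-sdk/client-s3 API.

import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

// Cache one client per region/endpoint combination instead of constructing a
// new S3Client (and a new connection pool) on every upload.
const clientCache = new Map<string, S3Client>();

function getClient(config: S3Config): S3Client {
  const key = `${config.region}:${config.endpoint ?? "default"}`;
  let client = clientCache.get(key);
  if (!client) {
    client = new S3Client(config);
    clientCache.set(key, client);
  }
  return client;
}

async function upload(uploadParams: S3UploadParams, config: S3Config): Promise<void> {
  // The cached client keeps its pooled HTTP connections alive between calls.
  await getClient(config).send(new PutObjectCommand(uploadParams));
}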
diff block
impl WsError {
/// Disconnect the ws conn to client from router
pub fn should_disconnect(&self) -> bool {
- matches!(
- self,
- WsError::ProtocolError(_) | WsError::QuerierWsConnNotAvailable(_)
- )
+ false
+ // matches!(
+ // self,
+ // WsError::ProtocolError(_) | WsError::QuerierWsConnNotAvailable(_)
+ // )
}
Greptile
greptile
logic: Disabling automatic disconnection for protocol errors and querier unavailability could lead to zombie connections and resource leaks. Consider keeping the original logic or documenting why this change is necessary.
diff block
# Get the rewritten queries in a defined format
model = graph_config.tooling.fast_llm
+ # Get the system prompt for initial decomposition
+ db_session = next(get_session())
+ initial_decomposition_prompt = get_system_prompt("initial_decomposition_prompt", db_session=db_session)
Greptile
greptile
style: Database session is created but never explicitly closed. Consider using a context manager or ensuring the session is closed after use to prevent potential resource leaks.
suggested fix
+ with get_session() as db_session:
initial_decomposition_prompt = get_system_prompt("initial_decomposition_prompt", db_session=db_session)
diff block
+import { useState, useEffect, useRef } from "react";
+import { ChildProcess, spawn } from "child_process";
+import {
+ generateAudioFilename,
+ ensureTempDirectory,
+ checkSoxInstalled,
+ buildSoxCommand,
+ validateAudioFile,
+} from "../utils/audio";
+import { showToast, Toast } from "@raycast/api";
+import { ErrorTypes } from "../types";
+
+interface AudioRecorderHook {
+ isRecording: boolean;
+ recordingDuration: number;
+ recordingPath: string | null;
+ error: string | null;
+ startRecording: () => Promise<string | null>;
+ stopRecording: () => Promise<string | null>;
+}
+
+export function useAudioRecorder(): AudioRecorderHook {
+ const [isRecording, setIsRecording] = useState<boolean>(false);
+ const [recordingDuration, setRecordingDuration] = useState<number>(0);
+ const [recordingPath, setRecordingPath] = useState<string | null>(null);
+ const [error, setError] = useState<string | null>(null);
+
+ const recordingProcess = useRef<ChildProcess | null>(null);
+ const durationInterval = useRef<NodeJS.Timeout | null>(null);
+
+ useEffect(() => {
+ const checkSox = async () => {
+ const soxPath = await checkSoxInstalled();
+ if (!soxPath) {
+ setError(ErrorTypes.SOX_NOT_INSTALLED);
+ }
+ };
+
+ checkSox();
+
+ return () => {
+ if (isRecording) {
+ stopRecording();
Greptile
greptile
logic: stopRecording() is called in the cleanup function, but the promise it returns is not awaited, so the recording may not be fully stopped before unmount, which could lead to resource leaks
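A self-contained sketch of one way to handle this, with illustrative names (useStopOnUnmount is not part of the hook above): a React cleanup function cannot be async, so the promise gets a .catch() and the child process is killed as a fallback.

import { useEffect, type MutableRefObject } from "react";
import type { ChildProcess } from "child_process";

// Sketch: stop an external recorder when the component unmounts. The cleanup
// cannot await, so failures are logged and the process is killed as a fallback.
export function useStopOnUnmount(
  processRef: MutableRefObject<ChildProcess | null>,
  stop: () => Promise<string | null>,
): void {
  useEffect(() => {
    return () => {
      if (!processRef.current) {
        return; // nothing was recording
      }
      stop().catch(err => {
        console.error("Failed to stop recording during unmount:", err);
        processRef.current?.kill(); // last resort: terminate the recorder process
      });
    };
    // The ref and stop function are intentionally captured once on mount.
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);
}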
diff block
logger.warning("Unexpected credentials provided for Web Connector")
return None
+ def _do_scrape(
+ self, index: int, initial_url: str, scrape_context: ScrapeContext
+ ) -> bool:
+ """returns False if the caller should continue (usually due to skipping duplicates),
+ True if the page scraped normally"""
+
+ if scrape_context.playwright is None:
+ raise RuntimeError("scrape_context.playwright is None")
+
+ if scrape_context.playwright_context is None:
+ raise RuntimeError("scrape_context.playwright_context is None")
+
+ if scrape_context.retry_count > 0:
+ # Add a random delay between retries (exponential backoff)
+ delay = min(2**scrape_context.retry_count + random.uniform(0, 1), 10)
+ logger.info(
+ f"Retry {scrape_context.retry_count}/{self.MAX_RETRIES} for {initial_url} after {delay:.2f}s delay"
+ )
+ time.sleep(delay)
+
+ if scrape_context.restart_playwright:
+ scrape_context.playwright, scrape_context.playwright_context = (
+ start_playwright()
+ )
+ scrape_context.restart_playwright = False
+
+ # Handle cookies for the URL
+ _handle_cookies(scrape_context.playwright_context, initial_url)
+
+ # First do a HEAD request to check content type without downloading the entire content
+ head_response = requests.head(
+ initial_url, headers=DEFAULT_HEADERS, allow_redirects=True
+ )
+ is_pdf = is_pdf_content(head_response)
+
+ if is_pdf or initial_url.lower().endswith(".pdf"):
+ # PDF files are not checked for links
+ response = requests.get(initial_url, headers=DEFAULT_HEADERS)
+ page_text, metadata, images = read_pdf_file(
+ file=io.BytesIO(response.content)
+ )
+ last_modified = response.headers.get("Last-Modified")
+
+ scrape_context.doc_batch.append(
+ Document(
+ id=initial_url,
+ sections=[TextSection(link=initial_url, text=page_text)],
+ source=DocumentSource.WEB,
+ semantic_identifier=initial_url.split("/")[-1],
+ metadata=metadata,
+ doc_updated_at=(
+ _get_datetime_from_last_modified_header(last_modified)
+ if last_modified
+ else None
+ ),
+ )
+ )
+ scrape_context.retry_success = True
+ return False
+
+ page = scrape_context.playwright_context.new_page()
+
+ if self.add_randomness:
+ # Add random mouse movements and scrolling to mimic human behavior
+ page.mouse.move(random.randint(100, 700), random.randint(100, 500))
+
+ # Can't use wait_until="networkidle" because it interferes with the scrolling behavior
+ page_response = page.goto(
+ initial_url,
+ timeout=30000, # 30 seconds
+ wait_until="domcontentloaded", # Wait for DOM to be ready
+ )
+
+ # Add a small random delay to mimic human behavior
+ time.sleep(random.uniform(0.5, 2.0))
+
+ last_modified = (
+ page_response.header_value("Last-Modified") if page_response else None
+ )
+ final_url = page.url
+ if final_url != initial_url:
+ protected_url_check(final_url)
+ initial_url = final_url
+ if initial_url in scrape_context.visited_links:
+ logger.info(
+ f"{index}: {initial_url} redirected to {final_url} - already indexed"
+ )
+ page.close()
+ scrape_context.retry_success = True
+ return False
+
+ logger.info(f"{index}: {initial_url} redirected to {final_url}")
+ scrape_context.visited_links.add(initial_url)
+
+ # If we got here, the request was successful
+ scrape_context.retry_success = True
+
+ if self.scroll_before_scraping:
+ scroll_attempts = 0
+ previous_height = page.evaluate("document.body.scrollHeight")
+ while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS:
+ page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
+ page.wait_for_load_state("networkidle", timeout=30000)
+ new_height = page.evaluate("document.body.scrollHeight")
+ if new_height == previous_height:
+ break # Stop scrolling when no more content is loaded
+ previous_height = new_height
+ scroll_attempts += 1
+
+ content = page.content()
+ soup = BeautifulSoup(content, "html.parser")
+
+ if self.recursive:
+ internal_links = get_internal_links(
+ scrape_context.base_url, initial_url, soup
+ )
+ for link in internal_links:
+ if link not in scrape_context.visited_links:
+ scrape_context.to_visit.append(link)
+
+ if page_response and str(page_response.status)[0] in ("4", "5"):
+ scrape_context.last_error = f"Skipped indexing {initial_url} due to HTTP {page_response.status} response"
+ logger.info(scrape_context.last_error)
+ return False
+
+ parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
+
+ """For websites containing iframes that need to be scraped,
+ the code below can extract text from within these iframes.
+ """
+ logger.debug(f"{index}: Length of cleaned text {len(parsed_html.cleaned_text)}")
+ if JAVASCRIPT_DISABLED_MESSAGE in parsed_html.cleaned_text:
+ iframe_count = page.frame_locator("iframe").locator("html").count()
+ if iframe_count > 0:
+ iframe_texts = (
+ page.frame_locator("iframe").locator("html").all_inner_texts()
+ )
+ document_text = "\n".join(iframe_texts)
+ """ 700 is the threshold value for the length of the text extracted
+ from the iframe based on the issue faced """
+ if len(parsed_html.cleaned_text) < IFRAME_TEXT_LENGTH_THRESHOLD:
+ parsed_html.cleaned_text = document_text
+ else:
+ parsed_html.cleaned_text += "\n" + document_text
+
+ # Sometimes pages with #! will serve duplicate content
+ # There are also just other ways this can happen
+ hashed_text = hash((parsed_html.title, parsed_html.cleaned_text))
+ if hashed_text in scrape_context.content_hashes:
+ logger.info(
+ f"{index}: Skipping duplicate title + content for {initial_url}"
+ )
+ return False
+
+ scrape_context.content_hashes.add(hashed_text)
+
+ scrape_context.doc_batch.append(
+ Document(
+ id=initial_url,
+ sections=[TextSection(link=initial_url, text=parsed_html.cleaned_text)],
+ source=DocumentSource.WEB,
+ semantic_identifier=parsed_html.title or initial_url,
+ metadata={},
+ doc_updated_at=(
+ _get_datetime_from_last_modified_header(last_modified)
+ if last_modified
+ else None
+ ),
+ )
+ )
+
+ page.close()
+ return True
+
def load_from_state(self) -> GenerateDocumentsOutput:
"""Traverses through all pages found on the website
and converts them into documents"""
- visited_links: set[str] = set()
- to_visit: list[str] = self.to_visit_list
- content_hashes = set()
- if not to_visit:
+ if not self.to_visit_list:
raise ValueError("No URLs to visit")
- base_url = to_visit[0] # For the recursive case
- doc_batch: list[Document] = []
+ base_url = self.to_visit_list[0] # For the recursive case
+ check_internet_connection(base_url) # make sure we can connect to the base url
- # make sure we can connect to the base url
- check_internet_connection(base_url)
+ scrape_context = ScrapeContext(base_url=base_url, to_visit=self.to_visit_list)
- # Needed to report error
- at_least_one_doc = False
- last_error = None
+ scrape_context.playwright, scrape_context.playwright_context = (
+ start_playwright()
+ )
- playwright, context = start_playwright()
- restart_playwright = False
- while to_visit:
- initial_url = to_visit.pop()
- if initial_url in visited_links:
+ while scrape_context.to_visit:
+ initial_url = scrape_context.to_visit.pop()
+ if initial_url in scrape_context.visited_links:
continue
- visited_links.add(initial_url)
+ scrape_context.visited_links.add(initial_url)
try:
protected_url_check(initial_url)
except Exception as e:
- last_error = f"Invalid URL {initial_url} due to {e}"
- logger.warning(last_error)
+ scrape_context.last_error = f"Invalid URL {initial_url} due to {e}"
+ logger.warning(scrape_context.last_error)
continue
- index = len(visited_links)
+ index = len(scrape_context.visited_links)
logger.info(f"{index}: Visiting {initial_url}")
# Add retry mechanism with exponential backoff
- max_retries = 3
- retry_count = 0
- retry_success = False
+ scrape_context.retry_count = 0
+ scrape_context.retry_success = False
- while retry_count < max_retries and not retry_success:
+ while (
+ scrape_context.retry_count < self.MAX_RETRIES
+ and not scrape_context.retry_success
+ ):
try:
- if retry_count > 0:
- # Add a random delay between retries (exponential backoff)
- delay = min(2**retry_count + random.uniform(0, 1), 10)
- logger.info(
- f"Retry {retry_count}/{max_retries} for {initial_url} after {delay:.2f}s delay"
- )
- time.sleep(delay)
-
- if restart_playwright:
- playwright, context = start_playwright()
- restart_playwright = False
-
- # Handle cookies for the URL
- _handle_cookies(context, initial_url)
-
- # First do a HEAD request to check content type without downloading the entire content
- head_response = requests.head(
- initial_url, headers=DEFAULT_HEADERS, allow_redirects=True
- )
- is_pdf = is_pdf_content(head_response)
-
- if is_pdf or initial_url.lower().endswith(".pdf"):
- # PDF files are not checked for links
- response = requests.get(initial_url, headers=DEFAULT_HEADERS)
- page_text, metadata, images = read_pdf_file(
- file=io.BytesIO(response.content)
- )
- last_modified = response.headers.get("Last-Modified")
-
- doc_batch.append(
- Document(
- id=initial_url,
- sections=[
- TextSection(link=initial_url, text=page_text)
- ],
- source=DocumentSource.WEB,
- semantic_identifier=initial_url.split("/")[-1],
- metadata=metadata,
- doc_updated_at=(
- _get_datetime_from_last_modified_header(
- last_modified
- )
- if last_modified
- else None
- ),
- )
- )
- retry_success = True
- continue
-
- page = context.new_page()
-
- if self.add_randomness:
- # Add random mouse movements and scrolling to mimic human behavior
- page.mouse.move(
- random.randint(100, 700), random.randint(100, 500)
- )
-
- # Can't use wait_until="networkidle" because it interferes with the scrolling behavior
- page_response = page.goto(
- initial_url,
- timeout=30000, # 30 seconds
- wait_until="domcontentloaded", # Wait for DOM to be ready
- )
-
- # Add a small random delay to mimic human behavior
- time.sleep(random.uniform(0.5, 2.0))
-
- # Check if we got a 403 error
- if page_response and page_response.status == 403:
- logger.warning(
- f"Received 403 Forbidden for {initial_url}, retrying..."
- )
- page.close()
- retry_count += 1
+ normal_scrape = self._do_scrape(index, initial_url, scrape_context)
+ if not normal_scrape:
continue
-
- last_modified = (
- page_response.header_value("Last-Modified")
- if page_response
- else None
- )
- final_url = page.url
- if final_url != initial_url:
- protected_url_check(final_url)
- initial_url = final_url
- if initial_url in visited_links:
- logger.info(
- f"{index}: {initial_url} redirected to {final_url} - already indexed"
- )
- page.close()
- retry_success = True
- continue
- logger.info(f"{index}: {initial_url} redirected to {final_url}")
- visited_links.add(initial_url)
-
- # If we got here, the request was successful
- retry_success = True
-
- if self.scroll_before_scraping:
- scroll_attempts = 0
- previous_height = page.evaluate("document.body.scrollHeight")
- while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS:
- page.evaluate(
- "window.scrollTo(0, document.body.scrollHeight)"
- )
- page.wait_for_load_state("networkidle", timeout=30000)
- new_height = page.evaluate("document.body.scrollHeight")
- if new_height == previous_height:
- break # Stop scrolling when no more content is loaded
- previous_height = new_height
- scroll_attempts += 1
-
- content = page.content()
- soup = BeautifulSoup(content, "html.parser")
-
- if self.recursive:
- internal_links = get_internal_links(base_url, initial_url, soup)
- for link in internal_links:
- if link not in visited_links:
- to_visit.append(link)
-
- if page_response and str(page_response.status)[0] in ("4", "5"):
- last_error = f"Skipped indexing {initial_url} due to HTTP {page_response.status} response"
- logger.info(last_error)
- continue
-
- parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
-
- """For websites containing iframes that need to be scraped,
- the code below can extract text from within these iframes.
- """
- logger.debug(
- f"{index}: Length of cleaned text {len(parsed_html.cleaned_text)}"
- )
- if JAVASCRIPT_DISABLED_MESSAGE in parsed_html.cleaned_text:
- iframe_count = (
- page.frame_locator("iframe").locator("html").count()
- )
- if iframe_count > 0:
- iframe_texts = (
- page.frame_locator("iframe")
- .locator("html")
- .all_inner_texts()
- )
- document_text = "\n".join(iframe_texts)
- """ 700 is the threshold value for the length of the text extracted
- from the iframe based on the issue faced """
- if (
- len(parsed_html.cleaned_text)
- < IFRAME_TEXT_LENGTH_THRESHOLD
- ):
- parsed_html.cleaned_text = document_text
- else:
- parsed_html.cleaned_text += "\n" + document_text
-
- # Sometimes pages with #! will serve duplicate content
- # There are also just other ways this can happen
- hashed_text = hash((parsed_html.title, parsed_html.cleaned_text))
- if hashed_text in content_hashes:
- logger.info(
- f"{index}: Skipping duplicate title + content for {initial_url}"
- )
- continue
- content_hashes.add(hashed_text)
-
- doc_batch.append(
- Document(
- id=initial_url,
- sections=[
- TextSection(
- link=initial_url, text=parsed_html.cleaned_text
- )
- ],
- source=DocumentSource.WEB,
- semantic_identifier=parsed_html.title or initial_url,
- metadata={},
- doc_updated_at=(
- _get_datetime_from_last_modified_header(last_modified)
- if last_modified
- else None
- ),
- )
- )
-
- page.close()
except Exception as e:
- last_error = f"Failed to fetch '{initial_url}': {e}"
- logger.exception(last_error)
- playwright.stop()
- restart_playwright = True
+ scrape_context.last_error = f"Failed to fetch '{initial_url}': {e}"
+ logger.exception(scrape_context.last_error)
+ scrape_context.playwright.stop()
+ scrape_context.restart_playwright = True
Greptile
greptile
style: The Playwright context should be closed before stopping Playwright to prevent resource leaks
suggested fix
+ scrape_context.playwright_context.close()
scrape_context.playwright.stop()
scrape_context.restart_playwright = True
diff block
+from unittest.mock import patch, MagicMock
+
+from botocore.client import Config
+
+from posthog.settings.session_replay_v2 import (
+ SESSION_RECORDING_V2_S3_ACCESS_KEY_ID,
+ SESSION_RECORDING_V2_S3_BUCKET,
+ SESSION_RECORDING_V2_S3_ENDPOINT,
+ SESSION_RECORDING_V2_S3_REGION,
+ SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY,
+)
+from posthog.storage.session_recording_v2_object_storage import (
+ client,
+ SessionRecordingV2ObjectStorage,
+)
+from posthog.test.base import APIBaseTest
+
+TEST_BUCKET = "test_session_recording_v2_bucket"
+
+
+class TestSessionRecordingV2Storage(APIBaseTest):
+ def teardown_method(self, method) -> None:
+ pass
Greptile
greptile
style: Empty teardown method could lead to resource leaks. Either implement cleanup or remove the method.
suggested fix
diff block
)
.await;
}
+ if item_value.is_querier() && LOCAL_NODE.is_router() {
+ crate::router::http::remove_querier_from_handler(&item_value.name).await;
+ }
Greptile
greptile
style: Consider adding error handling for failures of the remove_querier_from_handler call to prevent potential resource leaks