nathant27 opened a new pull request, #5103: URL: https://github.com/apache/texera/pull/5103
### What changes were proposed in this PR?

[Video Link](https://drive.google.com/file/d/14WV4hxlNoN3keYo8xOTaiYfMO4pfEiC9/view?usp=sharing)

My Problem: When I used Texera for the first time, what I wanted most was to interact with data in real time. Some existing operators interact with external services, such as the Twitch and Reddit ones, but they are limited in which endpoints they can access, and they cannot access data in real time. In addition, I wanted to fetch this data and analyze or summarize it with LLMs instead of doing it myself.

My Solution: PulseFlow, my extension to Texera that enables real-time LLM analysis of data from external API sources. For this extension I implemented 4 main operators that help build live analytical workflows on real-time data feeds:
- PollingHttpSource polls an endpoint at a fixed interval for a maximum number of iterations. Intended for cases where a WebSocket is not available.
- WebSocketSource establishes a WebSocket connection and uses the streamed response data for real-time processing.
- HttpRequest performs general HTTP requests using REST methods. I mostly used POST, to show how data extracted in a Texera workflow can be sent to other external services.
- LLMAgent reads input tuples and produces output based on them. Used for analyzing and summarizing the responses.

Implementation Details

1. PollingHttpSource (source)
- Polls an HTTP/REST endpoint at a fixed interval and emits each response as a tuple.
- Configurable method (GET/POST/PUT/PATCH/DELETE), headers, request body, interval, and an optional maxIterations cap (0 = forever).
- Output schema: response_body, status_code, polled_at.
- Implemented via a forever-running Iterator that sleeps between polls; this works around Texera's bounded-source model without engine changes.

2. WebSocketSource (source)
- Connects to a ws:// or wss:// endpoint and emits each received frame as a tuple, forever.
- Uses the JDK 11+ java.net.http WebSocket; supports an initial subscribe message and arbitrary handshake headers.
- Permissive URI handling: trims whitespace and percent-encodes the '@' character (common in Binance-style stream names) so users can paste provider URLs verbatim.
- Requests Long.MaxValue messages up front to avoid per-frame back-pressure bookkeeping; reassembles partial text frames.
- Output schema: message, received_at.

3. HttpRequest (transformer)
- For each input tuple, performs a configurable HTTP call with ${fieldName} interpolation in the URL and body templates.
- Appends http_request_status, http_request_body, and http_request_error to the input schema (namespaced to avoid collisions with upstream columns such as response_body).
- The failOnError toggle controls whether non-2xx responses crash the workflow or are surfaced inline.

4. LLMAgent (transformer)
- Calls an Anthropic Messages or OpenAI Chat Completions endpoint per tuple with a templated system + user prompt.
- The provider enum (LLMProvider: ANTHROPIC, OPENAI) switches the request body shape and the reply-text extraction path (content[0].text vs choices[0].message.content).
- The request body is built via Jackson ObjectNode, so user-supplied prompt content is automatically JSON-escaped and embedded quotes or newlines cannot break the template.
- The API key is taken from the operator field, falling back to the ANTHROPIC_API_KEY / OPENAI_API_KEY environment variable.
- Appends a configurable output column (default "llm_response") and "llm_error" to the input schema.

Shared utilities (operator/http/util/):
- HttpClientFactory: lazy singleton java.net.http.HttpClient reused by all operators.
- HttpMethod: enum with @JsonValue so the UI renders a dropdown.
- KeyValuePair: Jackson-friendly header entry class.
- TemplateInterpolator: ${fieldName} substitution from a Tuple.
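To illustrate the ${fieldName} substitution that HttpRequest and LLMAgent rely on, here is a minimal sketch in plain Java. It is not the PR's TemplateInterpolator: the class name `TemplateSketch`, the use of a `Map` in place of Texera's Tuple, and the choice to replace unknown fields with an empty string are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the PR's TemplateInterpolator; a plain Map
// replaces Texera's Tuple so the sketch runs on its own.
class TemplateSketch {
    // Matches ${fieldName}; group 1 captures the field name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String interpolate(String template, Map<String, Object> row) {
        Matcher m = PLACEHOLDER.matcher(template);
        // Assumption: unknown fields are replaced with an empty string.
        return m.replaceAll(match -> {
            Object value = row.get(match.group(1));
            // quoteReplacement keeps '$' and '\' in field values literal.
            return Matcher.quoteReplacement(value == null ? "" : value.toString());
        });
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("symbol", "BTCUSDT", "price", 42.5);
        // Prints https://example.com/quote/BTCUSDT?p=42.5
        System.out.println(interpolate("https://example.com/quote/${symbol}?p=${price}", row));
    }
}
```

Plain substitution like this does not escape values for the target format, which is presumably why the LLMAgent description above builds its JSON body with ObjectNode instead of relying on string templates alone.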
Frontend fix (result-panel cell click):
- When a cell in the result table is clicked, the modal now receives the table's row data as a fallback and displays it immediately, overwriting it only if the paginated server lookup returns a non-empty tuple. Previously, clicking a cell before the paginated result service was initialized produced a permanently blank modal, because the request to fetch the full row never fired (the ?. operator short-circuited on the undefined service).

### Any related issues, documentation, discussions?

Not that I'm aware of. For Hackathon

### How was this PR tested?

Not tested at all, just for demo purposes

### Was this PR authored or co-authored using generative AI tooling?

Yes, using Opus 4.7

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
