Hi Yi,

Thanks for the great work on FLIP-582. I've been reading through the proposal and have a few questions about the failover/recovery semantics and resource efficiency of the RpcOperator Service. I'd appreciate any clarification.
1. Cost of recomputation due to stateless design

The proposal states that "RPC services do not participate in checkpointing." This means that upon any data-plane failure and checkpoint rollback, all in-flight RPC requests are lost and must be re-triggered from scratch. For workloads where a single RPC invocation is expensive (e.g., LLM inference taking seconds to minutes per request), this could lead to significant wasted computation.

Has there been any consideration of a lightweight mechanism to avoid redundant recomputation, for example a result cache on the service side keyed by request identity, or an optional "resumable request" protocol? Or is the position that this cost is acceptable given the target workload characteristics?

2. Cancelling in-flight RPC requests on data-plane failure

When a data-plane operator fails and its Pipelined Region restarts, there may still be RPC requests in progress on the service side that were issued by the now-restarting data-plane tasks. Since those results will be discarded anyway (the data plane will re-issue them after recovery), continuing to process them wastes resources, particularly expensive GPU cycles.

Is there a plan to provide an interface or mechanism for the data plane (or the framework) to proactively cancel outstanding RPC requests upon failure/restart? For example, a cancellation signal propagated from the RosClient to the RPC service, or a request-scoping mechanism tied to the checkpoint epoch, would allow the service side to abort stale requests early.

3. Data-plane behavior when an RPC service subtask fails

The proposal mentions that when an RPC service subtask fails, "requests are routed to other surviving instances." However, I'm curious about the behavior of requests that are already in flight to the failed instance:

- Does the data-plane operator have to wait for the configured timeout before it knows the request has failed?
- Is there any fast-failure notification mechanism (e.g., connection-closed detection) that allows the RosClient to immediately fail over in-flight requests to other instances without waiting for the full timeout?
- If the service has only one instance (parallelism = 1) and it fails, does the data plane simply block (with backpressure) until the instance recovers, or does it fail fast with UNAVAILABLE errors?

A fast-failure path would be important for reducing the end-to-end latency impact of service-side failures.

4. Resource utilization concerns with separate SlotSharingGroup

The proposal states that "each RPC service automatically creates a separate SlotSharingGroup" and that this is "fully compatible with Flink's existing K8s/YARN resource management." However, in Flink's current resource model, all TaskManagers share the same configuration template (determined by the global taskmanager.memory.* and CPU settings). Fine-Grained Resource Management controls how slots are sized within a TM, but does not enable heterogeneous TM specifications across the cluster.

This raises a practical concern: if an RpcOperator requires GPU resources, the TM configuration must include GPU allocation. Since all TMs share the same template, every TM in the cluster would be provisioned with GPUs, even those exclusively hosting data-plane tasks that have no GPU requirement. This leads to significant GPU resource waste.

Furthermore, the separate SlotSharingGroup design prevents co-locating RpcOperator and data-plane tasks on the same TaskManager. In practice, co-location (e.g., the RpcOperator using the GPU while data-plane tasks use the CPU on the same GPU node) would improve overall resource utilization. The current design prioritizes isolation at the cost of efficiency.
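To make the provisioning concern concrete, here is a rough back-of-the-envelope sketch in Java. It assumes a hypothetical job with 8 CPU-only data-plane TMs and 2 TMs for an RpcOperator that needs one GPU each. The class, record, and method names are made up for illustration and are not Flink or FLIP-582 APIs. The sketch compares the GPUs provisioned when every TM shares one template with the GPUs provisioned when each SlotSharingGroup gets its own TM template, similar in spirit to KubeRay worker groups.

```java
import java.util.List;

/**
 * Back-of-the-envelope sketch (hypothetical names, not a Flink or FLIP-582 API):
 * compares GPUs provisioned when all TaskManagers share one template versus when
 * each SlotSharingGroup has its own TM template, in the spirit of KubeRay worker groups.
 */
public class GpuProvisioningSketch {

    /** Hypothetical demand of one SlotSharingGroup: TMs it needs and GPUs each of its TMs must carry. */
    record GroupDemand(String slotSharingGroup, int taskManagers, int gpusPerTm) {}

    /** Single shared template: every TM carries the largest per-TM GPU count any group needs. */
    static int gpusProvisionedUniform(List<GroupDemand> groups) {
        int maxGpusPerTm = 0;
        int totalTms = 0;
        for (GroupDemand g : groups) {
            maxGpusPerTm = Math.max(maxGpusPerTm, g.gpusPerTm());
            totalTms += g.taskManagers();
        }
        return maxGpusPerTm * totalTms;
    }

    /** Per-group templates: each group's TMs carry only the GPUs that group actually needs. */
    static int gpusProvisionedPerGroup(List<GroupDemand> groups) {
        int total = 0;
        for (GroupDemand g : groups) {
            total += g.taskManagers() * g.gpusPerTm();
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed example: 8 CPU-only data-plane TMs plus 2 single-GPU TMs for an RpcOperator.
        List<GroupDemand> groups = List.of(
                new GroupDemand("data-plane", 8, 0),
                new GroupDemand("rpc-service", 2, 1));
        System.out.println("uniform template: " + gpusProvisionedUniform(groups) + " GPUs");
        System.out.println("per-group templates: " + gpusProvisionedPerGroup(groups) + " GPUs");
    }
}
```

Under these assumptions, the sketch reports 10 GPUs for the uniform template versus 2 for per-group templates, so 8 GPUs would sit on CPU-only TMs. It deliberately ignores slot packing and co-location, which would change the numbers but not the direction of the gap.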
Has there been consideration of:

- Supporting heterogeneous TaskManager specifications (different TM templates for different SlotSharingGroups) to avoid provisioning GPUs on CPU-only TMs? (As a reference, KubeRay's Worker Group model addresses exactly this problem for Ray clusters. In KubeRay, a RayCluster defines multiple workerGroupSpecs, each with its own pod template, resource configuration, node selectors, and independent autoscaling bounds.)
- Allowing optional co-location of RpcOperator and data-plane slots on the same TM (in separate slots) for scenarios where utilization is prioritized over strict process-level isolation?
- Or is there a dependency on other sub-FLIPs under FLIP-577 to address this gap in GPU resource scheduling?

Thanks in advance for the clarification. Looking forward to the continued discussion.

Best regards,
Hongshun

On Thu, May 28, 2026 at 4:40 PM Guowei Ma <[email protected]> wrote:

> Hi Yi,
>
> One more question I'd like to raise is about how the service behaves when it's referenced by more than one job. If two jobs are submitted within the same application and both reference the same RpcOperator service, is that service shared across them, and what is the expected behavior in that case? I'm also curious how this plays out for bounded (batch) jobs in particular: if one job completes while the other is still running, what does the service lifecycle look like? Is the service torn down as soon as the first job finishes, and if so, how does that affect the job that's still running?
>
> Best,
> Guowei
>
>
> On Thu, May 28, 2026 at 2:55 PM Guowei Ma <[email protected]> wrote:
>
> > Hi Yi,
> >
> > Thanks for putting together this FLIP; it's a well-thought-out proposal, and the RpcOperator Service is a really valuable addition for AI/inference workloads on Flink. I have a few questions and suggestions:
> >
> >
> > 1. Autoscaling
> > How is scale-up/scale-down expressed in this design?
> > Could you consider allowing concurrency to be configured as a (min, max) range? Currently it appears only min = max (i.e. fixed parallelism) is supported.
> >
> >
> > 2. Async invocation & end-to-end examples
> > The example shows a synchronous invocation style. I'd suggest also providing an asynchronous invocation example, or even just a scaffold that exposes only the RpcOperator's handle and method. In addition, it would be best to include a complete end-to-end example of invoking a model (CPU inference would be fine).
> >
> >
> > 3. GPU as a resource
> > GPU is surely a common, first-class resource. Wouldn't accessing it here through an extension mechanism mislead users? It would seem more natural to declare GPU as a standard resource rather than going through an extension.
> >
> >
> > 4. Serial method invocation
> > Each method of an RpcOperator is invoked serially. I'd suggest stating this explicitly in both the documentation and the code comments, so that users (or an AI) are clearly aware of it.
> >
> >
> > 5. Why synchronous methods?
> > Why is every method in RpcOperator a synchronous call? Does this imply that no model inference supports concurrent invocation?
> >
> >
> > 6. SQL UDF usage
> > How would SQL UDF users invoke this capability? I'd suggest providing a complete example for this as well.
> >
> >
> > Thanks again for the great work!
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, May 27, 2026 at 8:36 PM Charles Zhang <[email protected]>
> > wrote:
> >
> >> Hi Yi and Flink Community,
> >>
> >> Thanks for bringing up this excellent proposal. I am fully in favor of FLIP-582.
> >>
> >> In our production workloads, especially regarding large-scale AI inference, the tight coupling of the data plane and compute units has always been a major pain point.
> >> If an inference subtask fails due to a GPU OOM or driver issue, triggering a global rollback is incredibly expensive since model reloading takes minutes.
> >>
> >> The introduction of the RpcOperator Service as a first-class primitive masterfully decouples the heavy inference tasks from the mainstream topology. The fault isolation, independent scaling, and stateless design perfectly match the requirements of modern AI-oriented data processing.
> >>
> >> This is a clean and robust architecture. Looking forward to seeing this merged!
> >>
> >>
> >> Best wishes,
> >> Charles Zhang
> >> from Apache InLong
> >>
> >>
> >> On Wed, May 27, 2026 at 2:12 PM Yi Zhang <[email protected]> wrote:
> >>
> >> > Hi everyone,
> >> >
> >> > I would like to start a discussion on FLIP-582: Support RpcOperator Service [1].
> >> >
> >> > AI-oriented workloads like multimodal data processing and model inference have grown rapidly in recent years. These workloads are characterized by expensive resources (GPUs) and high initialization costs (seconds to minutes for model loading). In today's Flink, embedding them in the data plane couples their parallelism and failover with surrounding operators; deploying them as external services disconnects their lifecycle from the job and doubles operational overhead.
> >> >
> >> > This FLIP introduces RpcOperator Service, a framework-level primitive that runs user-defined compute as RPC services in an independent Pipelined Region within the Flink job. Because the service is isolated at the scheduling level, it can achieve fault isolation, independent scaling, and dedicated resource allocation. As a native Flink primitive, it also lays the foundation for automatic flow control, flexible load balancing, and coordinated auto-scaling, all without introducing external infrastructure or additional operational burden.
> >> >
> >> > Looking forward to your feedback and suggestions!
> >> >
> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-582%3A+Support+RpcOperator+Service
> >> >
> >> > Best Regards,
> >> > Yi Zhang
> >> >
> >
