xiangfu0 commented on code in PR #18458: URL: https://github.com/apache/pinot/pull/18458#discussion_r3226305020
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/streaming/StreamingDispatchObserver.java: ########## @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service.dispatch.streaming; + +import io.grpc.stub.StreamObserver; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import javax.annotation.Nullable; +import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.routing.QueryServerInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Broker-side gRPC client {@code StreamObserver} for the {@code SubmitWithStream} bidi RPC. Routes inbound + * {@link Worker.ServerToBroker} messages to either the {@code submit_ack} callback (first message) or the + * {@link StreamingQuerySession} (subsequent {@code OpChainComplete}s), and adapts the inbound side of the stream + * (the {@link StreamObserver} we use to send {@code BrokerToServer} messages) into a + * {@link StreamingServerHandle} so the session can fan out cancel. 
+ * + * <p>Lifecycle — created when the broker opens a {@code SubmitWithStream} call to one server, then registered with + * the session via {@link StreamingQuerySession#registerStream}. Receives: + * <ol> + * <li>Exactly one {@code submit_ack} (always the first server→broker message).</li> + * <li>Zero or more {@code OpChainComplete} messages (one per opchain that ran on this server).</li> + * <li>Exactly one {@code ServerDone} after the last opchain has reported.</li> + * </ol> + * + * <p>On {@code onError} or unexpected message order, drains the latch by {@code remainingExpected} via the session + * so {@link StreamingQuerySession#awaitCompletion} can still finalize. + */ +public class StreamingDispatchObserver + implements StreamObserver<Worker.ServerToBroker>, StreamingServerHandle { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingDispatchObserver.class); + + private final QueryServerInstance _server; + private final StreamingQuerySession _session; + /// Receives the submit-ack response or a failure throwable. Called exactly once per call (either with response on + /// the first ServerToBroker.submit_ack, or with an error if the stream breaks before the ack arrives). + private final BiConsumer<Worker.QueryResponse, Throwable> _ackCallback; + private final int _expectedOpChainsForThisServer; + private final AtomicBoolean _ackReceived = new AtomicBoolean(false); + + /** + * Counts how many opchains we've already drained from the session for this server, so an onError doesn't + * double-drain after some opchains already reported successfully. + */ + private int _opChainsReportedForThisServer = 0; + + /** + * The inbound side of the bidi stream — used to send {@code submit} (initial) and {@code cancel} (fan-out) from + * the broker. Set once via {@link #attachOutboundStream} after the gRPC stub is asked to start the stream; lives + * for the duration of the call. 
+ */ + private volatile StreamObserver<Worker.BrokerToServer> _outbound; + + public StreamingDispatchObserver(QueryServerInstance server, StreamingQuerySession session, + int expectedOpChainsForThisServer, BiConsumer<Worker.QueryResponse, Throwable> ackCallback) { + _server = server; + _session = session; + _expectedOpChainsForThisServer = expectedOpChainsForThisServer; + _ackCallback = ackCallback; + } + + public QueryServerInstance getServer() { + return _server; + } + + /** Wires the outbound side of the bidi stream once the gRPC stub returns it. */ + public void attachOutboundStream(StreamObserver<Worker.BrokerToServer> outbound) { + _outbound = outbound; + } + + /** Sends the initial {@code BrokerToServer.submit} message on the outbound side. */ + public void sendSubmit(Worker.QueryRequest request) { + StreamObserver<Worker.BrokerToServer> outbound = _outbound; + if (outbound == null) { + throw new IllegalStateException("attachOutboundStream must be called before sendSubmit"); + } + outbound.onNext(Worker.BrokerToServer.newBuilder().setSubmit(request).build()); + } + + @Override + public void onNext(Worker.ServerToBroker message) { + switch (message.getPayloadCase()) { + case SUBMIT_ACK: + if (_ackReceived.compareAndSet(false, true)) { + _ackCallback.accept(message.getSubmitAck(), null); + } else { + LOGGER.warn("Ignoring duplicate submit_ack from {}", _server); + } + break; + case OPCHAIN: + _session.recordOpChainComplete(message.getOpchain()); + _opChainsReportedForThisServer++; + break; + case DONE: + _session.unregisterStream(this); Review Comment: If the server sends `submit_ack(STATUS_ERROR)` and then `ServerDone` before any opchain starts, this branch only unregisters the stream and never drains the remaining expected count. `tryRecoverWithStream()` will then block in `session.awaitCompletion(...)` until the full query deadline instead of returning the submission error promptly. 
Please decrement the outstanding count here (or in `onCompleted()`) by the number of opchains this server still owes (i.e. `_expectedOpChainsForThisServer - _opChainsReportedForThisServer`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
