navina commented on code in PR #18316:
URL: https://github.com/apache/pinot/pull/18316#discussion_r3140722190
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java:
##########
@@ -114,13 +127,42 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
requestContext.isSampledRequest());
}
+ return responseMap;
+ }
+
+ /**
+ * Executes the reduce step on the given responseMap.
+ * Subclasses may pass a responseMap that differs from the live scatter result (e.g., with cached
+ * entries injected as synthetic streaming iterators).
+ */
+ protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest,
+ Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, long timeoutMs)
+ throws Exception {
long reduceStartTimeNs = System.nanoTime();
BrokerResponseNative brokerResponse =
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
brokerResponse.setBrokerReduceTimeMs(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - reduceStartTimeNs));
return brokerResponse;
}
+ /**
+ * Wraps a {@link DataTable} as a single-element {@code Iterator<Server.ServerResponse>}.
+ * This allows cached DataTables to be injected into the streaming reduce pipeline without
+ * any changes to {@link StreamingReduceService}: it deserialises each
+ * {@code ServerResponse.payload} via {@code DataTableFactory.getDataTable()}, so round-tripping
+ * through {@code DataTable.toBytes()} / {@code ByteString} is sufficient.
+ *
+ * @throws java.io.IOException if the DataTable cannot be serialised; callers should record a
+ * processing exception on the broker response rather than silently dropping the cached entry.
+ */
+ protected static Iterator<Server.ServerResponse> dataTableToStreamingIterator(DataTable dataTable)
Review Comment:
This is probably not needed here. Let me clean it up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]