yashmayya commented on code in PR #18316:
URL: https://github.com/apache/pinot/pull/18316#discussion_r3140679183
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -142,7 +142,28 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
serversNotResponded.add(entry.getKey());
}
}
- int numServersResponded = dataTableMap.size();
+ return new ScatterResult(dataTableMap, serversNotResponded, totalResponseSize, timedOut,
+ asyncQueryResponse.getException());
+ }
+
+ /**
+ * Executes the reduce step on the given dataTableMap and populates the response with scatter stats.
+ * Subclasses may pass a dataTableMap that differs from scatterResult.getDataTableMap() (e.g., with
+ * additional cached DataTables merged in), while scatterResult is always used for server stats so
+ * that cached entries are never counted as real servers queried.
+ */
+ protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest,
+ Map<ServerRoutingInstance, DataTable> dataTableMap, ScatterResult scatterResult,
Review Comment:
`ScatterResult` containing a `Map<ServerRoutingInstance, DataTable>` and
also passing in a separate data table map here feels like a major footgun
inviting confusion and future bugs. Can we avoid it somehow by merging
beforehand and consolidating?
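
For illustration, here is a rough sketch of one way the consolidation could look. It is not the PR's code: `ConsolidatedScatterResult`, `fromLive`, and `withCached` are hypothetical names, and the type parameters `K` and `V` stand in for `ServerRoutingInstance` and `DataTable`. The idea is that the result object owns the only map the reduce step sees, while the responded-server count is fixed from the live scatter before any cached entries are merged in.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for the PR's ScatterResult.
 * K and V would be ServerRoutingInstance and DataTable in Pinot.
 */
final class ConsolidatedScatterResult<K, V> {
  // The single map handed to the reduce step (live entries plus any cached ones).
  private final Map<K, V> _reduceInput;
  // Counted from live responses only, so cached entries never inflate server stats.
  private final int _numServersResponded;

  private ConsolidatedScatterResult(Map<K, V> reduceInput, int numServersResponded) {
    _reduceInput = Collections.unmodifiableMap(reduceInput);
    _numServersResponded = numServersResponded;
  }

  /** Builds a result from the live scatter responses. */
  static <K, V> ConsolidatedScatterResult<K, V> fromLive(Map<K, V> liveResponses) {
    return new ConsolidatedScatterResult<>(new HashMap<>(liveResponses), liveResponses.size());
  }

  /** Returns a copy with cached entries merged in; live entries win on key collisions. */
  ConsolidatedScatterResult<K, V> withCached(Map<K, V> cachedEntries) {
    Map<K, V> merged = new HashMap<>(_reduceInput);
    cachedEntries.forEach(merged::putIfAbsent);
    return new ConsolidatedScatterResult<>(merged, _numServersResponded);
  }

  Map<K, V> getReduceInput() {
    return _reduceInput;
  }

  int getNumServersResponded() {
    return _numServersResponded;
  }
}
```

With something along these lines, `doReduce` could take a single result argument, and a subclass would call `withCached(...)` before handing it over instead of passing a second map alongside it.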
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java:
##########
@@ -114,13 +127,42 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
requestContext.isSampledRequest());
}
+ return responseMap;
+ }
+
+ /**
+ * Executes the reduce step on the given responseMap.
+ * Subclasses may pass a responseMap that differs from the live scatter result (e.g., with cached
+ * entries injected as synthetic streaming iterators).
+ */
+ protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest,
+ Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, long timeoutMs)
+ throws Exception {
long reduceStartTimeNs = System.nanoTime();
BrokerResponseNative brokerResponse =
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
brokerResponse.setBrokerReduceTimeMs(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - reduceStartTimeNs));
return brokerResponse;
}
+ /**
+ * Wraps a {@link DataTable} as a single-element {@code Iterator<Server.ServerResponse>}.
+ * This allows cached DataTables to be injected into the streaming reduce pipeline without
+ * any changes to {@link StreamingReduceService}: it deserialises each
+ * {@code ServerResponse.payload} via {@code DataTableFactory.getDataTable()}, so round-tripping
+ * through {@code DataTable.toBytes()} / {@code ByteString} is sufficient.
+ *
+ * @throws java.io.IOException if the DataTable cannot be serialised; callers should record a
+ * processing exception on the broker response rather than silently dropping the cached entry.
+ */
+ protected static Iterator<Server.ServerResponse> dataTableToStreamingIterator(DataTable dataTable)
Review Comment:
What's the purpose of this method exactly? It's not being used anywhere here
and `protected static` seems odd. Should this live elsewhere?
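
If the helper is kept, one option might be a small package-level utility rather than a `protected static` on the request handler. The sketch below is only a shape, not Pinot code: `PayloadResponse` is a hypothetical stand-in for `Server.ServerResponse`, `CachedResponseIterators` is a made-up holder class, and the caller is assumed to have already serialised the `DataTable` to bytes.

```java
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical stand-in for Server.ServerResponse: carries only a serialised payload. */
final class PayloadResponse {
  private final byte[] _payload;

  PayloadResponse(byte[] payload) {
    _payload = payload.clone();
  }

  byte[] getPayload() {
    return _payload.clone();
  }
}

/** Hypothetical utility holder, so the wrapper does not live on a request handler. */
final class CachedResponseIterators {
  private CachedResponseIterators() {
  }

  /** Wraps already-serialised DataTable bytes as a single-element response iterator. */
  static Iterator<PayloadResponse> singleResponse(byte[] serializedDataTable) {
    return Collections.singletonList(new PayloadResponse(serializedDataTable)).iterator();
  }
}
```

Keeping it in a standalone class would also make it straightforward to unit test without constructing a broker request handler.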
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]