yashmayya commented on code in PR #18316:
URL: https://github.com/apache/pinot/pull/18316#discussion_r3140679183


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -142,7 +142,28 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
         serversNotResponded.add(entry.getKey());
       }
     }
-    int numServersResponded = dataTableMap.size();
+    return new ScatterResult(dataTableMap, serversNotResponded, totalResponseSize, timedOut,
+        asyncQueryResponse.getException());
+  }
+
+  /**
+   * Executes the reduce step on the given dataTableMap and populates the response with scatter stats.
+   * Subclasses may pass a dataTableMap that differs from scatterResult.getDataTableMap() (e.g., with
+   * additional cached DataTables merged in), while scatterResult is always used for server stats so
+   * that cached entries are never counted as real servers queried.
+   */
+  protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, ScatterResult scatterResult,

Review Comment:
   `ScatterResult` containing a `Map<ServerRoutingInstance, DataTable>` and 
also passing in a separate data table map here feels like a major footgun 
inviting confusion and future bugs. Can we avoid it somehow by merging 
beforehand and consolidating?
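
   One possible shape for "merging beforehand", as a minimal sketch only: the
   scatter result produces a single reduce input that carries both the merged
   tables and the live-only server count, so the reduce method takes one
   argument instead of two overlapping ones. Everything below is hypothetical
   (`ReduceInputSketch`, `ReduceInput`, `withCached`, and the generic `K`/`V`
   stand-ins for `ServerRoutingInstance`/`DataTable`); the rule that live
   entries win on key clashes is also an assumption, not something the PR states.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: generic stand-ins, not Pinot's actual classes. */
final class ReduceInputSketch {

  /** Stand-in for the PR's ScatterResult; holds live server responses only. */
  static final class ScatterResult<K, V> {
    private final Map<K, V> _liveTables;

    ScatterResult(Map<K, V> liveTables) {
      _liveTables = Collections.unmodifiableMap(new LinkedHashMap<>(liveTables));
    }

    /** Counts only servers that actually responded, never cached entries. */
    int numServersResponded() {
      return _liveTables.size();
    }

    /** Merges cached tables once, up front. Assumption: live entries win on key clashes. */
    ReduceInput<K, V> withCached(Map<K, V> cachedTables) {
      Map<K, V> merged = new LinkedHashMap<>(cachedTables);
      merged.putAll(_liveTables);
      return new ReduceInput<>(merged, numServersResponded());
    }
  }

  /** The single argument a reduce method would take: merged tables plus live-only stats. */
  static final class ReduceInput<K, V> {
    private final Map<K, V> _tablesToReduce;
    private final int _numServersResponded;

    ReduceInput(Map<K, V> tablesToReduce, int numServersResponded) {
      _tablesToReduce = Collections.unmodifiableMap(tablesToReduce);
      _numServersResponded = numServersResponded;
    }

    Map<K, V> tablesToReduce() {
      return _tablesToReduce;
    }

    int numServersResponded() {
      return _numServersResponded;
    }
  }

  private ReduceInputSketch() {
  }
}
```

   With something along these lines, a caller cannot pass a map that disagrees
   with the stats object, because both come from the same value.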



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java:
##########
@@ -114,13 +127,42 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
       sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
           requestContext.isSampledRequest());
     }
+    return responseMap;
+  }
+
+  /**
+   * Executes the reduce step on the given responseMap.
+   * Subclasses may pass a responseMap that differs from the live scatter result (e.g., with cached
+   * entries injected as synthetic streaming iterators).
+   */
+  protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest,
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, long timeoutMs)
+      throws Exception {
     long reduceStartTimeNs = System.nanoTime();
     BrokerResponseNative brokerResponse =
         _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
     brokerResponse.setBrokerReduceTimeMs(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - reduceStartTimeNs));
     return brokerResponse;
   }
 
+  /**
+   * Wraps a {@link DataTable} as a single-element {@code Iterator<Server.ServerResponse>}.
+   * This allows cached DataTables to be injected into the streaming reduce pipeline without
+   * any changes to {@link StreamingReduceService}: it deserialises each
+   * {@code ServerResponse.payload} via {@code DataTableFactory.getDataTable()}, so round-tripping
+   * through {@code DataTable.toBytes()} / {@code ByteString} is sufficient.
+   *
+   * @throws java.io.IOException if the DataTable cannot be serialised; callers should record a
+   *     processing exception on the broker response rather than silently dropping the cached entry.
+   */
+  protected static Iterator<Server.ServerResponse> dataTableToStreamingIterator(DataTable dataTable)

Review Comment:
   What's the purpose of this method exactly? It's not being used anywhere here 
and `protected static` seems odd. Should this live elsewhere?
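
   If the helper is kept, one option is a small standalone utility class, so it
   is not tied to the handler's inheritance chain. A rough sketch of that shape,
   using stand-in types rather than the real `DataTable` and
   `Server.ServerResponse` (`CachedResponseIterators`, `Serializer`, and
   `Response` are all hypothetical names introduced for illustration):

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical utility holder; the stand-in types below are not Pinot classes. */
final class CachedResponseIterators {

  /** Stand-in for the serialisation step the Javadoc describes (DataTable.toBytes()). */
  interface Serializer<T> {
    byte[] toBytes(T value) throws IOException;
  }

  /** Stand-in for a streaming server response that carries only a payload. */
  static final class Response {
    private final byte[] _payload;

    Response(byte[] payload) {
      _payload = payload;
    }

    byte[] payload() {
      return _payload;
    }
  }

  /** Serialises the table once and exposes it as a single-element iterator. */
  static <T> Iterator<Response> singleResponse(T table, Serializer<T> serializer)
      throws IOException {
    Response response = new Response(serializer.toBytes(table));
    return Collections.singletonList(response).iterator();
  }

  private CachedResponseIterators() {
  }
}
```

   The `IOException` still propagates to the caller, which matches the Javadoc's
   request that callers record a processing exception instead of dropping the
   cached entry silently.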



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

