navina opened a new pull request, #18621:
URL: https://github.com/apache/pinot/pull/18621

   ## Summary
   
   Adds a merge-only reduction surface that performs the cross-server merge step
   **without** finalizing — producing a single intermediate `DataTable` that
   carries non-finalized aggregation state, byte-shape identical to a single
   server's partial response. A downstream consumer can intercept this merged
   intermediate, custom-handle it, and later re-inject it through the standard
   reduce path for finalization. The read path is unchanged; tests verify a
   re-injected intermediate finalizes to the same result as a direct reduce.
   
   No caller is wired in by this change — this is the API surface only.
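
   The round-trip property described above can be illustrated with a minimal, self-contained sketch. It does not use Pinot classes: `AvgState`, `mergeOnly`, and `reduce` are hypothetical stand-ins for an intermediate aggregation state, the merge-only step, and the regular reduce path.

   ```java
   import java.util.Arrays;
   import java.util.List;

   final class RoundTripSketch {
     // Hypothetical stand-in for a non-finalized AVG intermediate; not a Pinot class.
     static final class AvgState {
       final double sum;
       final long count;

       AvgState(double sum, long count) {
         this.sum = sum;
         this.count = count;
       }

       // Merge step: combine two intermediates without finalizing.
       AvgState merge(AvgState other) {
         return new AvgState(sum + other.sum, count + other.count);
       }

       // Finalize step: turn the intermediate into the user-visible value.
       double finalizeResult() {
         return count == 0 ? Double.NaN : sum / count;
       }
     }

     // Merge-only: returns a single intermediate, or null when there is no input.
     static AvgState mergeOnly(List<AvgState> serverStates) {
       AvgState merged = null;
       for (AvgState state : serverStates) {
         merged = (merged == null) ? state : merged.merge(state);
       }
       return merged;
     }

     // Regular reduce: merge and finalize in one call.
     static double reduce(List<AvgState> serverStates) {
       AvgState merged = mergeOnly(serverStates);
       return merged == null ? Double.NaN : merged.finalizeResult();
     }

     public static void main(String[] args) {
       List<AvgState> servers = Arrays.asList(new AvgState(10.0, 4), new AvgState(6.0, 4));
       double direct = reduce(servers);
       // Re-inject the merged intermediate as if it came from a single server.
       double reinjected = reduce(Arrays.asList(mergeOnly(servers)));
       System.out.println(direct == reinjected);
     }
   }
   ```

   Because the merged state has the same shape as a single server's partial response, the reduce path needs no special case for it.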
   
   ## API
   
   ```java
   // DataTableReducer (new default method, throws by default)
   default DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics);
   
   // BrokerReduceService (merge-only counterpart of reduceOnDataTable)
   @Nullable
   public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest,
       Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs,
       BrokerMetrics brokerMetrics);
   ```
   
   ### Supported reducers
   - `AggregationDataTableReducer`, `GroupByDataTableReducer`,
     `DistinctDataTableReducer` — implement `mergeDataTablesOnly`.
   - `SelectionDataTableReducer`, `ExplainPlanDataTableReducer` — inherit the
     default-throwing implementation (out of scope).
   - Aggregation and group-by additionally throw `UnsupportedOperationException`
     when `_queryContext.isServerReturnFinalResult()` is true — inputs are then
     finalized rather than intermediate, so the contract can't be honored.
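
   The support matrix above follows a common pattern: a default interface method that throws, overridden only where merge-only makes sense, plus a guard for already-finalized inputs. A minimal sketch of that pattern, using a hypothetical `Reducer` interface and simplified `long[]` inputs rather than the real `DataTableReducer` signature:

   ```java
   final class MergeOnlyContractSketch {
     // Hypothetical reducer interface; the real DataTableReducer signature is richer.
     interface Reducer {
       default long mergeOnly(long[] partials) {
         throw new UnsupportedOperationException(
             "merge-only not supported by " + getClass().getSimpleName());
       }
     }

     // Inherits the default-throwing behavior, like the out-of-scope reducers.
     static final class SelectionLikeReducer implements Reducer {
     }

     // Overrides merge-only but rejects inputs that servers already finalized.
     static final class SumLikeReducer implements Reducer {
       private final boolean serverReturnFinalResult;

       SumLikeReducer(boolean serverReturnFinalResult) {
         this.serverReturnFinalResult = serverReturnFinalResult;
       }

       @Override
       public long mergeOnly(long[] partials) {
         if (serverReturnFinalResult) {
           throw new UnsupportedOperationException("inputs are finalized, not intermediate");
         }
         long merged = 0;
         for (long partial : partials) {
           merged += partial;
         }
         return merged;
       }
     }

     // Returns true when the reducer accepts a merge-only call for these inputs.
     static boolean supportsMergeOnly(Reducer reducer, long[] partials) {
       try {
         reducer.mergeOnly(partials);
         return true;
       } catch (UnsupportedOperationException e) {
         return false;
       }
     }

     public static void main(String[] args) {
       long[] partials = {3, 4, 5};
       System.out.println(supportsMergeOnly(new SelectionLikeReducer(), partials));
       System.out.println(supportsMergeOnly(new SumLikeReducer(true), partials));
       System.out.println(supportsMergeOnly(new SumLikeReducer(false), partials));
     }
   }
   ```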
   
   ### Surfaced on the merged DataTable's metadata
   - Group-by completeness: `GROUPS_TRIMMED`, `NUM_GROUPS_LIMIT_REACHED`.
   - New `PARTIAL_INTERMEDIATE_RESULT` flag — set when one or more input
     DataTables are dropped during merge (e.g. schema conflict). The
     `BrokerMeter.RESPONSE_MERGE_EXCEPTIONS` meter is also bumped, symmetric
     with how the regular reduce path surfaces conflicting-schema servers.
   - Execution stats round-trip via the new
     `ExecutionStatsAggregator#setStatsOnMergedDataTable`: additive longs
     (`numDocsScanned`, `numSegments*`, `threadCpuTimeNs`, ...), MIN-reduced
     `minConsumingFreshnessTimeMs`, boolean flags, per-server exceptions, and
     JSON-encoded trace info (when `trace=true`). The method does **not** bump
     broker meters/timers (the merge path is expected to run off the
     request-serving path; metric increments fire on eventual re-reduce).
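
   The stat-folding rules listed above (additive longs, MIN-reduced freshness, boolean flags) can be sketched as follows. `ServerStats` is a hypothetical holder, not the real `ExecutionStatsAggregator`; OR-combining the flags and using `Long.MAX_VALUE` as the "no consuming segment" value are assumptions made for this example.

   ```java
   import java.util.Arrays;
   import java.util.List;

   final class StatsMergeSketch {
     // Hypothetical per-server stats holder; field names mirror the description only.
     static final class ServerStats {
       final long numDocsScanned;
       final long threadCpuTimeNs;
       final long minConsumingFreshnessTimeMs;
       final boolean numGroupsLimitReached;

       ServerStats(long numDocsScanned, long threadCpuTimeNs, long minConsumingFreshnessTimeMs,
           boolean numGroupsLimitReached) {
         this.numDocsScanned = numDocsScanned;
         this.threadCpuTimeNs = threadCpuTimeNs;
         this.minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
         this.numGroupsLimitReached = numGroupsLimitReached;
       }
     }

     static ServerStats merge(List<ServerStats> perServer) {
       long docs = 0;
       long cpu = 0;
       long freshness = Long.MAX_VALUE; // assumed identity value for the MIN reduction
       boolean limitReached = false;
       for (ServerStats stats : perServer) {
         docs += stats.numDocsScanned;                                      // additive
         cpu += stats.threadCpuTimeNs;                                      // additive
         freshness = Math.min(freshness, stats.minConsumingFreshnessTimeMs); // MIN-reduced
         limitReached |= stats.numGroupsLimitReached;                       // assumed OR
       }
       return new ServerStats(docs, cpu, freshness, limitReached);
     }

     public static void main(String[] args) {
       ServerStats merged = merge(Arrays.asList(
           new ServerStats(100, 5_000, 1_700_000_000_500L, false),
           new ServerStats(250, 7_000, 1_700_000_000_100L, true)));
       System.out.println(merged.numDocsScanned + " " + merged.minConsumingFreshnessTimeMs);
     }
   }
   ```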
   
   ### Limitations of the round-trip
   - CPU/mem stats lose the offline-vs-realtime split visible on
     `BrokerResponseNative` — wire format has only combined keys, so a
     re-reduce dumps the whole value into one bucket based on the synthetic
     server's tableType.
   - Exception attribution to original servers is lost; collisions on the
     same error code are last-write-wins (wire format is `Map<Integer, String>`).
   - Per-server trace info is JSON-encoded into one `TRACE_INFO` entry; the
     downstream aggregator reads it back as one trace blob under the synthetic
     server's name.
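
   The exception-collision limitation follows directly from keying by error code. A minimal sketch, assuming the per-server maps are folded with plain `Map.putAll` (the actual merge code may differ) and using an arbitrary error code:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   final class ExceptionCollisionSketch {
     // Folds per-server exception maps into one; later servers overwrite earlier
     // ones when they report the same error code.
     static Map<Integer, String> mergeExceptions(List<Map<Integer, String>> perServer) {
       Map<Integer, String> merged = new LinkedHashMap<>();
       for (Map<Integer, String> exceptions : perServer) {
         merged.putAll(exceptions);
       }
       return merged;
     }

     public static void main(String[] args) {
       // 200 is an arbitrary code chosen for illustration.
       Map<Integer, String> serverA = Collections.singletonMap(200, "failure on server A");
       Map<Integer, String> serverB = Collections.singletonMap(200, "failure on server B");
       System.out.println(mergeExceptions(Arrays.asList(serverA, serverB)));
     }
   }
   ```

   Only one message per code survives, and nothing in the merged map records which server produced it.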
   
   ## Server-side consolidations
   
   Two helpers extracted to avoid duplication between the existing server-side
   serialization and the new broker-side merge-only path:
   - `AggregationFunctionUtils.setIntermediateResult` (now also used by
     `AggregationResultsBlock`).
   - `DataTableBuilderUtils.setColumn` (now also used by `GroupByResultsBlock`).
   
   These are behavior-preserving — private methods promoted to shared statics
   with identical bodies.
   
   ## Wire format
   
   Adds one new `DataTable.MetadataKey` entry (`PARTIAL_INTERMEDIATE_RESULT`,
   id=43). Older readers ignore unknown metadata keys, so this is
   forward-compatible.
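
   The compatibility argument can be sketched with a hypothetical reader that resolves metadata key ids against the set it knows and skips the rest. This is not the actual `DataTable` deserialization code; id 1 and the name `SOME_EXISTING_KEY` are placeholders, and only id 43 comes from this change.

   ```java
   import java.util.HashMap;
   import java.util.LinkedHashMap;
   import java.util.Map;

   final class MetadataCompatSketch {
     // Hypothetical reader: keeps entries whose id it knows and skips everything else.
     static Map<String, String> readMetadata(Map<Integer, String> wire,
         Map<Integer, String> knownKeys) {
       Map<String, String> result = new LinkedHashMap<>();
       for (Map.Entry<Integer, String> entry : wire.entrySet()) {
         String name = knownKeys.get(entry.getKey());
         if (name != null) {
           result.put(name, entry.getValue());
         }
       }
       return result;
     }

     public static void main(String[] args) {
       Map<Integer, String> wire = new LinkedHashMap<>();
       wire.put(1, "42");    // placeholder id for a key every reader knows
       wire.put(43, "true"); // PARTIAL_INTERMEDIATE_RESULT, new in this change

       Map<Integer, String> olderReader = new HashMap<>();
       olderReader.put(1, "SOME_EXISTING_KEY");

       Map<Integer, String> newerReader = new HashMap<>(olderReader);
       newerReader.put(43, "PARTIAL_INTERMEDIATE_RESULT");

       System.out.println(readMetadata(wire, olderReader));
       System.out.println(readMetadata(wire, newerReader));
     }
   }
   ```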
   
   ## Test plan
   
   - [x] `MergeDataTablesOnlyTest` (23 tests): round-trip equivalence for
         Aggregation/GroupBy/Distinct (including OBJECT-column DISTINCTCOUNT),
         `LIMIT 0` group-by round-trip, intermediate-schema preservation,
         conflicting-schema partial-merge flag, `serverReturnFinalResult`
         rejection, additive stats end-to-end (`numDocsScanned` survives
         round-trip), MIN freshness reduction, exception/trace propagation,
         null-handling round-trip, selection-throws, empty/no-input null returns.
   - [x] `./mvnw -pl pinot-core test-compile`, spotless, checkstyle, and license
         checks all clean.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

