yashmayya commented on code in PR #18621:
URL: https://github.com/apache/pinot/pull/18621#discussion_r3327048559


##########
pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java:
##########
@@ -156,11 +156,15 @@ enum MetadataKey {
     WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
     // Needed so that we can track query id in Netty channel response.
     QUERY_ID(41, "queryId", MetadataValueType.STRING),
-    EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING);
+    EARLY_TERMINATION_REASON(42, "earlyTerminationReason", MetadataValueType.STRING),
+    // Set on a merged-only DataTable when one or more input server DataTables were dropped during the
+    // merge (e.g., due to a schema conflict). Signals to a downstream consumer that the merge is
+    // partial; how to react (skip, retry, accept with annotation) is the consumer's policy.
+    PARTIAL_INTERMEDIATE_RESULT(43, "partialIntermediateResult", MetadataValueType.STRING);

Review Comment:
   This name looks a bit odd: aren't all intermediate results partial? Should it 
be something like `INCOMPLETE_MERGE` instead?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java:
##########
@@ -346,4 +348,107 @@ private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.Met
       consumer.accept(Long.parseLong(strValue));
     }
   }
+
+  /**
+   * Writes the accumulated execution stats onto the given DataTable's metadata (and exception map),
+   * so a merged-only DataTable can be re-injected into the regular reduce path with the same
+   * downstream totals as a direct reduce of the original inputs would have produced.
+   *
+   * <p>Unlike {@link #setStats(String, BrokerResponseNative, BrokerMetrics)}, this method does NOT
+   * bump broker meters or timers. The merge-only path is expected to run off the request-serving
+   * path; meter increments fire when the result is eventually re-reduced.
+   *
+   * <p>Limitations of the round-trip via DataTable metadata:
+   * <ul>
+   *   <li>CPU and memory stats round-trip as a single combined value per key
+   *       ({@link DataTable.MetadataKey#THREAD_CPU_TIME_NS}, etc.) because the wire format has no
+   *       per-tableType keys. In the standard reduce path the aggregator attributes each server's
+   *       value to offline vs realtime based on {@code routingInstance.getTableType()} and surfaces
+   *       them as separate fields on {@link BrokerResponseNative}; on a re-reduce of the merged
+   *       DataTable the whole combined value lands in one bucket — whichever tableType the caller
+   *       assigned to the synthetic server response. So the per-tableType split visible on
+   *       BrokerResponse is lost across the round-trip, even though the total is preserved.
+   *   <li>Per-server exceptions are written via {@link DataTable#addException(int, String)} which
+   *       backs a {@code Map<Integer, String>} keyed by error code; if two inputs reported the
+   *       same error code the merged DataTable carries last-write-wins for the message.
+   *   <li>Per-server trace info is JSON-encoded into a single
+   *       {@link DataTable.MetadataKey#TRACE_INFO} entry; the downstream aggregator reads it back
+   *       as one trace blob attributed to the synthetic server.
+   * </ul>
+   */
+  public void setStatsOnMergedDataTable(DataTable dataTable) {

Review Comment:
   Does this drop the `EARLY_TERMINATION_REASON` metadata that was recently 
added for distinct queries?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java:
##########
@@ -346,4 +348,107 @@ private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.Met
       consumer.accept(Long.parseLong(strValue));
     }
   }
+
+  /**
+   * Writes the accumulated execution stats onto the given DataTable's metadata (and exception map),
+   * so a merged-only DataTable can be re-injected into the regular reduce path with the same
+   * downstream totals as a direct reduce of the original inputs would have produced.
+   *
+   * <p>Unlike {@link #setStats(String, BrokerResponseNative, BrokerMetrics)}, this method does NOT
+   * bump broker meters or timers. The merge-only path is expected to run off the request-serving
+   * path; meter increments fire when the result is eventually re-reduced.
+   *
+   * <p>Limitations of the round-trip via DataTable metadata:
+   * <ul>
+   *   <li>CPU and memory stats round-trip as a single combined value per key
+   *       ({@link DataTable.MetadataKey#THREAD_CPU_TIME_NS}, etc.) because the wire format has no
+   *       per-tableType keys. In the standard reduce path the aggregator attributes each server's
+   *       value to offline vs realtime based on {@code routingInstance.getTableType()} and surfaces
+   *       them as separate fields on {@link BrokerResponseNative}; on a re-reduce of the merged
+   *       DataTable the whole combined value lands in one bucket — whichever tableType the caller
+   *       assigned to the synthetic server response. So the per-tableType split visible on
+   *       BrokerResponse is lost across the round-trip, even though the total is preserved.
+   *   <li>Per-server exceptions are written via {@link DataTable#addException(int, String)} which
+   *       backs a {@code Map<Integer, String>} keyed by error code; if two inputs reported the
+   *       same error code the merged DataTable carries last-write-wins for the message.
+   *   <li>Per-server trace info is JSON-encoded into a single
+   *       {@link DataTable.MetadataKey#TRACE_INFO} entry; the downstream aggregator reads it back
+   *       as one trace blob attributed to the synthetic server.
+   * </ul>
+   */
+  public void setStatsOnMergedDataTable(DataTable dataTable) {

Review Comment:
   This is also prone to going stale over time, so I'd prefer to avoid this 
pattern altogether. Maybe we can just carry the execution stats aggregate 
alongside the data table? Another option is to collapse the two paths (this and 
`setStats`) into one by refactoring them into a pattern like 
`forEachStat((MetadataKey key, long value) -> …)` that both consumers drive: 
one writes to the response, the other to metadata.
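
A rough sketch of the second option, just to make the shape concrete. Everything here is a hypothetical stand-in rather than Pinot's actual classes: `StatKey` stands in for `DataTable.MetadataKey`, `StatsAccumulator` for the aggregator, and plain maps stand in for the broker response and the DataTable metadata. Only long-valued stats are covered; exceptions and trace info would need separate handling.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ObjLongConsumer;

// Hypothetical sketch only; none of these types exist in Pinot under these names.
final class StatsSketch {
  /** Illustrative subset of keys, standing in for DataTable.MetadataKey. */
  enum StatKey {
    NUM_DOCS_SCANNED("numDocsScanned"),
    THREAD_CPU_TIME_NS("threadCpuTimeNs");

    final String _name;

    StatKey(String name) {
      _name = name;
    }
  }

  /** Accumulates per-key totals and exposes them through a single enumeration point. */
  static final class StatsAccumulator {
    private final Map<StatKey, Long> _totals = new EnumMap<>(StatKey.class);

    void add(StatKey key, long value) {
      _totals.merge(key, value, Long::sum);
    }

    // The one place that lists stats; a newly added key reaches every consumer.
    void forEachStat(ObjLongConsumer<StatKey> consumer) {
      _totals.forEach((key, value) -> consumer.accept(key, value));
    }
  }

  /** Consumer 1: string-valued metadata, as a merged DataTable would carry it. */
  static Map<String, String> toMetadata(StatsAccumulator acc) {
    Map<String, String> metadata = new LinkedHashMap<>();
    acc.forEachStat((key, value) -> metadata.put(key._name, Long.toString(value)));
    return metadata;
  }

  /** Consumer 2: long-valued fields, as a broker response would expose them. */
  static Map<String, Long> toResponse(StatsAccumulator acc) {
    Map<String, Long> response = new LinkedHashMap<>();
    acc.forEachStat((key, value) -> response.put(key._name, value));
    return response;
  }

  public static void main(String[] args) {
    StatsAccumulator acc = new StatsAccumulator();
    acc.add(StatKey.NUM_DOCS_SCANNED, 100L);
    acc.add(StatKey.NUM_DOCS_SCANNED, 50L);
    acc.add(StatKey.THREAD_CPU_TIME_NS, 7L);
    System.out.println(toMetadata(acc));
    System.out.println(toResponse(acc));
  }
}
```

With this shape, neither consumer keeps its own list of keys, so the two paths cannot drift apart when a stat is added.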



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

