Jackie-Jiang commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1581631413


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -798,7 +798,6 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
         _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
             1);
       }
-      brokerResponse.setPartialResult(isPartialResult(brokerResponse));

Review Comment:
   (minor) Also remove `isPartialResult()` method



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -419,4 +550,122 @@ public void send(BaseResultsBlock block)
       addResultsBlock(block);
     }
   }
+
+  public enum StatKey implements StatMap.Key {
+    TABLE(StatMap.Type.STRING),
+    EXECUTION_TIME_MS(StatMap.Type.LONG, null, DataTable.MetadataKey.TIME_USED_MS) {
+      @Override
+      public boolean includeDefaultInJson() {
+        return true;
+      }
+    },
+    EMITTED_ROWS(StatMap.Type.LONG, null, DataTable.MetadataKey.NUM_ROWS) {
+      @Override
+      public boolean includeDefaultInJson() {
+        return true;
+      }
+    },
+    NUM_DOCS_SCANNED(StatMap.Type.LONG),
+    NUM_ENTRIES_SCANNED_IN_FILTER(StatMap.Type.LONG),
+    NUM_ENTRIES_SCANNED_POST_FILTER(StatMap.Type.LONG),
+    NUM_SEGMENTS_QUERIED(StatMap.Type.INT),
+    NUM_SEGMENTS_PROCESSED(StatMap.Type.INT),
+    NUM_SEGMENTS_MATCHED(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_QUERIED(StatMap.Type.INT),
+    // the timestamp indicating the freshness of the data queried in consuming segments.
+    // This can be ingestion timestamp if provided by the stream, or the last index time
+    MIN_CONSUMING_FRESHNESS_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return StatMap.Key.minPositive(value1, value2);
+      }
+    },
+    TOTAL_DOCS(StatMap.Type.LONG),
+    NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+    //TRACE_INFO(StatMap.Type.STRING),
+    //REQUEST_ID(StatMap.Type.LONG),
+    NUM_RESIZES(StatMap.Type.INT),
+    RESIZE_TIME_MS(StatMap.Type.LONG),
+    THREAD_CPU_TIME_NS(StatMap.Type.LONG),
+    SYSTEM_ACTIVITIES_CPU_TIME_NS(StatMap.Type.LONG),
+    RESPONSE_SER_CPU_TIME_NS(StatMap.Type.LONG, "responseSerializationCpuTimeNs"),
+    NUM_SEGMENTS_PRUNED_BY_SERVER(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+    //EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS(StatMap.Type.INT),
+    //EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_PROCESSED(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_MATCHED(StatMap.Type.INT),
+    NUM_BLOCKS(StatMap.Type.INT),
+    OPERATOR_EXECUTION_TIME_MS(StatMap.Type.LONG),
+    OPERATOR_ID(StatMap.Type.STRING),
+    OPERATOR_EXEC_START_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return StatMap.Key.minPositive(value1, value2);
+      }
+    },
+    OPERATOR_EXEC_END_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    },;
+    private final StatMap.Type _type;
+    @Nullable
+    private final DataTable.MetadataKey _v1Key;
+    private final String _statName;
+
+    StatKey(StatMap.Type type) {
+      this(type, null);
+    }
+
+    StatKey(StatMap.Type type, @Nullable String statName) {
+      _type = type;
+      _statName = statName == null ? StatMap.getDefaultStatName(this) : statName;
+      _v1Key = DataTable.MetadataKey.getByName(getStatName());
+    }
+
+    StatKey(StatMap.Type type, @Nullable String statName, @Nullable DataTable.MetadataKey v1Key) {
+      _type = type;
+      _statName = statName == null ? StatMap.getDefaultStatName(this) : statName;
+      _v1Key = v1Key == null ? DataTable.MetadataKey.getByName(getStatName()) : v1Key;
+    }
+
+    @Override
+    public String getStatName() {
+      return _statName;
+    }
+
+    @Override
+    public StatMap.Type getType() {
+      return _type;
+    }
+
+    public void updateV1Metadata(StatMap<DataTable.MetadataKey> oldMetadata, StatMap<StatKey> stats) {

Review Comment:
   Here we maintain a mapping from v2 keys to v1 keys, use the v2 stats to fill the v1 stats, and then use the v1 stats to fill the broker response. IMO this will make future decoupling of v1 and v2 hard.
   Does it work if we do not fill the v1 stats, but directly use the v2 stats to fill the broker response? I'm not sure if you did this to simplify the V1 broker response handling, but since we have already decoupled the v1 and v2 broker responses, should we also decouple this?

   The v1-to-v2 stats key mapping is done through the switch case in `mergeExecutionStats()`. If we remove the mapping from v2 to v1, can we make the v1 stats keys stop implementing `StatMap.Key`? The overall goal here is to decouple the v2 and v1 handling so that in the future we can remove v1 more easily if necessary.
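   A minimal sketch of the suggested direction, assuming a hypothetical `V2StatKey` enum and a hypothetical `BrokerResponseV2` holder (neither is Pinot's actual API): the broker response is filled straight from the v2 stats, with no v2-to-v1 key mapping in between.

```java
import java.util.EnumMap;
import java.util.Map;

public class DirectV2StatsExample {
  // Hypothetical subset of v2 stat keys (not Pinot's actual StatKey enum).
  enum V2StatKey { NUM_DOCS_SCANNED, NUM_ENTRIES_SCANNED_POST_FILTER, NUM_SEGMENTS_QUERIED }

  // Hypothetical broker response that is filled only from v2 stats.
  static final class BrokerResponseV2 {
    long numDocsScanned;
    long numEntriesScannedPostFilter;
    int numSegmentsQueried;
  }

  // Reads v2 stats directly; no v2 -> v1 (DataTable.MetadataKey) mapping is involved.
  static BrokerResponseV2 fromV2Stats(Map<V2StatKey, Long> stats) {
    BrokerResponseV2 response = new BrokerResponseV2();
    response.numDocsScanned = stats.getOrDefault(V2StatKey.NUM_DOCS_SCANNED, 0L);
    response.numEntriesScannedPostFilter =
        stats.getOrDefault(V2StatKey.NUM_ENTRIES_SCANNED_POST_FILTER, 0L);
    response.numSegmentsQueried =
        Math.toIntExact(stats.getOrDefault(V2StatKey.NUM_SEGMENTS_QUERIED, 0L));
    return response;
  }

  public static void main(String[] args) {
    Map<V2StatKey, Long> stats = new EnumMap<>(V2StatKey.class);
    stats.put(V2StatKey.NUM_DOCS_SCANNED, 250L);
    stats.put(V2StatKey.NUM_SEGMENTS_QUERIED, 4L);
    BrokerResponseV2 response = fromV2Stats(stats);
    System.out.println(response.numDocsScanned + " " + response.numSegmentsQueried); // 250 4
  }
}
```

   Because the response only reads v2 keys, the v1 keys would no longer need to implement `StatMap.Key`, which is the decoupling asked about here.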



##########
pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java:
##########
@@ -123,6 +127,17 @@ String toJsonString()
    */
   long getMinConsumingFreshnessTimeMs();
 
+  /**
+   * Get the max number of rows seen by a single operator in the query processing chain.
+   * <p>
+   * In single-stage this value is equal to {@link #getNumEntriesScannedPostFilter()} because single-stage operators
+   * cannot generate more rows than the number of rows received. This is not the case in multi-stage operators, where
+   * operators like join or union can emit more rows than the ones received on each of its children operators.
+   */
+  default long getMaxRowsInOperator() {

Review Comment:
   I'd suggest not adding new stats at the top level.
   In the future we might want to redesign the UI for V2 stats, and all the V2 stats should come from `StatMap`.
   If this stat is truly useful, we can revisit it in a separate PR.
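   If the stat is revisited later, one option consistent with this comment is to keep the value inside the per-operator stats and derive it when needed instead of exposing a new top-level getter. A minimal sketch, assuming a hypothetical `OperatorStats` holder that carries each operator's emitted-row count (in the PR this would come from each operator's `StatMap`); the operator labels are illustrative only.

```java
import java.util.List;

public class MaxRowsFromOperatorStats {
  // Hypothetical per-operator stats entry; in the PR this would be one StatMap per operator.
  static final class OperatorStats {
    final String operatorType;
    final long emittedRows;

    OperatorStats(String operatorType, long emittedRows) {
      this.operatorType = operatorType;
      this.emittedRows = emittedRows;
    }
  }

  // Max rows emitted by any single operator, computed on demand from per-operator stats.
  static long maxRowsInOperator(List<OperatorStats> stats) {
    long max = 0;
    for (OperatorStats s : stats) {
      max = Math.max(max, s.emittedRows);
    }
    return max;
  }

  public static void main(String[] args) {
    // A join can emit more rows than either of its inputs.
    List<OperatorStats> stats = List.of(
        new OperatorStats("LEAF", 100),
        new OperatorStats("LEAF", 40),
        new OperatorStats("HASH_JOIN", 400));
    System.out.println(maxRowsInOperator(stats)); // 400
  }
}
```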



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -135,14 +160,128 @@ protected TransferableBlock getNextBlock()
     }
   }
 
-  private TransferableBlock constructMetadataBlock() {
-    // All data blocks have been returned. Record the stats and return EOS.
-    Map<String, String> executionStats = _executionStats;
+  private void mergeExecutionStats(@Nullable Map<String, String> executionStats) {
     if (executionStats != null) {
-      OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId());
-      operatorStats.recordExecutionStats(executionStats);
+      for (Map.Entry<String, String> entry : executionStats.entrySet()) {
+        DataTable.MetadataKey key = DataTable.MetadataKey.getByName(entry.getKey());
+        if (key == null) {
+          LOGGER.debug("Skipping unknown execution stat: {}", entry.getKey());
+          continue;
+        }
+        switch (key) {
+          case UNKNOWN:
+            LOGGER.debug("Skipping unknown execution stat: {}", entry.getKey());
+            break;
+          case TABLE:

Review Comment:
   Do we need `TABLE`? It seems we are simply overriding it.
   I think we can remove it from the v2 stats.



Review Comment:
   After removing `TABLE` and `OPERATOR_ID`, we might be able to remove the `STRING` type. String stats don't make much sense.
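   A rough sketch of what the stat types could look like without `STRING`, using hypothetical `StatType`/`merge` names rather than Pinot's actual `StatMap.Type`: every remaining type has a well-defined merge. The `minPositive` helper mirrors the intent of `StatMap.Key.minPositive` in the diff, but its exact semantics here (non-positive values treated as "not set") are an assumption.

```java
public class NumericStatMergeExample {
  // Hypothetical stat types without STRING; every remaining type has a meaningful merge.
  enum StatType {
    INT, LONG, BOOLEAN;

    // Default merge: counters are summed, boolean flags (stored as 0/1) are OR-ed.
    long merge(long a, long b) {
      if (this == BOOLEAN) {
        return (a != 0 || b != 0) ? 1 : 0;
      }
      return a + b;
    }
  }

  // Assumed semantics: if one side is <= 0 ("not set"), return the other side;
  // otherwise keep the smaller value (e.g. for freshness or start timestamps).
  static long minPositive(long a, long b) {
    if (a <= 0) {
      return b;
    }
    if (b <= 0) {
      return a;
    }
    return Math.min(a, b);
  }

  public static void main(String[] args) {
    System.out.println(StatType.LONG.merge(3, 4));    // 7
    System.out.println(StatType.BOOLEAN.merge(0, 1)); // 1
    System.out.println(minPositive(0, 1700));         // 1700
    System.out.println(minPositive(1500, 1700));      // 1500
    System.out.println(Math.max(1800L, 1750L));       // 1800, as in OPERATOR_EXEC_END_TIME_MS
  }
}
```

   A string stat such as `TABLE` has no comparable merge: combining two blocks can only keep one of the values, which is the overriding behavior questioned in the previous comment.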



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -36,108 +39,129 @@
  */
 public class MetadataBlock extends BaseDataBlock {
 
-  private static final ObjectMapper JSON = new ObjectMapper();
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetadataBlock.class);
   @VisibleForTesting
-  static final int VERSION = 1;
+  static final int VERSION = 2;
+  @Nullable
+  private List<ByteBuffer> _statsByStage;
 
-  public enum MetadataBlockType {
-    /**
-     * Indicates that this block is the final block to be sent
-     * (End Of Stream) as part of an operator chain computation.
-     */
-    EOS,
+  private final MetadataBlockType _type;
 
-    /**
-     * An {@code ERROR} metadata block indicates that there was
-     * some error during computation. To retrieve the error that
-     * occurred, use {@link MetadataBlock#getExceptions()}
-     */
-    ERROR
+  public MetadataBlock(MetadataBlockType type) {
+    this(type, Collections.emptyList());
   }
 
-  /**
-   * Used to serialize the contents of the metadata block conveniently and in
-   * a backwards compatible way. Use JSON because the performance of metadata block
-   * SerDe should not be a bottleneck.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  @VisibleForTesting
-  static class Contents {
-
-    private String _type;
-    private Map<String, String> _stats;
-
-    @JsonCreator
-    public Contents(@JsonProperty("type") String type, @JsonProperty("stats") Map<String, String> stats) {
-      _type = type;
-      _stats = stats;
-    }
-
-    @JsonCreator
-    public Contents() {
-      this(null, new HashMap<>());
-    }
-
-    public String getType() {
-      return _type;
-    }
+  public MetadataBlock(MetadataBlockType type, List<ByteBuffer> statsByStage) {
+    super(0, null, new String[0], new byte[]{(byte) (type.ordinal() & 0xFF)}, new byte[0]);
+    _type = type;
+    _statsByStage = statsByStage;
+  }
 
-    public void setType(String type) {
-      _type = type;
+  MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    // Remember: At this point deserializeMetadata is already being called.
+    if (_fixedSizeDataBytes == null) {
+       if (_errCodeToExceptionMap.isEmpty()) {
+         _type = MetadataBlockType.EOS;
+       } else {
+         _type = MetadataBlockType.ERROR;
+       }
+    } else {
+      _type = MetadataBlockType.values()[_fixedSizeDataBytes[0]];
     }
+  }
 
-    public Map<String, String> getStats() {
-      return _stats;
+  @Override
+  protected void serializeMetadata(DataOutputStream output)
+      throws IOException {
+    if (_statsByStage == null) {
+      output.writeInt(0);
+      return;
     }
-
-    public void setStats(Map<String, String> stats) {
-      _stats = stats;
+    int size = _statsByStage.size();
+    output.writeInt(size);
+    if (size > 0) {
+      byte[] bytes = new byte[4096];
+      for (ByteBuffer stat : _statsByStage) {
+        if (stat == null) {
+          output.writeBoolean(false);
+        } else {
+          output.writeBoolean(true);
+          output.writeInt(stat.remaining());
+          ByteBuffer duplicate = stat.duplicate();
+          while (duplicate.hasRemaining()) {
+            int length = Math.min(duplicate.remaining(), bytes.length);
+            duplicate.get(bytes, 0, length);
+            output.write(bytes, 0, length);
+          }
+        }
+      }
     }
   }
 
-  private final Contents _contents;
-
-  public MetadataBlock(MetadataBlockType type) {
-    this(type, new HashMap<>());
-  }
-
-  public MetadataBlock(MetadataBlockType type, Map<String, String> stats) {
-    super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name(), stats)));
-    _contents = new Contents(type.name(), stats);
-  }
-
-  private static byte[] toContents(Contents type) {
-    try {
-      return JSON.writeValueAsBytes(type);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
+  public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+      throws IOException {
+    switch (version) {
+      case 1: {
+        V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);

Review Comment:
   I'm still confused. Is this the only place where the v1 metadata block is used? Does it handle errors differently from the v2 metadata block?
   Does it work if we ignore the version and always `return new MetadataBlock(byteBuffer);`?
   If the error handling is different, then sending a v2 metadata block to an old server will break backward compatibility, right?
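   To make the compatibility question concrete, here is a self-contained sketch of the stats layout the new `serializeMetadata()` writes (stage count, then per stage a presence flag and, when present, a length followed by the raw bytes), plus a matching reader and a version switch. `writeStatsByStage`, `readStatsByStage` and `decode` are hypothetical names, not Pinot's API, and the v1 branch is deliberately not modeled. The sketch only shows that the reader has to branch on the version it receives, which is where a mismatch between old and new servers would surface.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StatsByStageCodec {
  // Same layout as serializeMetadata(): count, then (present, length, bytes) per stage.
  // Simplified: copies each buffer in one go instead of through a 4096-byte chunk.
  static byte[] writeStatsByStage(List<ByteBuffer> statsByStage) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream output = new DataOutputStream(bytes);
    output.writeInt(statsByStage.size());
    for (ByteBuffer stat : statsByStage) {
      if (stat == null) {
        output.writeBoolean(false);
      } else {
        output.writeBoolean(true);
        output.writeInt(stat.remaining());
        byte[] copy = new byte[stat.remaining()];
        stat.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        output.write(copy);
      }
    }
    output.flush();
    return bytes.toByteArray();
  }

  // Hypothetical reader for the same layout; null entries mean "no stats for this stage".
  static List<ByteBuffer> readStatsByStage(ByteBuffer buffer) {
    int size = buffer.getInt();
    List<ByteBuffer> result = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      boolean present = buffer.get() != 0;
      if (!present) {
        result.add(null);
        continue;
      }
      byte[] stat = new byte[buffer.getInt()];
      buffer.get(stat);
      result.add(ByteBuffer.wrap(stat));
    }
    return result;
  }

  // Hypothetical version dispatch: only version 2 carries per-stage stats.
  static List<ByteBuffer> decode(ByteBuffer buffer, int version) {
    switch (version) {
      case 1:
        return new ArrayList<>(); // v1 blocks carried a JSON stats map instead; not modeled here
      case 2:
        return readStatsByStage(buffer);
      default:
        throw new IllegalArgumentException("Unsupported metadata block version: " + version);
    }
  }

  public static void main(String[] args) throws IOException {
    List<ByteBuffer> stats = Arrays.asList(ByteBuffer.wrap(new byte[]{1, 2, 3}), null);
    List<ByteBuffer> decoded = decode(ByteBuffer.wrap(writeStatsByStage(stats)), 2);
    System.out.println(decoded.size() + " " + decoded.get(0).remaining() + " " + (decoded.get(1) == null));
  }
}
```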



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

