gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1587305238


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -36,108 +39,129 @@
  */
 public class MetadataBlock extends BaseDataBlock {
 
-  private static final ObjectMapper JSON = new ObjectMapper();
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataBlock.class);
   @VisibleForTesting
-  static final int VERSION = 1;
+  static final int VERSION = 2;
+  @Nullable
+  private List<ByteBuffer> _statsByStage;
 
-  public enum MetadataBlockType {
-    /**
-     * Indicates that this block is the final block to be sent
-     * (End Of Stream) as part of an operator chain computation.
-     */
-    EOS,
+  private final MetadataBlockType _type;
 
-    /**
-     * An {@code ERROR} metadata block indicates that there was
-     * some error during computation. To retrieve the error that
-     * occurred, use {@link MetadataBlock#getExceptions()}
-     */
-    ERROR
+  public MetadataBlock(MetadataBlockType type) {
+    this(type, Collections.emptyList());
   }
 
-  /**
-   * Used to serialize the contents of the metadata block conveniently and in
-   * a backwards compatible way. Use JSON because the performance of metadata 
block
-   * SerDe should not be a bottleneck.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  @VisibleForTesting
-  static class Contents {
-
-    private String _type;
-    private Map<String, String> _stats;
-
-    @JsonCreator
-    public Contents(@JsonProperty("type") String type, @JsonProperty("stats") 
Map<String, String> stats) {
-      _type = type;
-      _stats = stats;
-    }
-
-    @JsonCreator
-    public Contents() {
-      this(null, new HashMap<>());
-    }
-
-    public String getType() {
-      return _type;
-    }
+  public MetadataBlock(MetadataBlockType type, List<ByteBuffer> statsByStage) {
+    super(0, null, new String[0], new byte[]{(byte) (type.ordinal() & 0xFF)}, 
new byte[0]);
+    _type = type;
+    _statsByStage = statsByStage;
+  }
 
-    public void setType(String type) {
-      _type = type;
+  MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    // Remember: At this point deserializeMetadata is already being called.
+    if (_fixedSizeDataBytes == null) {
+       if (_errCodeToExceptionMap.isEmpty()) {
+         _type = MetadataBlockType.EOS;
+       } else {
+         _type = MetadataBlockType.ERROR;
+       }
+    } else {
+      _type = MetadataBlockType.values()[_fixedSizeDataBytes[0]];
     }
+  }
 
-    public Map<String, String> getStats() {
-      return _stats;
+  @Override
+  protected void serializeMetadata(DataOutputStream output)
+      throws IOException {
+    if (_statsByStage == null) {
+      output.writeInt(0);
+      return;
     }
-
-    public void setStats(Map<String, String> stats) {
-      _stats = stats;
+    int size = _statsByStage.size();
+    output.writeInt(size);
+    if (size > 0) {
+      byte[] bytes = new byte[4096];
+      for (ByteBuffer stat : _statsByStage) {
+        if (stat == null) {
+          output.writeBoolean(false);
+        } else {
+          output.writeBoolean(true);
+          output.writeInt(stat.remaining());
+          ByteBuffer duplicate = stat.duplicate();
+          while (duplicate.hasRemaining()) {
+            int length = Math.min(duplicate.remaining(), bytes.length);
+            duplicate.get(bytes, 0, length);
+            output.write(bytes, 0, length);
+          }
+        }
+      }
     }
   }
 
-  private final Contents _contents;
-
-  public MetadataBlock(MetadataBlockType type) {
-    this(type, new HashMap<>());
-  }
-
-  public MetadataBlock(MetadataBlockType type, Map<String, String> stats) {
-    super(0, null, new String[0], new byte[]{0}, toContents(new 
Contents(type.name(), stats)));
-    _contents = new Contents(type.name(), stats);
-  }
-
-  private static byte[] toContents(Contents type) {
-    try {
-      return JSON.writeValueAsBytes(type);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
+  public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+      throws IOException {
+    switch (version) {
+      case 1: {
+        V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);

Review Comment:
   > Let's list down the behavior on reading v1 bytes as v2 and also reading v2 
bytes as v1 so that user knows what to expect when upgrading.
   
   The compatibility matrix is verified by tests. I've just renamed the tests, and I'm adding the table below to show which test verifies each scenario:
   
   | input version | input type | input content | possible? | read in version | output type | output content |                   tested by                   |
   |:-------------:|:----------:|:-------------:|:---------:|:---------------:|:-----------:|:--------------:|:---------------------------------------------:|
   |      v1       |    eos     |     empty     |    yes    |       v2        |     eos     |     empty      | v1EosWithoutStatsIsDecodedAsV2EosWithoutStats |
   |      v1       |    eos     |   not empty   |    yes    |       v2        |     eos     |     empty      |  v1EosWithStatsIsDecodedAsV2EosWithoutStats   |
   |      v1       |   error    |     empty     |    no     |        -        |      -      |       -        |                       -                       |
   |      v1       |   error    |   not empty   |    yes    |       v2        |    error    | same as input  |  v1ErrorIsDecodedAsV2ErrorWithSameExceptions  |
   |      v2       |    eos     |     empty     |    yes    |       v1        |     eos     |     empty      | v2EosWithoutStatsIsReadInV1AsEosWithoutStats  |
   |      v2       |    eos     |   not empty   |    yes    |       v1        |     eos     |     empty      |   v2EosWithStatsIsReadInV1AsEosWithoutStats   |
   |      v2       |   error    |     empty     |    no     |        -        |      -      |       -        |                       -                       |
   |      v2       |   error    |   not empty   |    yes    |       v1        |    error    | same as input  |  v2ErrorIsReadInV1AsErrorWithSameExceptions   |
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to