gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1583030085


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -36,108 +39,129 @@
  */
 public class MetadataBlock extends BaseDataBlock {
 
-  private static final ObjectMapper JSON = new ObjectMapper();
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataBlock.class);
   @VisibleForTesting
-  static final int VERSION = 1;
+  static final int VERSION = 2;
+  @Nullable
+  private List<ByteBuffer> _statsByStage;
 
-  public enum MetadataBlockType {
-    /**
-     * Indicates that this block is the final block to be sent
-     * (End Of Stream) as part of an operator chain computation.
-     */
-    EOS,
+  private final MetadataBlockType _type;
 
-    /**
-     * An {@code ERROR} metadata block indicates that there was
-     * some error during computation. To retrieve the error that
-     * occurred, use {@link MetadataBlock#getExceptions()}
-     */
-    ERROR
+  public MetadataBlock(MetadataBlockType type) {
+    this(type, Collections.emptyList());
   }
 
-  /**
-   * Used to serialize the contents of the metadata block conveniently and in
-   * a backwards compatible way. Use JSON because the performance of metadata 
block
-   * SerDe should not be a bottleneck.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  @VisibleForTesting
-  static class Contents {
-
-    private String _type;
-    private Map<String, String> _stats;
-
-    @JsonCreator
-    public Contents(@JsonProperty("type") String type, @JsonProperty("stats") 
Map<String, String> stats) {
-      _type = type;
-      _stats = stats;
-    }
-
-    @JsonCreator
-    public Contents() {
-      this(null, new HashMap<>());
-    }
-
-    public String getType() {
-      return _type;
-    }
+  public MetadataBlock(MetadataBlockType type, List<ByteBuffer> statsByStage) {
+    super(0, null, new String[0], new byte[]{(byte) (type.ordinal() & 0xFF)}, 
new byte[0]);
+    _type = type;
+    _statsByStage = statsByStage;
+  }
 
-    public void setType(String type) {
-      _type = type;
+  MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    // Remember: At this point deserializeMetadata is already being called.
+    if (_fixedSizeDataBytes == null) {
+       if (_errCodeToExceptionMap.isEmpty()) {
+         _type = MetadataBlockType.EOS;
+       } else {
+         _type = MetadataBlockType.ERROR;
+       }
+    } else {
+      _type = MetadataBlockType.values()[_fixedSizeDataBytes[0]];
     }
+  }
 
-    public Map<String, String> getStats() {
-      return _stats;
+  @Override
+  protected void serializeMetadata(DataOutputStream output)
+      throws IOException {
+    if (_statsByStage == null) {
+      output.writeInt(0);
+      return;
     }
-
-    public void setStats(Map<String, String> stats) {
-      _stats = stats;
+    int size = _statsByStage.size();
+    output.writeInt(size);
+    if (size > 0) {
+      byte[] bytes = new byte[4096];
+      for (ByteBuffer stat : _statsByStage) {
+        if (stat == null) {
+          output.writeBoolean(false);
+        } else {
+          output.writeBoolean(true);
+          output.writeInt(stat.remaining());
+          ByteBuffer duplicate = stat.duplicate();
+          while (duplicate.hasRemaining()) {
+            int length = Math.min(duplicate.remaining(), bytes.length);
+            duplicate.get(bytes, 0, length);
+            output.write(bytes, 0, length);
+          }
+        }
+      }
     }
   }
 
-  private final Contents _contents;
-
-  public MetadataBlock(MetadataBlockType type) {
-    this(type, new HashMap<>());
-  }
-
-  public MetadataBlock(MetadataBlockType type, Map<String, String> stats) {
-    super(0, null, new String[0], new byte[]{0}, toContents(new 
Contents(type.name(), stats)));
-    _contents = new Contents(type.name(), stats);
-  }
-
-  private static byte[] toContents(Contents type) {
-    try {
-      return JSON.writeValueAsBytes(type);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
+  public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+      throws IOException {
+    switch (version) {
+      case 1: {
+        V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);

Review Comment:
   > Is this the only place where v1 metadata block is used?

   Yes. It is used only here and in the tests.
   
   > Does it work if we ignore the version and always return `new MetadataBlock(byteBuffer);`?

   Yes.
   
   > If the error handling is different, then sending v2 metadata block to old server will cause backward incompatibility right?

   The error handling is no different.
   
   Remember that the main reason this class (`V1MetadataBlock`) exists is to be able to run tests with it. The fact that it is also useful here is just a bonus.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to