gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1587275146


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -36,108 +39,129 @@
  */
 public class MetadataBlock extends BaseDataBlock {
 
-  private static final ObjectMapper JSON = new ObjectMapper();
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataBlock.class);
   @VisibleForTesting
-  static final int VERSION = 1;
+  static final int VERSION = 2;
+  @Nullable
+  private List<ByteBuffer> _statsByStage;
 
-  public enum MetadataBlockType {
-    /**
-     * Indicates that this block is the final block to be sent
-     * (End Of Stream) as part of an operator chain computation.
-     */
-    EOS,
+  private final MetadataBlockType _type;
 
-    /**
-     * An {@code ERROR} metadata block indicates that there was
-     * some error during computation. To retrieve the error that
-     * occurred, use {@link MetadataBlock#getExceptions()}
-     */
-    ERROR
+  public MetadataBlock(MetadataBlockType type) {
+    this(type, Collections.emptyList());
   }
 
-  /**
-   * Used to serialize the contents of the metadata block conveniently and in
-   * a backwards compatible way. Use JSON because the performance of metadata 
block
-   * SerDe should not be a bottleneck.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  @VisibleForTesting
-  static class Contents {
-
-    private String _type;
-    private Map<String, String> _stats;
-
-    @JsonCreator
-    public Contents(@JsonProperty("type") String type, @JsonProperty("stats") 
Map<String, String> stats) {
-      _type = type;
-      _stats = stats;
-    }
-
-    @JsonCreator
-    public Contents() {
-      this(null, new HashMap<>());
-    }
-
-    public String getType() {
-      return _type;
-    }
+  public MetadataBlock(MetadataBlockType type, List<ByteBuffer> statsByStage) {
+    super(0, null, new String[0], new byte[]{(byte) (type.ordinal() & 0xFF)}, 
new byte[0]);
+    _type = type;
+    _statsByStage = statsByStage;
+  }
 
-    public void setType(String type) {
-      _type = type;
+  MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    // Remember: At this point deserializeMetadata is already being called.
+    if (_fixedSizeDataBytes == null) {
+       if (_errCodeToExceptionMap.isEmpty()) {
+         _type = MetadataBlockType.EOS;
+       } else {
+         _type = MetadataBlockType.ERROR;
+       }
+    } else {
+      _type = MetadataBlockType.values()[_fixedSizeDataBytes[0]];
     }
+  }
 
-    public Map<String, String> getStats() {
-      return _stats;
+  @Override
+  protected void serializeMetadata(DataOutputStream output)
+      throws IOException {
+    if (_statsByStage == null) {
+      output.writeInt(0);
+      return;
     }
-
-    public void setStats(Map<String, String> stats) {
-      _stats = stats;
+    int size = _statsByStage.size();
+    output.writeInt(size);
+    if (size > 0) {
+      byte[] bytes = new byte[4096];
+      for (ByteBuffer stat : _statsByStage) {
+        if (stat == null) {
+          output.writeBoolean(false);
+        } else {
+          output.writeBoolean(true);
+          output.writeInt(stat.remaining());
+          ByteBuffer duplicate = stat.duplicate();
+          while (duplicate.hasRemaining()) {
+            int length = Math.min(duplicate.remaining(), bytes.length);
+            duplicate.get(bytes, 0, length);
+            output.write(bytes, 0, length);
+          }
+        }
+      }
     }
   }
 
-  private final Contents _contents;
-
-  public MetadataBlock(MetadataBlockType type) {
-    this(type, new HashMap<>());
-  }
-
-  public MetadataBlock(MetadataBlockType type, Map<String, String> stats) {
-    super(0, null, new String[0], new byte[]{0}, toContents(new 
Contents(type.name(), stats)));
-    _contents = new Contents(type.name(), stats);
-  }
-
-  private static byte[] toContents(Contents type) {
-    try {
-      return JSON.writeValueAsBytes(type);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
+  public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+      throws IOException {
+    switch (version) {
+      case 1: {
+        V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);

Review Comment:
   > Hmm, if we are encoding errors differently in v1 and v2, how does v1 
understand the errors in v2?
   
   In V1 the quality of being an error is reported in two different ways:
   1. By having a map of exceptions, which is serialized in a specific byte 
buffer section.
   2. By setting the `type` in the json `Contents` stored in the variable size 
section.
   
   In V2 we just use the first option. I that is what the very old _V0_ (whose 
source code is in `MetadataBlockTest.V0MetadataBlock`) in version was doing. 
Given V1 can read V0, it works as expected.
   
   Specifically, V1 is able to read bytes where the variable section is empty 
or `Contents.type` is not defined because `getType` is defined as:
   
   ```java
     public MetadataBlock.MetadataBlockType getType() {
       String type = _contents.getType();
   
       // if type is null, then we're reading a legacy block where we didn't 
encode any
       // data. assume that it is an EOS block if there's no exceptions and an 
ERROR block
       // otherwise
       return type == null
           ? (getExceptions().isEmpty() ? MetadataBlock.MetadataBlockType.EOS : 
MetadataBlock.MetadataBlockType.ERROR)
           : MetadataBlock.MetadataBlockType.valueOf(type);
     }
   ```
   
   Given we don't set `Contents.getType`, its value is null and we return EOS 
or ERROR depending on whether exceptions is empty or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to