gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1589060972


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -36,108 +39,129 @@
  */
 public class MetadataBlock extends BaseDataBlock {
 
-  private static final ObjectMapper JSON = new ObjectMapper();
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataBlock.class);
   @VisibleForTesting
-  static final int VERSION = 1;
+  static final int VERSION = 2;
+  @Nullable
+  private List<ByteBuffer> _statsByStage;
 
-  public enum MetadataBlockType {
-    /**
-     * Indicates that this block is the final block to be sent
-     * (End Of Stream) as part of an operator chain computation.
-     */
-    EOS,
+  private final MetadataBlockType _type;
 
-    /**
-     * An {@code ERROR} metadata block indicates that there was
-     * some error during computation. To retrieve the error that
-     * occurred, use {@link MetadataBlock#getExceptions()}
-     */
-    ERROR
+  public MetadataBlock(MetadataBlockType type) {
+    this(type, Collections.emptyList());
   }
 
-  /**
-   * Used to serialize the contents of the metadata block conveniently and in
-   * a backwards compatible way. Use JSON because the performance of metadata 
block
-   * SerDe should not be a bottleneck.
-   */
-  @JsonIgnoreProperties(ignoreUnknown = true)
-  @VisibleForTesting
-  static class Contents {
-
-    private String _type;
-    private Map<String, String> _stats;
-
-    @JsonCreator
-    public Contents(@JsonProperty("type") String type, @JsonProperty("stats") 
Map<String, String> stats) {
-      _type = type;
-      _stats = stats;
-    }
-
-    @JsonCreator
-    public Contents() {
-      this(null, new HashMap<>());
-    }
-
-    public String getType() {
-      return _type;
-    }
+  public MetadataBlock(MetadataBlockType type, List<ByteBuffer> statsByStage) {
+    super(0, null, new String[0], new byte[]{(byte) (type.ordinal() & 0xFF)}, 
new byte[0]);
+    _type = type;
+    _statsByStage = statsByStage;
+  }
 
-    public void setType(String type) {
-      _type = type;
+  MetadataBlock(ByteBuffer byteBuffer)
+      throws IOException {
+    super(byteBuffer);
+    // Remember: At this point deserializeMetadata is already being called.
+    if (_fixedSizeDataBytes == null) {
+       if (_errCodeToExceptionMap.isEmpty()) {
+         _type = MetadataBlockType.EOS;
+       } else {
+         _type = MetadataBlockType.ERROR;
+       }
+    } else {
+      _type = MetadataBlockType.values()[_fixedSizeDataBytes[0]];
     }
+  }
 
-    public Map<String, String> getStats() {
-      return _stats;
+  @Override
+  protected void serializeMetadata(DataOutputStream output)
+      throws IOException {
+    if (_statsByStage == null) {
+      output.writeInt(0);
+      return;
     }
-
-    public void setStats(Map<String, String> stats) {
-      _stats = stats;
+    int size = _statsByStage.size();
+    output.writeInt(size);
+    if (size > 0) {
+      byte[] bytes = new byte[4096];
+      for (ByteBuffer stat : _statsByStage) {
+        if (stat == null) {
+          output.writeBoolean(false);
+        } else {
+          output.writeBoolean(true);
+          output.writeInt(stat.remaining());
+          ByteBuffer duplicate = stat.duplicate();
+          while (duplicate.hasRemaining()) {
+            int length = Math.min(duplicate.remaining(), bytes.length);
+            duplicate.get(bytes, 0, length);
+            output.write(bytes, 0, length);
+          }
+        }
+      }
     }
   }
 
-  private final Contents _contents;
-
-  public MetadataBlock(MetadataBlockType type) {
-    this(type, new HashMap<>());
-  }
-
-  public MetadataBlock(MetadataBlockType type, Map<String, String> stats) {
-    super(0, null, new String[0], new byte[]{0}, toContents(new 
Contents(type.name(), stats)));
-    _contents = new Contents(type.name(), stats);
-  }
-
-  private static byte[] toContents(Contents type) {
-    try {
-      return JSON.writeValueAsBytes(type);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
+  public static MetadataBlock deserialize(ByteBuffer byteBuffer, int version)
+      throws IOException {
+    switch (version) {
+      case 1: {
+        V1MetadataBlock decoded = new V1MetadataBlock(byteBuffer);

Review Comment:
   If we remove this `case 1` branch, then
`v1ErrorWithExceptionsIsDecodedAsV2ErrorWithSameExceptions` fails because the
block is decoded as EOS instead of ERROR.
   
   The reason is that when I modified `MetadataBlock`, I decided to keep the
`_type` attribute. I don't think that makes much sense if we define that an error
block must always contain at least one exception (as in _V0_), because the type
can then be derived from whether any exceptions are present. I've just made that
change; now all tests pass and `V1MetadataBlock` can be moved to the test directory.
   
   Anyway, I plan to clean up this code substantially to improve
serialization/deserialization times.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to