This is an automated email from the ASF dual-hosted git repository.
lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f8fa1a8a57 Expose msg length info to metadata (#14688)
f8fa1a8a57 is described below
commit f8fa1a8a572f7668058bc29de23da586dc40c855
Author: lnbest0707 <[email protected]>
AuthorDate: Thu Jan 2 10:47:17 2025 -0800
Expose msg length info to metadata (#14688)
* Expose msg length info to metadata
* Address comment
---
.../main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java | 2 ++
.../java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java | 3 ++-
2 files changed, 4 insertions(+), 1 deletion(-)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 127ecfe121..35721fcb82 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -30,6 +30,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
public static final String KEY = "__key";
public static final String HEADER_KEY_PREFIX = "__header$";
public static final String METADATA_KEY_PREFIX = "__metadata$";
+ public static final String RECORD_SERIALIZED_VALUE_SIZE_KEY = METADATA_KEY_PREFIX + "recordSerializedValueSize";
private final StreamMessageDecoder _valueDecoder;
private final GenericRow _reuse = new GenericRow();
@@ -65,6 +66,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
if (metadata.getRecordMetadata() != null) {
metadata.getRecordMetadata().forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value));
}
+ row.putValue(RECORD_SERIALIZED_VALUE_SIZE_KEY, message.getLength());
}
return new StreamDataDecoderResult(row, null);
} else {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index f9f6aafc11..a2ddec6d99 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -71,11 +71,12 @@ public class StreamDataDecoderImplTest {
Assert.assertNotNull(result.getResult());
GenericRow row = result.getResult();
- Assert.assertEquals(row.getFieldToValueMap().size(), 4);
+ Assert.assertEquals(row.getFieldToValueMap().size(), 5);
Assert.assertEquals(row.getValue(NAME_FIELD), value);
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key");
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3);
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1");
+ Assert.assertEquals(row.getValue(StreamDataDecoderImpl.RECORD_SERIALIZED_VALUE_SIZE_KEY), value.length());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]