This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5b0e3ff2d14 Add realtime integration test for
ProtoBufCodeGenMessageDecoder (#18234)
5b0e3ff2d14 is described below
commit 5b0e3ff2d14d959f5d96b8e742e2ca2bef3eb082
Author: Rekha Seethamraju <[email protected]>
AuthorDate: Mon Apr 20 12:09:34 2026 -0700
Add realtime integration test for ProtoBufCodeGenMessageDecoder (#18234)
Adds ProtoBufCodeGenMessageDecoderTest, a realtime cluster integration
test that pushes protobuf-serialized SampleRecord messages to Kafka and
verifies end-to-end ingestion and query correctness via
ProtoBufCodeGenMessageDecoder.
Also adds a pushDataIntoKafka() extensibility hook to
CustomDataQueryClusterIntegrationTest so non-Avro realtime tests can
override the data push step without duplicating the full table setup
lifecycle.
Changes:
- pinot-integration-tests/pom.xml: add pinot-protobuf test dependency
- CustomDataQueryClusterIntegrationTest: extract pushDataIntoKafka()
hook (default delegates to pushAvroIntoKafka)
- ProtoBufCodeGenMessageDecoderTest: new realtime integration test
covering scalar fields, repeated/multi-value fields, and distinct
value queries over 200 ingested SampleRecord messages
- sample-samplerecord.jar / .desc: test resources copied from
pinot-protobuf, renamed to avoid classpath conflicts
---
pinot-integration-tests/pom.xml | 5 +
.../CustomDataQueryClusterIntegrationTest.java | 14 +-
.../custom/ProtoBufCodeGenMessageDecoderTest.java | 290 +++++++++++++++++++++
.../src/test/resources/sample-samplerecord.desc | 8 +
.../src/test/resources/sample-samplerecord.jar | Bin 0 -> 17678 bytes
5 files changed, 315 insertions(+), 2 deletions(-)
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index e42426ff95f..761513d83ee 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -292,6 +292,11 @@
<artifactId>pinot-kafka-4.0</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-protobuf</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 381c5847287..865e87b3bb9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -150,8 +150,8 @@ public abstract class CustomDataQueryClusterIntegrationTest
extends BaseClusterI
TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
addRealtimeTableConfigWithRetry(tableConfig);
- // Push data into Kafka
- pushAvroIntoKafka(avroFiles);
+ // Push data into Kafka — override pushDataIntoKafka() for non-Avro
formats
+ pushDataIntoKafka(avroFiles);
} else {
// create offline table
TableConfig tableConfig = createOfflineTableConfig();
@@ -248,6 +248,16 @@ public abstract class
CustomDataQueryClusterIntegrationTest extends BaseClusterI
getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), injectTombstones());
}
+ /**
+ * Pushes data into Kafka for realtime ingestion. Subclasses using non-Avro
formats
+ * (e.g., protobuf, CSV) can override this method and ignore the dataFiles
parameter.
+ * The default implementation delegates to {@link #pushAvroIntoKafka(List)}.
+ */
+ protected void pushDataIntoKafka(List<File> dataFiles)
+ throws Exception {
+ pushAvroIntoKafka(dataFiles);
+ }
+
@Override
public String getZkUrl() {
if (_sharedClusterTestSuite != this) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ProtoBufCodeGenMessageDecoderTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ProtoBufCodeGenMessageDecoderTest.java
new file mode 100644
index 00000000000..80cc0140b6f
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ProtoBufCodeGenMessageDecoderTest.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import
org.apache.pinot.plugin.inputformat.protobuf.ProtoBufCodeGenMessageDecoder;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Integration test for {@link ProtoBufCodeGenMessageDecoder}.
+ *
+ * <p>Pushes protobuf-serialized {@code SampleRecord} messages (defined in
{@code sample.proto})
+ * into Kafka, configures a realtime Pinot table to decode them with {@link
ProtoBufCodeGenMessageDecoder},
+ * and validates that data is ingested and queryable.
+ *
+ * <p>Required test resources in {@code
pinot-integration-tests/src/test/resources/}:
+ * <ul>
+ * <li>{@code sample-samplerecord.jar} — compiled protobuf classes for
{@code SampleRecord}.
+ * To regenerate: copy {@code sample.jar} from
+ * {@code
pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/}.</li>
+ * <li>{@code sample-samplerecord.desc} — binary {@code FileDescriptorSet}
for constructing
+ * messages via {@link DynamicMessage} without a generated Java class.
+ * To regenerate: copy {@code sample.desc} from the same source
directory.</li>
+ * </ul>
+ * Both resources are copies of the test resources in {@code pinot-protobuf},
renamed to avoid
+ * classpath conflicts with that module's own resources when both are on the
test classpath.
+ *
+ * <p>Thread-safety: inherits the shared-cluster pattern from {@link
CustomDataQueryClusterIntegrationTest}.
+ */
+public class ProtoBufCodeGenMessageDecoderTest extends
CustomDataQueryClusterIntegrationTest {
+
+ private static final String TABLE_NAME = "ProtoBufCodeGenTest";
+ private static final int NUM_RECORDS = 200;
+ private static final int NUM_DISTINCT_FRIENDS = 5;
+ private static final String PROTO_CLASS_NAME_VALUE =
+ "org.apache.pinot.plugin.inputformat.protobuf.Sample$SampleRecord";
+ private static final String PROTO_DESCRIPTOR_RESOURCE =
"sample-samplerecord.desc";
+ private static final String PROTO_JAR_RESOURCE = "sample-samplerecord.jar";
+ private static final String PROTO_MESSAGE_TYPE = "SampleRecord";
+
+ private static final String COL_NAME = "name";
+ private static final String COL_ID = "id";
+ private static final String COL_EMAIL = "email";
+ private static final String COL_FRIENDS = "friends";
+
+ @Override
+ public String getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder()
+ .setSchemaName(TABLE_NAME)
+ .addField(new DimensionFieldSpec(COL_NAME, FieldSpec.DataType.STRING,
true))
+ .addField(new DimensionFieldSpec(COL_EMAIL, FieldSpec.DataType.STRING,
true))
+ .addField(new DimensionFieldSpec(COL_FRIENDS,
FieldSpec.DataType.STRING, false))
+ .addField(new DateTimeFieldSpec(COL_ID, FieldSpec.DataType.INT,
+ "1:SECONDS:EPOCH", "1:SECONDS"))
+ .build();
+ }
+
+ @Override
+ public long getCountStarResult() {
+ return NUM_RECORDS;
+ }
+
+ @Override
+ public boolean isRealtimeTable() {
+ return true;
+ }
+
+ @Override
+ protected Map<String, String> getStreamConfigMap() {
+ Map<String, String> streamConfigMap = super.getStreamConfigMap();
+ String streamType = "kafka";
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_DECODER_CLASS),
+ ProtoBufCodeGenMessageDecoder.class.getName());
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType,
+ "decoder.prop." +
ProtoBufCodeGenMessageDecoder.PROTOBUF_JAR_FILE_PATH),
+ getJarFileUri());
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType,
+ "decoder.prop." + ProtoBufCodeGenMessageDecoder.PROTO_CLASS_NAME),
+ PROTO_CLASS_NAME_VALUE);
+ return streamConfigMap;
+ }
+
+ @Override
+ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+ // sampleAvroFile is not used — data is pushed as protobuf bytes, not Avro
+ return new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(getTableName())
+ .setStreamConfigs(getStreamConfigs())
+ .setTimeColumnName(COL_ID)
+ .setNumReplicas(getNumReplicas())
+ .build();
+ }
+
+ /**
+ * Returns a single dummy file. The base class passes {@code
avroFiles.get(0)} to
+ * {@link #createRealtimeTableConfig(File)}, so the list must be non-empty.
The file is not read.
+ */
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ File dummy = new File(_tempDir, "dummy.avro");
+ dummy.createNewFile();
+ return List.of(dummy);
+ }
+
+ /**
+ * Pushes {@link #NUM_RECORDS} protobuf-encoded {@code SampleRecord}
messages to Kafka.
+ *
+ * <p>Uses {@link DynamicMessage} with the pre-compiled {@code
sample-samplerecord.desc} descriptor
+ * so no generated Java sources are needed at compile time.
+ *
+ * <p>Each record {@code i} has:
+ * <ul>
+ * <li>{@code id = i} (also the time column, values 0–{@link
#NUM_RECORDS})</li>
+ * <li>{@code name = "name-i"}</li>
+ * <li>{@code email = "[email protected]"}</li>
+ * <li>{@code friends = ["friend-{i % NUM_DISTINCT_FRIENDS}"]}</li>
+ * </ul>
+ */
+ @Override
+ protected void pushDataIntoKafka(List<File> dataFiles)
+ throws Exception {
+ Descriptors.Descriptor descriptor = loadSampleRecordDescriptor();
+ Descriptors.FieldDescriptor nameField =
descriptor.findFieldByName(COL_NAME);
+ Descriptors.FieldDescriptor idField = descriptor.findFieldByName(COL_ID);
+ Descriptors.FieldDescriptor emailField =
descriptor.findFieldByName(COL_EMAIL);
+ Descriptors.FieldDescriptor friendsField =
descriptor.findFieldByName(COL_FRIENDS);
+
+ try (StreamDataProducer producer =
+
ClusterIntegrationTestUtils.getKafkaProducer(getSharedKafkaBrokerList())) {
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ DynamicMessage record = DynamicMessage.newBuilder(descriptor)
+ .setField(nameField, "name-" + i)
+ .setField(idField, i)
+ .setField(emailField, "user" + i + "@example.com")
+ .addRepeatedField(friendsField, "friend-" + (i %
NUM_DISTINCT_FRIENDS))
+ .build();
+ producer.produce(getKafkaTopic(), Longs.toByteArray(i),
record.toByteArray());
+ }
+ }
+ }
+
+ /**
+ * Verifies that all pushed records are ingested: {@code COUNT(*) =
NUM_RECORDS}.
+ */
+ @Test
+ public void testCountStar()
+ throws Exception {
+ JsonNode response = postQuery("SELECT COUNT(*) FROM " + getTableName());
+
assertEquals(response.get("resultTable").get("rows").get(0).get(0).asLong(),
NUM_RECORDS);
+ }
+
+ /**
+ * Verifies scalar field decoding by selecting a specific record by {@code
id}.
+ */
+ @Test
+ public void testSelectById()
+ throws Exception {
+ int targetId = 42;
+ JsonNode response = postQuery(
+ String.format("SELECT %s, %s FROM %s WHERE %s = %d", COL_NAME,
COL_EMAIL, getTableName(), COL_ID, targetId));
+ JsonNode rows = response.get("resultTable").get("rows");
+ assertEquals(rows.size(), 1, "Expected exactly one row for id=" +
targetId);
+ assertEquals(rows.get(0).get(0).asText(), "name-" + targetId);
+ assertEquals(rows.get(0).get(1).asText(), "user" + targetId +
"@example.com");
+ }
+
+ /**
+ * Verifies that {@code repeated string friends} is decoded as a multi-value
STRING column.
+ * Each record has one friend: {@code "friend-{id % NUM_DISTINCT_FRIENDS}"},
+ * so each distinct friend value should match {@code NUM_RECORDS /
NUM_DISTINCT_FRIENDS} rows.
+ */
+ @Test
+ public void testMultiValueFriendsCount()
+ throws Exception {
+ long expectedPerFriend = NUM_RECORDS / NUM_DISTINCT_FRIENDS;
+ for (int f = 0; f < NUM_DISTINCT_FRIENDS; f++) {
+ JsonNode response = postQuery(
+ String.format("SELECT COUNT(*) FROM %s WHERE %s = 'friend-%d'",
getTableName(), COL_FRIENDS, f));
+ long count =
response.get("resultTable").get("rows").get(0).get(0).asLong();
+ assertEquals(count, expectedPerFriend,
+ "Unexpected row count for friend-" + f + ": " + count + " (expected
" + expectedPerFriend + ")");
+ }
+ }
+
+ /**
+ * Verifies there are exactly {@link #NUM_DISTINCT_FRIENDS} distinct values
in the {@code friends} column.
+ */
+ @Test
+ public void testDistinctFriendCount()
+ throws Exception {
+ JsonNode response = postQuery(
+ String.format("SELECT COUNT(DISTINCT %s) FROM %s", COL_FRIENDS,
getTableName()));
+ long count =
response.get("resultTable").get("rows").get(0).get(0).asLong();
+ assertEquals(count, NUM_DISTINCT_FRIENDS,
+ "Expected " + NUM_DISTINCT_FRIENDS + " distinct friend values, got " +
count);
+ }
+
+ /**
+ * Verifies that the {@code email} field is decoded and non-null for all
records.
+ */
+ @Test
+ public void testEmailFieldNotNull()
+ throws Exception {
+ JsonNode response = postQuery(
+ String.format("SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL",
getTableName(), COL_EMAIL));
+ long count =
response.get("resultTable").get("rows").get(0).get(0).asLong();
+ assertEquals(count, NUM_RECORDS, "All records should have a non-null
email");
+ }
+
+ // ---- Helpers ----
+
+ /**
+ * Loads the {@code SampleRecord} {@link Descriptors.Descriptor} from the
binary descriptor file
+ * on the test classpath. Fails with a clear assertion message if the
resource or message type
+ * is not found.
+ */
+ private static Descriptors.Descriptor loadSampleRecordDescriptor()
+ throws Exception {
+ try (InputStream is =
ProtoBufCodeGenMessageDecoderTest.class.getClassLoader()
+ .getResourceAsStream(PROTO_DESCRIPTOR_RESOURCE)) {
+ assertNotNull(is, PROTO_DESCRIPTOR_RESOURCE + " not found on test
classpath");
+ DynamicSchema schema = DynamicSchema.parseFrom(is);
+ Descriptors.Descriptor descriptor =
schema.getMessageDescriptor(PROTO_MESSAGE_TYPE);
+ assertNotNull(descriptor,
+ "Message type '" + PROTO_MESSAGE_TYPE + "' not found in " +
PROTO_DESCRIPTOR_RESOURCE
+ + ". Verify the descriptor was generated correctly from
sample.proto.");
+ return descriptor;
+ }
+ }
+
+ private static String getJarFileUri() {
+ URL jarUrl = ProtoBufCodeGenMessageDecoderTest.class.getClassLoader()
+ .getResource(PROTO_JAR_RESOURCE);
+ try {
+ return Objects.requireNonNull(jarUrl, PROTO_JAR_RESOURCE + " not found
on test classpath")
+ .toURI().toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to resolve JAR URI for " +
PROTO_JAR_RESOURCE, e);
+ }
+ }
+}
diff --git
a/pinot-integration-tests/src/test/resources/sample-samplerecord.desc
b/pinot-integration-tests/src/test/resources/sample-samplerecord.desc
new file mode 100644
index 00000000000..9b004e56417
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/sample-samplerecord.desc
@@ -0,0 +1,8 @@
+
+�
+sample.proto,org.apache.pinot.plugin.inputformat.protobuf"b
+SampleRecord
+name ( Rname
+id (Rid
+email ( Remail
+friends ( Rfriendsbproto3
\ No newline at end of file
diff --git a/pinot-integration-tests/src/test/resources/sample-samplerecord.jar
b/pinot-integration-tests/src/test/resources/sample-samplerecord.jar
new file mode 100644
index 00000000000..da974d4d082
Binary files /dev/null and
b/pinot-integration-tests/src/test/resources/sample-samplerecord.jar differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]