This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 42a2fc70ff Add Support for Schema Registry in Protobuf Decoder (#9220)
42a2fc70ff is described below
commit 42a2fc70ff37fddaaba300becd36b41f170e77bd
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Aug 18 12:17:59 2022 +0530
Add Support for Schema Registry in Protobuf Decoder (#9220)
* Working on schema registry; refactor pending
* Add tests for confluent protobuf and delete duplicate files
* Move dependency versions to parent pom
* Fix protobuf decoder bug: Not honouring offset and length
Co-authored-by: Kartik Khare <[email protected]>
---
.../v0_deprecated/pinot-ingestion-common/pom.xml | 6 +
.../pinot-confluent-avro/pom.xml | 1 -
.../pinot-input-format/pinot-protobuf/pom.xml | 97 +++++++++++++-
...fluentSchemaRegistryProtoBufMessageDecoder.java | 143 +++++++++++++++++++++
.../protobuf/ProtoBufMessageDecoder.java | 11 +-
.../protobuf/ProtoBufConfluentSchemaTest.java | 130 +++++++++++++++++++
.../schemaregistry/SchemaRegistryStarter.java | 105 +++++++++++++++
pom.xml | 1 +
8 files changed, 486 insertions(+), 8 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 8fb34e6fe3..4d0f323e2b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -33,6 +33,12 @@
<artifactId>pinot-ingestion-common</artifactId>
<name>Pinot Ingestion Common</name>
<url>https://pinot.apache.org/</url>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
<properties>
<pinot.root>${basedir}/../../../..</pinot.root>
<phase.prop>package</phase.prop>
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
index 57d46b0d5b..e1e4f87b8a 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml
@@ -35,7 +35,6 @@
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<kafka.lib.version>2.8.1</kafka.lib.version>
- <confluent.version>5.3.1</confluent.version>
<phase.prop>package</phase.prop>
</properties>
<repositories>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
index d3befbbc91..c1e25bb9b6 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
@@ -35,9 +35,16 @@
<url>https://pinot.apache.org/</url>
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
- <proto.version>3.11.4</proto.version>
+ <kafka.lib.version>2.8.1</kafka.lib.version>
+ <testcontainers.version>1.17.3</testcontainers.version>
<phase.prop>package</phase.prop>
</properties>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
<dependencies>
<dependency>
<groupId>commons-lang</groupId>
@@ -46,7 +53,6 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>${proto.version}</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
@@ -59,6 +65,93 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.lib.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry-client</artifactId>
+ <version>${confluent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-protobuf-serializer</artifactId>
+ <version>${confluent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.squareup.okio</groupId>
+ <artifactId>okio</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
new file mode 100644
index 0000000000..5556e6a97e
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/KafkaConfluentSchemaRegistryProtoBufMessageDecoder.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractor;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+public class KafkaConfluentSchemaRegistryProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(KafkaConfluentSchemaRegistryProtoBufMessageDecoder.class);
+ private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+ private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
+ public static final String CACHED_SCHEMA_MAP_CAPACITY = "cached.schema.map.capacity";
+ public static final String DEFAULT_CACHED_SCHEMA_MAP_CAPACITY = "1000";
+
+ private KafkaProtobufDeserializer<Message> _deserializer;
+ private RecordExtractor<Message> _protoBufRecordExtractor;
+ private String _topicName;
+
+ private RestService createRestService(String schemaRegistryUrl, Map<String, String> configs) {
+ RestService restService = new RestService(schemaRegistryUrl);
+
+ ConfigDef configDef = new ConfigDef();
+ SslConfigs.addClientSslSupport(configDef);
+ Map<String, ConfigDef.ConfigKey> configKeyMap = configDef.configKeys();
+ Map<String, Object> sslConfigs = new HashMap<>();
+ for (String key : configs.keySet()) {
+ if (!key.equals(SCHEMA_REGISTRY_REST_URL) && key.startsWith(SCHEMA_REGISTRY_OPTS_PREFIX)) {
+ String value = configs.get(key);
+ String schemaRegistryOptKey = key.substring(SCHEMA_REGISTRY_OPTS_PREFIX.length());
+
+ if (configKeyMap.containsKey(schemaRegistryOptKey)) {
+ if (configKeyMap.get(schemaRegistryOptKey).type == ConfigDef.Type.PASSWORD) {
+ sslConfigs.put(schemaRegistryOptKey, new Password(value));
+ } else {
+ sslConfigs.put(schemaRegistryOptKey, value);
+ }
+ }
+ }
+ }
+
+ if (!sslConfigs.isEmpty()) {
+ DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory();
+ sslFactory.configure(sslConfigs);
+ restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+ }
+ return restService;
+ }
+
+ @Override
+ public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+ throws Exception {
+ checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
+ String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
+ ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+ int identityMapCapacity = Integer.parseInt(
+ props.getOrDefault(CACHED_SCHEMA_MAP_CAPACITY, DEFAULT_CACHED_SCHEMA_MAP_CAPACITY));
+ SchemaRegistryClient schemaRegistryClient =
+ new CachedSchemaRegistryClient(createRestService(schemaRegistryUrl, props),
+ identityMapCapacity, Collections.singletonList(protobufSchemaProvider), props, null);
+ _deserializer = new KafkaProtobufDeserializer<>(schemaRegistryClient);
+ Preconditions.checkNotNull(topicName, "Topic must be provided");
+ _topicName = topicName;
+ _protoBufRecordExtractor = PluginManager.get().createInstance(ProtoBufRecordExtractor.class.getName());
+ _protoBufRecordExtractor.init(fieldsToRead, null);
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, GenericRow destination) {
+ try {
+ Message protoMessage = _deserializer.deserialize(_topicName, payload);
+ return _protoBufRecordExtractor.extract(protoMessage, destination);
+ } catch (RuntimeException e) {
+ ignoreOrRethrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+ return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+ }
+
+ /**
+ * This method handles specific serialisation exceptions. If the exception cannot be ignored the method
+ * re-throws the exception.
+ *
+ * @param e exception to handle
+ */
+ private void ignoreOrRethrowException(RuntimeException e) {
+ if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+ // Do nothing, the message is not a ProtoBuf message and can't be decoded
+ LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+ return;
+ }
+ throw e;
+ }
+
+ private boolean isUnknownMagicByte(Throwable e) {
+ return e != null && e instanceof SerializationException && e.getMessage() != null && e.getMessage().toLowerCase()
+ .contains("unknown magic byte");
+ }
+}
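For context, a minimal sketch of how the new decoder could be driven directly, outside of a Pinot table config. The property key "schema.registry.rest.url", the optional "cached.schema.map.capacity" key and the init/decode signatures are taken from the class above; the registry URL, topic name and payload bytes are placeholders for illustration, not part of this commit (imports as in the test below):

    Map<String, String> props = new HashMap<>();
    props.put("schema.registry.rest.url", "http://localhost:8081"); // placeholder registry URL
    props.put("cached.schema.map.capacity", "1000");                // optional, defaults to 1000

    KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
        new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
    // fieldsToRead may be null to extract all fields, as in ProtoBufConfluentSchemaTest below;
    // "my_topic" is a placeholder topic name.
    decoder.init(props, null, "my_topic");

    byte[] kafkaValueBytes = consumerRecordValue;                   // placeholder: a record value read from Kafka
    GenericRow row = decoder.decode(kafkaValueBytes, new GenericRow());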
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
index db1995a6b2..fb0c5b3e8d 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java
@@ -25,6 +25,7 @@ import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -78,11 +79,6 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
- return decode(payload, 0, payload.length, destination);
- }
-
- @Override
- public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
Message message;
try {
_builder.mergeFrom(payload);
@@ -96,4 +92,9 @@ public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
_recordExtractor.extract(message, destination);
return destination;
}
+
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+ return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+ }
}
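The hunks above are the "not honouring offset and length" fix from the commit message: the offset/length overload previously called _builder.mergeFrom(payload) on the full buffer, ignoring the requested range, so a message embedded inside a larger buffer decoded incorrectly. Both decoders now copy the requested slice first. A small illustration of the intended contract (the buffer size and offsets are made-up values for the example):

    byte[] buffer = new byte[10];   // e.g. a larger batch buffer
    int offset = 2;
    int length = 3;                 // the actual ProtoBuf message is buffer[2..5)
    // After this fix, only the slice below is fed to the message builder,
    // instead of the whole 10-byte buffer:
    byte[] slice = Arrays.copyOfRange(buffer, offset, offset + length);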
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
new file mode 100644
index 0000000000..ca0270ada4
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry.SchemaRegistryStarter;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ProtoBufConfluentSchemaTest {
+ public static final String TOPIC_PROTO = "test_topic_proto";
+ SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
+ private Producer<byte[], Message> _protoProducer;
+
+ @BeforeClass
+ public void setup() {
+ _schemaRegistry = SchemaRegistryStarter.startLocalInstance(9093);
+
+ Properties protoBufProducerProps = new Properties();
+ protoBufProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ _schemaRegistry._kafkaContainer.getBootstrapServers());
+ protoBufProducerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+ protoBufProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ protoBufProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
+ _protoProducer = new KafkaProducer<>(protoBufProducerProps);
+ }
+
+ @Test
+ public void testSamplePinotConsumer()
+ throws Exception {
+ int numRecords = 10;
+ List<Sample.SampleRecord> recordList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ Sample.SampleRecord sampleRecord = Sample.SampleRecord.newBuilder().addFriends(UUID.randomUUID().toString())
+ .addFriends(UUID.randomUUID().toString()).setEmail(UUID.randomUUID().toString())
+ .setName(UUID.randomUUID().toString()).setId(i).build();
+
+ _protoProducer.send(new ProducerRecord<>(TOPIC_PROTO, sampleRecord));
+ recordList.add(sampleRecord);
+ }
+
+ Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _schemaRegistry._kafkaContainer.getBootstrapServers());
+ consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "foo_bar");
+ KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
+ kafkaConsumer.subscribe(Collections.singletonList(TOPIC_PROTO));
+ ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
+ Iterator<ConsumerRecord<byte[], byte[]>> iter = consumerRecords.iterator();
+
+ KafkaConfluentSchemaRegistryProtoBufMessageDecoder decoder =
+ new KafkaConfluentSchemaRegistryProtoBufMessageDecoder();
+ Map<String, String> decoderProps = new HashMap<>();
+ decoderProps.put("schema.registry.rest.url", _schemaRegistry.getUrl());
+ decoder.init(decoderProps, null, TOPIC_PROTO);
+ GenericRow reuse = new GenericRow();
+ List<GenericRow> result = new ArrayList<>();
+ while (iter.hasNext()) {
+ byte[] arr = iter.next().value();
+ decoder.decode(arr, reuse);
+ result.add(reuse.copy());
+ reuse.clear();
+ }
+
+ Assert.assertEquals(result.size(), numRecords);
+
+ for (int i = 0; i < numRecords; i++) {
+ Sample.SampleRecord originalValue = recordList.get(i);
+ GenericRow decodedValue = result.get(i);
+
+ for (Map.Entry<Descriptors.FieldDescriptor, Object> fieldWithValue : originalValue.getAllFields().entrySet()) {
+ Assert.assertNotNull(decodedValue.getValue(fieldWithValue.getKey().getName()));
+ if (!fieldWithValue.getKey().isRepeated()) {
+ Assert.assertEquals(fieldWithValue.getValue(), decodedValue.getValue(fieldWithValue.getKey().getName()));
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _schemaRegistry.stop();
+ }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
new file mode 100644
index 0000000000..c6d7a59770
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/kafka/schemaregistry/SchemaRegistryStarter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+
+public class SchemaRegistryStarter {
+ public static final int DEFAULT_PORT = 8081;
+ private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0";
+ private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME =
+ DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION);
+ private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME =
+ DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryStarter.class);
+
+ private SchemaRegistryStarter() {
+ }
+
+ public static KafkaSchemaRegistryInstance startLocalInstance(int port) {
+ KafkaSchemaRegistryInstance kafkaSchemaRegistry = new KafkaSchemaRegistryInstance(port);
+ kafkaSchemaRegistry.start();
+ return kafkaSchemaRegistry;
+ }
+
+ public static class KafkaSchemaRegistryInstance {
+ private final int _port;
+ public KafkaContainer _kafkaContainer;
+ private Network _network;
+ private GenericContainer _schemaRegistryContainer;
+
+ private KafkaSchemaRegistryInstance(int port) {
+ _port = port;
+ }
+
+ public String getUrl() {
+ return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port);
+ }
+
+ public void start() {
+ LOGGER.info("Starting schema registry");
+ if (_kafkaContainer != null || _schemaRegistryContainer != null) {
+ throw new IllegalStateException("Schema registry is already running");
+ }
+
+ _network = Network.newNetwork();
+
+ _kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka")
+ .withCreateContainerCmdModifier(it -> it.withHostName("kafka")).waitingFor(Wait.forListeningPort());
+ _kafkaContainer.start();
+
+ Map<String, String> schemaRegistryProps = new HashMap<>();
+ schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092");
+ schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
+ schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + _port);
+ schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true");
+ _schemaRegistryContainer =
+ new GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network)
+ .withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port)
+ .waitingFor(Wait.forListeningPort());
+ _schemaRegistryContainer.start();
+ }
+
+ public void stop() {
+ LOGGER.info("Stopping schema registry");
+ if (_schemaRegistryContainer != null) {
+ _schemaRegistryContainer.stop();
+ _schemaRegistryContainer = null;
+ }
+
+ if (_kafkaContainer != null) {
+ _kafkaContainer.stop();
+ _kafkaContainer = null;
+ }
+
+ if (_network != null) {
+ _network.close();
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index b61f381f90..5876c72893 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
<kafka.version>2.0</kafka.version>
<protobuf.version>3.19.2</protobuf.version>
<grpc.version>1.41.0</grpc.version>
+ <confluent.version>5.5.3</confluent.version>
<!-- Checkstyle violation prop.-->
<checkstyle.violation.severity>warning</checkstyle.violation.severity>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]