Copilot commented on code in PR #17790:
URL: https://github.com/apache/pinot/pull/17790#discussion_r2886142757
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java:
##########
@@ -80,14 +94,183 @@ protected long getDocsLoadedTimeoutMs() {
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
String kafkaBrokerList = getKafkaBrokerList();
- // the first transaction of kafka messages are aborted
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), false);
- // the second transaction of kafka messages are committed
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), true);
+ // Use System.err for diagnostics - log4j2 console appender is filtered to
ERROR in CI
+ System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at:
" + kafkaBrokerList);
+ System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBrokerList);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"5");
+ producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"test-transaction-" + UUID.randomUUID());
+
+ // Use a SINGLE producer for both abort and commit transactions.
+ // With a single producer, the coordinator's state machine ensures that
after
+ // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any
new
+ // transaction operations until the abort is fully done (markers written).
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps)) {
+ producer.initTransactions();
+ System.err.println("[ExactlyOnce] initTransactions() succeeded");
+
+ // Transaction 1: aborted batch
+ long abortedCount = pushAvroRecords(producer, avroFiles, false);
+ System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + "
records");
+
+ // Transaction 2: committed batch
+ long committedCount = pushAvroRecords(producer, avroFiles, true);
+ System.err.println("[ExactlyOnce] Committed batch: " + committedCount +
" records");
+ }
+
+ // After producer is closed, verify data visibility with independent
consumers
+ System.err.println("[ExactlyOnce] Producer closed. Verifying data
visibility...");
+ waitForCommittedRecordsVisible(kafkaBrokerList);
+ }
+
+ /**
+ * Wait for committed records to be visible to a read_committed consumer.
+ * This ensures transaction markers have been fully propagated before
returning.
+ */
+ private void waitForCommittedRecordsVisible(String brokerList) {
+ long deadline = System.currentTimeMillis() + 60_000L;
+ int lastCommitted = 0;
+ int lastUncommitted = 0;
+ int iteration = 0;
+
+ while (System.currentTimeMillis() < deadline) {
+ iteration++;
+ lastCommitted = countRecords(brokerList, "read_committed");
+ if (lastCommitted > 0) {
+ System.err.println("[ExactlyOnce] Verification OK: read_committed=" +
lastCommitted
+ + " after " + iteration + " iterations");
+ return;
+ }
+ // Check if data reached Kafka at all
+ if (iteration == 1 || iteration % 5 == 0) {
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] Verification iteration " + iteration
+ + ": read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ }
+ try {
+ Thread.sleep(2_000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Final diagnostic dump
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s:
read_committed=" + lastCommitted
+ + ", read_uncommitted=" + lastUncommitted);
+ System.err.println("[ExactlyOnce] This indicates transaction markers were
NOT propagated to data partitions.");
+ System.err.println("[ExactlyOnce] Proceeding anyway - Pinot consumer will
also not see records.");
+ }
+
+ /**
+ * Push Avro records to Kafka within a transaction. Does NOT call
initTransactions().
+ * Returns the number of records sent.
+ */
+ private long pushAvroRecords(KafkaProducer<byte[], byte[]> producer,
List<File> avroFiles, boolean commit)
+ throws Exception {
+ int maxMessagesPerTransaction =
+ getMaxNumKafkaMessagesPerBatch() > 0 ?
getMaxNumKafkaMessagesPerBatch() : Integer.MAX_VALUE;
+ long counter = 0;
+ int recordsInTransaction = 0;
+ boolean hasOpenTransaction = false;
+ byte[] header = getKafkaMessageHeader();
+ String partitionColumn = getPartitionColumn();
+
+ try (ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(65536)) {
+ for (File avroFile : avroFiles) {
+ try (DataFileStream<GenericRecord> reader =
AvroUtils.getAvroReader(avroFile)) {
+ BinaryEncoder binaryEncoder = new
EncoderFactory().directBinaryEncoder(outputStream, null);
+ GenericDatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(reader.getSchema());
+ for (GenericRecord genericRecord : reader) {
+ if (!hasOpenTransaction) {
+ producer.beginTransaction();
+ hasOpenTransaction = true;
+ recordsInTransaction = 0;
+ }
+
+ outputStream.reset();
+ if (header != null && header.length > 0) {
+ outputStream.write(header);
+ }
+ datumWriter.write(genericRecord, binaryEncoder);
+ binaryEncoder.flush();
+
+ byte[] keyBytes = (partitionColumn == null) ?
Longs.toByteArray(counter++)
+ : genericRecord.get(partitionColumn).toString().getBytes();
Review Comment:
`String.getBytes()` uses the platform default charset, which can vary across
environments/CI images. Use an explicit charset (e.g., UTF-8) to make the
produced keys deterministic across platforms.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java:
##########
@@ -80,14 +94,183 @@ protected long getDocsLoadedTimeoutMs() {
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
String kafkaBrokerList = getKafkaBrokerList();
- // the first transaction of kafka messages are aborted
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), false);
- // the second transaction of kafka messages are committed
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), true);
+ // Use System.err for diagnostics - log4j2 console appender is filtered to
ERROR in CI
+ System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at:
" + kafkaBrokerList);
+ System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBrokerList);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"5");
+ producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"test-transaction-" + UUID.randomUUID());
+
+ // Use a SINGLE producer for both abort and commit transactions.
+ // With a single producer, the coordinator's state machine ensures that
after
+ // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any
new
+ // transaction operations until the abort is fully done (markers written).
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps)) {
+ producer.initTransactions();
+ System.err.println("[ExactlyOnce] initTransactions() succeeded");
+
+ // Transaction 1: aborted batch
+ long abortedCount = pushAvroRecords(producer, avroFiles, false);
+ System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + "
records");
+
+ // Transaction 2: committed batch
+ long committedCount = pushAvroRecords(producer, avroFiles, true);
+ System.err.println("[ExactlyOnce] Committed batch: " + committedCount +
" records");
+ }
+
+ // After producer is closed, verify data visibility with independent
consumers
+ System.err.println("[ExactlyOnce] Producer closed. Verifying data
visibility...");
+ waitForCommittedRecordsVisible(kafkaBrokerList);
+ }
+
+ /**
+ * Wait for committed records to be visible to a read_committed consumer.
+ * This ensures transaction markers have been fully propagated before
returning.
+ */
+ private void waitForCommittedRecordsVisible(String brokerList) {
+ long deadline = System.currentTimeMillis() + 60_000L;
+ int lastCommitted = 0;
+ int lastUncommitted = 0;
+ int iteration = 0;
+
+ while (System.currentTimeMillis() < deadline) {
+ iteration++;
+ lastCommitted = countRecords(brokerList, "read_committed");
+ if (lastCommitted > 0) {
+ System.err.println("[ExactlyOnce] Verification OK: read_committed=" +
lastCommitted
+ + " after " + iteration + " iterations");
+ return;
+ }
+ // Check if data reached Kafka at all
+ if (iteration == 1 || iteration % 5 == 0) {
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] Verification iteration " + iteration
+ + ": read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ }
+ try {
+ Thread.sleep(2_000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Final diagnostic dump
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s:
read_committed=" + lastCommitted
+ + ", read_uncommitted=" + lastUncommitted);
+ System.err.println("[ExactlyOnce] This indicates transaction markers were
NOT propagated to data partitions.");
+ System.err.println("[ExactlyOnce] Proceeding anyway - Pinot consumer will
also not see records.");
+ }
+
+ /**
+ * Push Avro records to Kafka within a transaction. Does NOT call
initTransactions().
+ * Returns the number of records sent.
+ */
+ private long pushAvroRecords(KafkaProducer<byte[], byte[]> producer,
List<File> avroFiles, boolean commit)
+ throws Exception {
+ int maxMessagesPerTransaction =
+ getMaxNumKafkaMessagesPerBatch() > 0 ?
getMaxNumKafkaMessagesPerBatch() : Integer.MAX_VALUE;
+ long counter = 0;
+ int recordsInTransaction = 0;
+ boolean hasOpenTransaction = false;
+ byte[] header = getKafkaMessageHeader();
+ String partitionColumn = getPartitionColumn();
+
+ try (ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(65536)) {
+ for (File avroFile : avroFiles) {
+ try (DataFileStream<GenericRecord> reader =
AvroUtils.getAvroReader(avroFile)) {
+ BinaryEncoder binaryEncoder = new
EncoderFactory().directBinaryEncoder(outputStream, null);
+ GenericDatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(reader.getSchema());
+ for (GenericRecord genericRecord : reader) {
+ if (!hasOpenTransaction) {
+ producer.beginTransaction();
+ hasOpenTransaction = true;
+ recordsInTransaction = 0;
+ }
+
+ outputStream.reset();
+ if (header != null && header.length > 0) {
+ outputStream.write(header);
+ }
+ datumWriter.write(genericRecord, binaryEncoder);
+ binaryEncoder.flush();
+
+ byte[] keyBytes = (partitionColumn == null) ?
Longs.toByteArray(counter++)
+ : genericRecord.get(partitionColumn).toString().getBytes();
+ byte[] bytes = outputStream.toByteArray();
+ producer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes,
bytes));
+
+ recordsInTransaction++;
+ if (recordsInTransaction >= maxMessagesPerTransaction) {
+ if (commit) {
+ producer.commitTransaction();
+ } else {
+ producer.abortTransaction();
+ }
+ hasOpenTransaction = false;
+ }
Review Comment:
`pushAvroRecords(...)` returns `counter`, but `counter` is only incremented
when `partitionColumn == null`. When a partition column is configured, the
method will return `0` even though records were sent, which makes the
aborted/committed counts incorrect and can make the diagnostics misleading. Consider
incrementing a dedicated record counter on every record and returning that,
while using a separate counter (if needed) for synthetic keys.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java:
##########
@@ -80,14 +94,183 @@ protected long getDocsLoadedTimeoutMs() {
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
String kafkaBrokerList = getKafkaBrokerList();
- // the first transaction of kafka messages are aborted
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), false);
- // the second transaction of kafka messages are committed
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), true);
+ // Use System.err for diagnostics - log4j2 console appender is filtered to
ERROR in CI
+ System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at:
" + kafkaBrokerList);
+ System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBrokerList);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"5");
+ producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"test-transaction-" + UUID.randomUUID());
+
+ // Use a SINGLE producer for both abort and commit transactions.
+ // With a single producer, the coordinator's state machine ensures that
after
+ // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any
new
+ // transaction operations until the abort is fully done (markers written).
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps)) {
+ producer.initTransactions();
+ System.err.println("[ExactlyOnce] initTransactions() succeeded");
+
+ // Transaction 1: aborted batch
+ long abortedCount = pushAvroRecords(producer, avroFiles, false);
+ System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + "
records");
+
+ // Transaction 2: committed batch
+ long committedCount = pushAvroRecords(producer, avroFiles, true);
+ System.err.println("[ExactlyOnce] Committed batch: " + committedCount +
" records");
+ }
+
+ // After producer is closed, verify data visibility with independent
consumers
+ System.err.println("[ExactlyOnce] Producer closed. Verifying data
visibility...");
+ waitForCommittedRecordsVisible(kafkaBrokerList);
+ }
+
+ /**
+ * Wait for committed records to be visible to a read_committed consumer.
+ * This ensures transaction markers have been fully propagated before
returning.
+ */
+ private void waitForCommittedRecordsVisible(String brokerList) {
+ long deadline = System.currentTimeMillis() + 60_000L;
+ int lastCommitted = 0;
+ int lastUncommitted = 0;
+ int iteration = 0;
+
+ while (System.currentTimeMillis() < deadline) {
+ iteration++;
+ lastCommitted = countRecords(brokerList, "read_committed");
+ if (lastCommitted > 0) {
+ System.err.println("[ExactlyOnce] Verification OK: read_committed=" +
lastCommitted
+ + " after " + iteration + " iterations");
+ return;
+ }
+ // Check if data reached Kafka at all
+ if (iteration == 1 || iteration % 5 == 0) {
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] Verification iteration " + iteration
+ + ": read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ }
+ try {
+ Thread.sleep(2_000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Final diagnostic dump
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s:
read_committed=" + lastCommitted
+ + ", read_uncommitted=" + lastUncommitted);
+ System.err.println("[ExactlyOnce] This indicates transaction markers were
NOT propagated to data partitions.");
+ System.err.println("[ExactlyOnce] Proceeding anyway - Pinot consumer will
also not see records.");
Review Comment:
The verification helper logs a failure but then continues the test. If
committed records never become visible, subsequent failures are likely to be
noisier and further from the root cause. Consider failing fast here (e.g.,
throwing an assertion/error) so the test output clearly indicates “transaction
markers not propagated” as the primary failure.
```suggestion
throw new AssertionError("[ExactlyOnce] Transaction markers were not
propagated within 60s; "
+ "committed records are not visible to read_committed consumers.");
```
##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -816,108 +815,13 @@ protected void startKafka() {
protected void startKafkaWithoutTopic() {
int requestedBrokers = getNumKafkaBrokers();
- List<KafkaBrokerConfig> brokerConfigs =
getOrCreateKafkaBrokerConfigs(requestedBrokers);
- Throwable lastFailure = null;
- for (int attempt = 1; attempt <= KAFKA_START_MAX_ATTEMPTS; attempt++) {
- String clusterId = UUID.randomUUID().toString().replace("-", "");
- String networkName = "pinot-it-kafka-" +
UUID.randomUUID().toString().replace("-", "");
- String quorumVoters = brokerConfigs.stream()
- .map(config -> config._brokerId + "@" + config._containerName +
":9093")
- .collect(Collectors.joining(","));
-
- List<StreamDataServerStartable> kafkaStarters = new
ArrayList<>(requestedBrokers);
- try {
- for (KafkaBrokerConfig brokerConfig : brokerConfigs) {
- StreamDataServerStartable kafkaStarter =
- createKafkaServerStarter(brokerConfig, clusterId, networkName,
quorumVoters, requestedBrokers);
- kafkaStarter.start();
- kafkaStarters.add(kafkaStarter);
- }
- _kafkaStarters = kafkaStarters;
- waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers,
useKafkaTransaction());
- if (attempt > 1) {
- LOGGER.info("Kafka startup succeeded on retry attempt {}/{}",
attempt, KAFKA_START_MAX_ATTEMPTS);
- }
- return;
- } catch (Throwable t) {
- if (t instanceof Error && !(t instanceof AssertionError)) {
- throw (Error) t;
- }
-
- lastFailure = t;
- LOGGER.warn("Kafka startup attempt {}/{} failed; stopping started
brokers before retry", attempt,
- KAFKA_START_MAX_ATTEMPTS, t);
- _kafkaStarters = kafkaStarters;
- try {
- stopKafka();
- } catch (RuntimeException stopException) {
- LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}",
attempt, KAFKA_START_MAX_ATTEMPTS,
- stopException);
- t.addSuppressed(stopException);
- }
-
- if (attempt < KAFKA_START_MAX_ATTEMPTS) {
- try {
- Thread.sleep(KAFKA_START_RETRY_WAIT_MS);
- } catch (InterruptedException interruptedException) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting to retry
Kafka startup", interruptedException);
- }
- }
- }
- }
-
- _kafkaBrokerConfigs = null;
- throw new RuntimeException("Failed to start Kafka cluster after " +
KAFKA_START_MAX_ATTEMPTS + " attempts",
- lastFailure);
- }
-
- private List<KafkaBrokerConfig> getOrCreateKafkaBrokerConfigs(int
brokerCount) {
- if (_kafkaBrokerConfigs != null && _kafkaBrokerConfigs.size() ==
brokerCount) {
- return _kafkaBrokerConfigs;
- }
- _kafkaBrokerConfigs = createKafkaBrokerConfigs(brokerCount);
- return _kafkaBrokerConfigs;
- }
-
- private StreamDataServerStartable createKafkaServerStarter(KafkaBrokerConfig
brokerConfig, String clusterId,
- String networkName, String quorumVoters, int clusterSize) {
- Properties serverProperties = new Properties();
- serverProperties.put("kafka.server.owner.name",
getClass().getSimpleName());
- serverProperties.put("kafka.server.bootstrap.servers", "localhost:" +
brokerConfig._port);
- serverProperties.put("kafka.server.port",
Integer.toString(brokerConfig._port));
- serverProperties.put("kafka.server.broker.id",
Integer.toString(brokerConfig._brokerId));
- serverProperties.put("kafka.server.allow.managed.for.configured.broker",
"true");
- serverProperties.put("kafka.server.container.name",
brokerConfig._containerName);
- serverProperties.put("kafka.server.network.name", networkName);
- serverProperties.put("kafka.server.cluster.id", clusterId);
- serverProperties.put("kafka.server.cluster.size",
Integer.toString(clusterSize));
- serverProperties.put("kafka.server.controller.quorum.voters",
quorumVoters);
- serverProperties.put("kafka.server.internal.host",
brokerConfig._containerName);
- serverProperties.put("kafka.server.skip.readiness.check", "true");
- KafkaServerStartable kafkaServerStartable = new KafkaServerStartable();
- kafkaServerStartable.init(serverProperties);
- return kafkaServerStartable;
- }
-
- private List<KafkaBrokerConfig> createKafkaBrokerConfigs(int brokerCount) {
- String containerPrefix = "pinot-it-kafka-" +
UUID.randomUUID().toString().replace("-", "");
- List<KafkaBrokerConfig> brokerConfigs = new ArrayList<>(brokerCount);
- for (int i = 0; i < brokerCount; i++) {
- int brokerId = i + 1;
- int port = getAvailablePort();
- String containerName = containerPrefix + "-" + brokerId;
- brokerConfigs.add(new KafkaBrokerConfig(brokerId, port, containerName));
- }
- return brokerConfigs;
- }
-
- private static int getAvailablePort() {
- try (ServerSocket socket = new ServerSocket(0)) {
- return socket.getLocalPort();
- } catch (IOException e) {
- throw new RuntimeException("Failed to find an available port for Kafka",
e);
- }
+ Properties props = new Properties();
+ props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP,
Integer.toString(requestedBrokers));
+ EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
+ cluster.init(props);
+ cluster.start();
+ _kafkaStarters = Collections.singletonList(cluster);
+ waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers,
useKafkaTransaction());
Review Comment:
If `waitForKafkaClusterReady(...)` throws, the embedded cluster remains
running and `_kafkaStarters` stays set, which can leak resources and interfere
with later tests. Consider wrapping the readiness wait in a try/catch that
stops the cluster and clears `_kafkaStarters` on failure (or using a
try/finally pattern around initialization to guarantee cleanup).
```suggestion
try {
waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers,
useKafkaTransaction());
} catch (RuntimeException e) {
try {
cluster.stop();
} catch (Exception suppressed) {
// Ignore exceptions during cleanup to preserve original failure
}
_kafkaStarters = null;
throw e;
}
```
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java:
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka30.server;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * In-process embedded Kafka cluster using KRaft mode for integration tests.
+ * Eliminates Docker dependency and provides fast, reliable Kafka for testing.
+ */
+public class EmbeddedKafkaCluster implements StreamDataServerStartable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+ public static final String BROKER_COUNT_PROP = "embedded.kafka.broker.count";
+
+ private static final int TOPIC_MUTATION_RETRIES = 5;
+
+ private int _brokerCount = 1;
+ private KafkaClusterTestKit _cluster;
+ private String _bootstrapServers;
+
+ @Override
+ public void init(Properties props) {
+ _brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+ }
+
+ @Override
+ public void start() {
+ try {
+ int replicationFactor = Math.min(3, _brokerCount);
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setCombined(true)
+ .setNumBrokerNodes(_brokerCount)
+ .setNumControllerNodes(1)
+ .setPerServerProperties(Collections.emptyMap())
+ .setBootstrapMetadataVersion(MetadataVersion.latestProduction())
+ .build();
+
+ _cluster = new KafkaClusterTestKit.Builder(nodes)
+ .setConfigProp("offsets.topic.replication.factor",
String.valueOf(replicationFactor))
+ .setConfigProp("offsets.topic.num.partitions", "1")
+ .setConfigProp("transaction.state.log.replication.factor",
String.valueOf(replicationFactor))
+ .setConfigProp("transaction.state.log.min.isr", "1")
+ .setConfigProp("transaction.state.log.num.partitions", "1")
+ .setConfigProp("group.initial.rebalance.delay.ms", "0")
+ .setConfigProp("log.flush.interval.messages", "1")
+ .build();
+
+ _cluster.format();
+ _cluster.startup();
+ _cluster.waitForReadyBrokers();
+ _bootstrapServers = _cluster.bootstrapServers();
+ LOGGER.info("Embedded Kafka cluster started with {} broker(s) at {}",
_brokerCount, _bootstrapServers);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start embedded Kafka cluster", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (_cluster != null) {
+ try {
+ _cluster.close();
+ LOGGER.info("Embedded Kafka cluster stopped");
+ } catch (Exception e) {
+ LOGGER.warn("Failed to stop embedded Kafka cluster cleanly", e);
+ } finally {
+ _cluster = null;
+ _bootstrapServers = null;
+ }
+ }
+ }
+
+ /**
+ * Returns the full bootstrap servers string (e.g.
"localhost:12345,localhost:12346").
+ */
+ public String bootstrapServers() {
+ return _bootstrapServers;
+ }
+
+ @Override
+ public int getPort() {
+ if (_bootstrapServers == null) {
+ throw new IllegalStateException("Embedded Kafka cluster is not started");
+ }
+ // Parse the port from the first broker in the bootstrap servers string
+ String firstBroker = _bootstrapServers.split(",")[0];
+ return Integer.parseInt(firstBroker.substring(firstBroker.lastIndexOf(':')
+ 1));
+ }
+
+ @Override
+ public void createTopic(String topic, Properties topicProps) {
+ int numPartitions =
Integer.parseInt(String.valueOf(topicProps.getOrDefault("partition", "1")));
+ int requestedReplicationFactor = Integer.parseInt(
+ String.valueOf(topicProps.getOrDefault("replicationFactor", "1")));
+ short replicationFactor = (short) Math.max(1, Math.min(_brokerCount,
requestedReplicationFactor));
+ try (AdminClient adminClient = createAdminClient()) {
+ NewTopic newTopic = new NewTopic(topic, numPartitions,
replicationFactor);
+ runAdminWithRetry(() ->
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(),
+ "create topic: " + topic);
+ } catch (Exception e) {
+ if (e instanceof ExecutionException
+ && e.getCause() instanceof
org.apache.kafka.common.errors.TopicExistsException) {
+ return;
+ }
+ throw new RuntimeException("Failed to create topic: " + topic, e);
+ }
+ }
+
+ @Override
+ public void deleteTopic(String topic) {
+ try (AdminClient adminClient = createAdminClient()) {
+ runAdminWithRetry(() ->
adminClient.deleteTopics(Collections.singletonList(topic)).all().get(),
+ "delete topic: " + topic);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to delete topic: " + topic, e);
+ }
+ }
+
+ @Override
+ public void createPartitions(String topic, int numPartitions) {
+ try (AdminClient adminClient = createAdminClient()) {
+ runAdminWithRetry(() -> {
+ adminClient.createPartitions(Collections.singletonMap(topic,
NewPartitions.increaseTo(numPartitions)))
+ .all().get();
+ return null;
+ }, "create partitions for topic: " + topic);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create partitions for topic: " +
topic, e);
+ }
+ }
+
+ @Override
+ public void deleteRecordsBeforeOffset(String topic, int partition, long
offset) {
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+ try (AdminClient adminClient = createAdminClient()) {
+ runAdminWithRetry(() -> {
+ adminClient.deleteRecords(Collections.singletonMap(topicPartition,
RecordsToDelete.beforeOffset(offset)))
+ .all().get();
+ return null;
+ }, "delete records before offset for topic: " + topic + ", partition: "
+ partition);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to delete records before offset for
topic: " + topic
+ + ", partition: " + partition + ", offset: " + offset, e);
+ }
+ }
+
+ private AdminClient createAdminClient() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", _bootstrapServers);
+ return AdminClient.create(props);
+ }
+
+ private <T> T runAdminWithRetry(AdminOperation<T> operation, String action)
+ throws Exception {
+ for (int attempt = 1; attempt <= TOPIC_MUTATION_RETRIES; attempt++) {
+ try {
+ return operation.execute();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof
org.apache.kafka.common.errors.TimeoutException
+ && attempt < TOPIC_MUTATION_RETRIES) {
+ Thread.sleep(1000L);
+ continue;
+ }
+ throw e;
+ }
+ }
+ throw new IllegalStateException("Failed to " + action + " after retries");
Review Comment:
As written, the `throw new IllegalStateException(...)` after the loop is
effectively unreachable: the loop either returns or throws on the last attempt
(including the timeout case when `attempt == TOPIC_MUTATION_RETRIES`). Consider
restructuring to keep/throw the last exception after retries, and also ensure
that if `Thread.sleep(...)` is interrupted, the thread interrupt status is
preserved (re-interrupt before propagating).
```suggestion
ExecutionException lastException = null;
for (int attempt = 1; attempt <= TOPIC_MUTATION_RETRIES; attempt++) {
try {
return operation.execute();
} catch (ExecutionException e) {
if (e.getCause() instanceof
org.apache.kafka.common.errors.TimeoutException) {
lastException = e;
if (attempt < TOPIC_MUTATION_RETRIES) {
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw ie;
}
continue;
} else {
break;
}
}
throw e;
}
}
throw new IllegalStateException("Failed to " + action + " after
retries", lastException);
```
##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -1069,73 +966,11 @@ private boolean isKafkaTopicReadyOnBroker(String
brokerList, String topic, int e
}
protected void stopKafka() {
- if (_kafkaStarters == null || _kafkaStarters.isEmpty()) {
- return;
- }
- List<StreamDataServerStartable> kafkaStarters = _kafkaStarters;
- _kafkaStarters = null;
-
- RuntimeException stopException = null;
- for (int i = kafkaStarters.size() - 1; i >= 0; i--) {
- StreamDataServerStartable kafkaStarter = kafkaStarters.get(i);
- try {
- kafkaStarter.stop();
- } catch (Exception e) {
- RuntimeException wrapped = new RuntimeException(
- "Failed to stop Kafka broker on port: " + kafkaStarter.getPort(),
e);
- if (stopException == null) {
- stopException = wrapped;
- } else {
- stopException.addSuppressed(wrapped);
- }
- }
- }
-
- for (StreamDataServerStartable kafkaStarter : kafkaStarters) {
- try {
- waitForKafkaBrokerStopped(kafkaStarter.getPort());
- } catch (RuntimeException e) {
- if (stopException == null) {
- stopException = e;
- } else {
- stopException.addSuppressed(e);
- }
+ if (_kafkaStarters != null) {
+ for (StreamDataServerStartable starter : _kafkaStarters) {
+ starter.stop();
}
- }
-
- if (stopException != null) {
- throw stopException;
- }
- }
-
- private void waitForKafkaBrokerStopped(int brokerPort) {
- String brokerList = "localhost:" + brokerPort;
- TestUtils.waitForCondition(aVoid -> !isKafkaBrokerAvailable(brokerList),
200L, 60_000L,
- "Kafka broker is still reachable after stop: " + brokerList);
- }
-
- private boolean isKafkaBrokerAvailable(String brokerList) {
- Properties adminProps = new Properties();
- adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
- adminProps.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000");
- try (AdminClient adminClient = AdminClient.create(adminProps)) {
- adminClient.describeCluster().nodes().get(2, TimeUnit.SECONDS);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
- private static final class KafkaBrokerConfig {
- private final int _brokerId;
- private final int _port;
- private final String _containerName;
-
- private KafkaBrokerConfig(int brokerId, int port, String containerName) {
- _brokerId = brokerId;
- _port = port;
- _containerName = containerName;
+ _kafkaStarters = null;
}
Review Comment:
If any `starter.stop()` throws, `_kafkaStarters` won’t be set to null and
subsequent starters won’t be stopped, potentially leaving embedded brokers
running. Consider stopping all starters while collecting exceptions, and
nulling `_kafkaStarters` in a `finally` block to make cleanup robust.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]