[ https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577062#comment-16577062 ]
ASF GitHub Bot commented on KAFKA-7140: --------------------------------------- hachikuji closed pull request #5319: KAFKA-7140: Remove deprecated poll usages URL: https://github.com/apache/kafka/pull/5319 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 47f8529e2d1..692331ed13f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -53,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -441,7 +442,7 @@ public String toString() { } private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) { - ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); + ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs)); // Exceptions raised from the task during a rebalance should be rethrown to stop the worker if (rebalanceException != null) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index de1ceb3be10..ea9b4c621f9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Iterator; @@ -253,7 +254,7 @@ public void send(K key, V value, org.apache.kafka.clients.producer.Callback call private void poll(long timeoutMs) { try { - ConsumerRecords<K, V> records = consumer.poll(timeoutMs); + ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs)); for (ConsumerRecord<K, V> record : records) consumedCallback.onCompletion(null, record); } catch (WakeupException e) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 1bf9c717068..6d92c34adef 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -65,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -180,8 +181,8 @@ public void testErrorHandlingInSinkTasks() throws Exception { // bad json ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes()); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1)); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2)); + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1)); + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2)); sinkTask.put(EasyMock.anyObject()); EasyMock.expectLastCall().times(2); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 4a7c760fc74..33ab2ef06e0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -58,6 +58,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -458,7 +459,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception { sinkTask.open(partitions); EasyMock.expectLastCall(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -893,7 +894,7 @@ public void run() { // Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler, // and then return one record for TP1 and one for TP3. final AtomicBoolean rebalanced = new AtomicBoolean(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -1273,7 +1274,7 @@ private void expectRebalanceRevocationError(RuntimeException e) { sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); EasyMock.expectLastCall().andReturn(Collections.emptyMap()); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -1298,7 +1299,7 @@ private void expectRebalanceAssignmentError(RuntimeException e) { sinkTask.open(partitions); EasyMock.expectLastCall().andThrow(e); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -1315,7 +1316,7 @@ private void expectPollInitialAssignment() { sinkTask.open(partitions); EasyMock.expectLastCall(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { rebalanceListener.getValue().onPartitionsAssigned(partitions); @@ -1332,7 +1333,7 @@ private void expectPollInitialAssignment() { private void expectConsumerWakeup() { consumer.wakeup(); EasyMock.expectLastCall(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException()); + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException()); } private void expectConsumerPoll(final int numMessages) { @@ -1340,7 +1341,7 @@ private void expectConsumerPoll(final int numMessages) { } private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) { - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 73689d35710..d0089e92b6b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -55,6 +55,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -525,7 +526,7 @@ private void expectPollInitialAssignment() throws Exception { sinkTask.open(partitions); EasyMock.expectLastCall(); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { rebalanceListener.getValue().onPartitionsAssigned(partitions); @@ -557,7 +558,7 @@ private void expectStopTask() throws Exception { private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception { // Stub out all the consumer stream/iterator responses, which we just want to verify occur, // but don't care about the exact details here. - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -595,7 +596,7 @@ public SinkRecord answer() { // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of // returning empty data, we return one record. The expectation is that the data will be ignored by the // response behavior specified using the return value of this method. - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { @@ -625,7 +626,7 @@ public SinkRecord answer() { final Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(TOPIC_PARTITION, startOffset); - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 7e2c5644a3b..365652a75b5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -19,6 +19,7 @@ package kafka.tools import java.io.PrintStream import java.nio.charset.StandardCharsets +import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.regex.Pattern import java.util.{Collections, Locale, Properties, Random} @@ -388,7 +389,7 @@ object ConsoleConsumer extends Logging { private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) { consumerInit() - var recordIter = consumer.poll(0).iterator + var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator() def consumerInit() { (topic, partitionId, offset, whitelist) match { @@ -432,7 +433,7 @@ object ConsoleConsumer extends Logging { def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = { if (!recordIter.hasNext) { - recordIter = consumer.poll(timeoutMs).iterator + recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator if (!recordIter.hasNext) throw new TimeoutException() } diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 5af55a8d7f1..2e7b8ddf094 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -28,8 +28,8 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{Metric, MetricName, TopicPartition} import kafka.utils.{CommandLineUtils, ToolsUtils} import java.util.{Collections, Properties, Random} - import java.text.SimpleDateFormat +import java.time.Duration import com.typesafe.scalalogging.LazyLogging @@ -127,7 +127,7 @@ object ConsumerPerformance extends LazyLogging { var currentTimeMillis = lastConsumedTime while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) { - val records = consumer.poll(100).asScala + val records = consumer.poll(Duration.ofMillis(100)).asScala currentTimeMillis = System.currentTimeMillis if (records.nonEmpty) lastConsumedTime = currentTimeMillis diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala index 3beaf827f59..4849b1ed8c6 100755 --- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala @@ -18,11 +18,13 @@ package kafka.tools import java.nio.charset.StandardCharsets -import java.util.{Arrays, Collections, Properties} +import java.time.Duration +import java.util.{Arrays, Properties} import kafka.utils.Exit import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ @@ -69,9 +71,7 @@ object EndToEndLatency { consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching - val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps) - consumer.subscribe(Collections.singletonList(topic)) val producerProps = loadProps producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) @@ -82,16 +82,21 @@ object EndToEndLatency { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps) + // sends a dummy message to create the topic if it doesn't exist + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get() + def finalise() { consumer.commitSync() producer.close() consumer.close() } - //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when - //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write. - consumer.seekToEnd(Collections.emptyList()) - consumer.poll(0) + + val topicPartitions = consumer.partitionsFor(topic).asScala + .map(p => new TopicPartition(p.topic(), p.partition())).asJava + consumer.assign(topicPartitions) + consumer.seekToEnd(topicPartitions) + consumer.assignment().asScala.foreach(consumer.position) var totalTime = 0.0 val latencies = new Array[Long](numMessages) @@ -103,7 +108,7 @@ object EndToEndLatency { //Send message (of random bytes) synchronously then immediately poll for it producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get() - val recordIter = consumer.poll(timeout).iterator + val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator val elapsed = System.nanoTime - begin diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d7e09e4efdb..d55d96bd65b 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,6 +17,7 @@ package kafka.tools +import java.time.Duration import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -452,7 +453,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // uncommitted record since last poll. Using one second as poll's timeout ensures that // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset // commit. - recordIter = consumer.poll(1000).iterator + recordIter = consumer.poll(Duration.ofSeconds(1)).iterator if (!recordIter.hasNext) throw new NoRecordsException } diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 3c045c69eb2..09d3b9ea9a1 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -47,6 +47,7 @@ import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. @@ -313,10 +315,12 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - client.subscribe(topicsToSubscribe); - client.poll(1); + Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor) + .flatMap(Collection::stream) + .map(info -> new TopicPartition(info.topic(), info.partition())) + .collect(Collectors.toList()); + client.assign(partitions); - final Set<TopicPartition> partitions = client.assignment(); final Set<TopicPartition> inputTopicPartitions = new HashSet<>(); final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>(); diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index be062b309df..26d6e23a3f8 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -47,7 +48,7 @@ public Consumer(String topic) { @Override public void doWork() { consumer.subscribe(Collections.singletonList(this.topic)); - ConsumerRecords<Integer, String> records = consumer.poll(1000); + ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 0d74645379e..27e7c7fda5d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -287,7 +288,7 @@ public void run() { try { producer.beginTransaction(); while (messagesInCurrentTransaction < numMessagesForNextTransaction) { - ConsumerRecords<String, String> records = consumer.poll(200L); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200)); for (ConsumerRecord<String, String> record : records) { producer.send(producerRecordFromConsumerRecord(outputTopic, record)); messagesInCurrentTransaction++; diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index cc09b233167..58f34718b8a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -47,6 +47,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -220,7 +221,7 @@ public void run() { consumer.subscribe(Collections.singletonList(topic), this); while (!isFinished()) { - ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE)); Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records); if (!useAutoCommit) { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index 1a852964070..c3a90e4da6a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -38,6 +38,7 @@ import org.apache.kafka.trogdor.task.TaskWorker; +import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -135,7 +136,7 @@ public Void call() throws Exception { long startBatchMs = startTimeMs; try { while (messagesConsumed < spec.maxMessages()) { - ConsumerRecords<byte[], byte[]> records = consumer.poll(50); + ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50)); if (records.isEmpty()) { continue; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 570f6a11e34..669fafcc75e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -50,6 +50,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -337,7 +338,7 @@ public void run() { while (true) { try { pollInvoked++; - ConsumerRecords<byte[], byte[]> records = consumer.poll(50); + ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50)); for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) { ConsumerRecord<byte[], byte[]> record = iter.next(); int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove deprecated poll usages > ----------------------------- > > Key: KAFKA-7140 > URL: https://issues.apache.org/jira/browse/KAFKA-7140 > Project: Kafka > Issue Type: Improvement > Reporter: Viktor Somogyi > Assignee: Viktor Somogyi > Priority: Minor > > There are a couple of poll(long) usages of the consumer in test and non-test > code. This jira would aim to remove the non-test usages of the method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)