[ https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577062#comment-16577062 ]

ASF GitHub Bot commented on KAFKA-7140:
---------------------------------------

hachikuji closed pull request #5319: KAFKA-7140: Remove deprecated poll usages
URL: https://github.com/apache/kafka/pull/5319

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 47f8529e2d1..692331ed13f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -53,6 +53,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -441,7 +442,7 @@ public String toString() {
     }
 
     private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
-        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));
 
         // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
         if (rebalanceException != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index de1ceb3be10..ea9b4c621f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -253,7 +254,7 @@ public void send(K key, V value, org.apache.kafka.clients.producer.Callback call
 
     private void poll(long timeoutMs) {
         try {
-            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1bf9c717068..6d92c34adef 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -180,8 +181,8 @@ public void testErrorHandlingInSinkTasks() throws Exception {
         // bad json
         ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
 
         sinkTask.put(EasyMock.anyObject());
         EasyMock.expectLastCall().times(2);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4a7c760fc74..33ab2ef06e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -58,6 +58,7 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -458,7 +459,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -893,7 +894,7 @@ public void run() {
         // Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
         // and then return one record for TP1 and one for TP3.
         final AtomicBoolean rebalanced = new AtomicBoolean();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1273,7 +1274,7 @@ private void expectRebalanceRevocationError(RuntimeException e) {
         sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
         EasyMock.expectLastCall().andReturn(Collections.emptyMap());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1298,7 +1299,7 @@ private void expectRebalanceAssignmentError(RuntimeException e) {
         sinkTask.open(partitions);
         EasyMock.expectLastCall().andThrow(e);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1315,7 +1316,7 @@ private void expectPollInitialAssignment() {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -1332,7 +1333,7 @@ private void expectPollInitialAssignment() {
     private void expectConsumerWakeup() {
         consumer.wakeup();
         EasyMock.expectLastCall();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
     }
 
     private void expectConsumerPoll(final int numMessages) {
@@ -1340,7 +1341,7 @@ private void expectConsumerPoll(final int numMessages) {
     }
 
     private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 73689d35710..d0089e92b6b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -55,6 +55,7 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -525,7 +526,7 @@ private void expectPollInitialAssignment() throws Exception {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -557,7 +558,7 @@ private void expectStopTask() throws Exception {
     private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
         // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
         // but don't care about the exact details here.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -595,7 +596,7 @@ public SinkRecord answer() {
         // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
         // returning empty data, we return one record. The expectation is that the data will be ignored by the
         // response behavior specified using the return value of this method.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -625,7 +626,7 @@ public SinkRecord answer() {
         final Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(TOPIC_PARTITION, startOffset);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7e2c5644a3b..365652a75b5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.charset.StandardCharsets
+import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Properties, Random}
@@ -388,7 +389,7 @@ object ConsoleConsumer extends Logging {
   private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String],
                                        consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
     consumerInit()
-    var recordIter = consumer.poll(0).iterator
+    var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
 
     def consumerInit() {
       (topic, partitionId, offset, whitelist) match {
@@ -432,7 +433,7 @@ object ConsoleConsumer extends Logging {
 
     def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
       if (!recordIter.hasNext) {
-        recordIter = consumer.poll(timeoutMs).iterator
+        recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
         if (!recordIter.hasNext)
           throw new TimeoutException()
       }
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 5af55a8d7f1..2e7b8ddf094 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import kafka.utils.{CommandLineUtils, ToolsUtils}
 import java.util.{Collections, Properties, Random}
-
 import java.text.SimpleDateFormat
+import java.time.Duration
 
 import com.typesafe.scalalogging.LazyLogging
 
@@ -127,7 +127,7 @@ object ConsumerPerformance extends LazyLogging {
     var currentTimeMillis = lastConsumedTime
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
-      val records = consumer.poll(100).asScala
+      val records = consumer.poll(Duration.ofMillis(100)).asScala
       currentTimeMillis = System.currentTimeMillis
       if (records.nonEmpty)
         lastConsumedTime = currentTimeMillis
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 3beaf827f59..4849b1ed8c6 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -18,11 +18,13 @@
 package kafka.tools
 
 import java.nio.charset.StandardCharsets
-import java.util.{Arrays, Collections, Properties}
+import java.time.Duration
+import java.util.{Arrays, Properties}
 
 import kafka.utils.Exit
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,9 +71,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
-
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-    consumer.subscribe(Collections.singletonList(topic))
 
     val producerProps = loadProps
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -82,16 +82,21 @@ object EndToEndLatency {
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
+    // sends a dummy message to create the topic if it doesn't exist
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
+
     def finalise() {
       consumer.commitSync()
       producer.close()
       consumer.close()
     }
 
-    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
-    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
-    consumer.seekToEnd(Collections.emptyList())
-    consumer.poll(0)
+
+    val topicPartitions = consumer.partitionsFor(topic).asScala
+      .map(p => new TopicPartition(p.topic(), p.partition())).asJava
+    consumer.assign(topicPartitions)
+    consumer.seekToEnd(topicPartitions)
+    consumer.assignment().asScala.foreach(consumer.position)
 
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
@@ -103,7 +108,7 @@ object EndToEndLatency {
 
       //Send message (of random bytes) synchronously then immediately poll for it
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
-      val recordIter = consumer.poll(timeout).iterator
+      val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
 
       val elapsed = System.nanoTime - begin
 
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..d55d96bd65b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,6 +17,7 @@
 
 package kafka.tools
 
+import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -452,7 +453,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // uncommitted record since last poll. Using one second as poll's timeout ensures that
         // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
         // commit.
-        recordIter = consumer.poll(1000).iterator
+        recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
         if (!recordIter.hasNext)
           throw new NoRecordsException
       }
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 3c045c69eb2..09d3b9ea9a1 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -47,6 +47,7 @@
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,6 +58,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@@ -313,10 +315,12 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-            client.subscribe(topicsToSubscribe);
-            client.poll(1);
+            Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
+                    .flatMap(Collection::stream)
+                    .map(info -> new TopicPartition(info.topic(), info.partition()))
+                    .collect(Collectors.toList());
+            client.assign(partitions);
 
-            final Set<TopicPartition> partitions = client.assignment();
             final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
 
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index be062b309df..26d6e23a3f8 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -47,7 +48,7 @@ public Consumer(String topic) {
     @Override
     public void doWork() {
         consumer.subscribe(Collections.singletonList(this.topic));
-        ConsumerRecords<Integer, String> records = consumer.poll(1000);
+        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Integer, String> record : records) {
             System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
         }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 0d74645379e..27e7c7fda5d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.errors.ProducerFencedException;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -287,7 +288,7 @@ public void run() {
                 try {
                     producer.beginTransaction();
                     while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
-                        ConsumerRecords<String, String> records = consumer.poll(200L);
+                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                         for (ConsumerRecord<String, String> record : records) {
                             producer.send(producerRecordFromConsumerRecord(outputTopic, record));
                             messagesInCurrentTransaction++;
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index cc09b233167..58f34718b8a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -47,6 +47,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -220,7 +221,7 @@ public void run() {
             consumer.subscribe(Collections.singletonList(topic), this);
 
             while (!isFinished()) {
-                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                 Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
 
                 if (!useAutoCommit) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 1a852964070..c3a90e4da6a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -38,6 +38,7 @@
 
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -135,7 +136,7 @@ public Void call() throws Exception {
             long startBatchMs = startTimeMs;
             try {
                 while (messagesConsumed < spec.maxMessages()) {
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                     if (records.isEmpty()) {
                         continue;
                     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 570f6a11e34..669fafcc75e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -50,6 +50,7 @@
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -337,7 +338,7 @@ public void run() {
                 while (true) {
                     try {
                         pollInvoked++;
-                        ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                         for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
                             ConsumerRecord<byte[], byte[]> record = iter.next();
                             int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
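
A note on the EndToEndLatency and StreamsResetter hunks above: besides swapping the poll overload, they drop the old "poll(0) to force a lazy seek or assignment" idiom in favour of manual assignment plus position(). A minimal Java sketch of that idiom, assuming "consumer" is an already-configured KafkaConsumer and "topic" exists (the names are illustrative, not taken from the patch):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.TopicPartition;

    // Assign all partitions of the topic manually instead of subscribing.
    List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .collect(Collectors.toList());
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);
    // seekToEnd is evaluated lazily; calling position() performs the seek
    // without consuming any records, replacing the deprecated poll(0) call.
    for (TopicPartition tp : consumer.assignment())
        consumer.position(tp);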


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated poll usages
> -----------------------------
>
>                 Key: KAFKA-7140
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7140
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Viktor Somogyi
>            Assignee: Viktor Somogyi
>            Priority: Minor
>
> There are a couple of poll(long) usages of the consumer in test and non-test
> code. This JIRA aims to remove the non-test usages of the method.
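
For context, the deprecation being cleaned up here comes from KIP-266, which added poll(java.time.Duration) and deprecated poll(long) because the latter could block indefinitely while fetching metadata. A minimal before/after sketch in Java (bootstrap address, group id, and topic name are placeholders, not taken from the patch):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollMigrationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "poll-migration-example");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));

                // Before (deprecated): could block past the timeout on metadata updates.
                // ConsumerRecords<String, String> records = consumer.poll(1000L);

                // After: the Duration overload treats the timeout as an upper bound.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }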



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
