This is an automated email from the ASF dual-hosted git repository.

rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 68810c79e KafkaSpout: moving to poll(Duration) due to deprecation of poll(long)
     new c93795ca9 Merge pull request #3612 from reiabreu/master
68810c79e is described below

commit 68810c79e6dbb836e98093f98f67f6e874f1b0a8
Author: Rui Abreu <rui.ab...@gmail.com>
AuthorDate: Wed Dec 20 16:18:46 2023 +0000

    KafkaSpout: moving to poll(Duration) due to deprecation of poll(long)
---
 .../java/org/apache/storm/kafka/spout/KafkaSpout.java |  4 +++-
 .../apache/storm/kafka/spout/KafkaSpoutEmitTest.java  | 13 +++++++------
 .../spout/KafkaSpoutLogCompactionSupportTest.java     |  9 +++++----
 .../kafka/spout/KafkaSpoutMessagingGuaranteeTest.java | 19 ++++++++++---------
 .../storm/kafka/spout/KafkaSpoutRebalanceTest.java    |  3 ++-
 .../storm/kafka/spout/KafkaSpoutRetryLimitTest.java   |  7 ++++---
 .../spout/SpoutWithMockedConsumerSetupHelper.java     |  4 ++--
 7 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 89c021d7f..aeff50cc1 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,6 +24,8 @@ import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_E
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -355,7 +357,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         
pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
         try {
             consumer.pause(pausedPartitions);
-            final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            final ConsumerRecords<K, V> consumerRecords = consumer.poll(Duration.ofMillis(kafkaSpoutConfig.getPollTimeoutMs()));
             
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets,
 consumerRecords);
             final int numPolledRecords = consumerRecords.count();
             LOG.debug("Polled [{}] records from Kafka",
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 2c442c6cd..4d69633f1 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,7 +81,7 @@ public class KafkaSpoutEmitTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = 
new HashMap<>();
         records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10));
 
-        when(consumerMock.poll(anyLong()))
+        when(consumerMock.poll(any(Duration.class)))
             .thenReturn(new ConsumerRecords<>(records));
 
         spout.nextTuple();
@@ -100,7 +101,7 @@ public class KafkaSpoutEmitTest {
             //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
             records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
 
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                 .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < numRecords; i++) {
@@ -132,7 +133,7 @@ public class KafkaSpoutEmitTest {
             }
             InOrder inOrder = inOrder(consumerMock);
             inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
-            inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).poll(any(Duration.class));
         }
     }
 
@@ -148,7 +149,7 @@ public class KafkaSpoutEmitTest {
             records.put(partitionTwo, 
SpoutWithMockedConsumerSetupHelper.createRecords(partitionTwo, 0, 
spoutConfig.getMaxUncommittedOffsets() + 1));
             int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1;
 
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                 .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < numMessages; i++) {
@@ -175,7 +176,7 @@ public class KafkaSpoutEmitTest {
             reset(collectorMock);
             
             Time.advanceTime(50);
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                 .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 
failedMessageIdPartitionOne.get().offset(), 1))));
             
             spout.nextTuple();
@@ -187,7 +188,7 @@ public class KafkaSpoutEmitTest {
             //Should not seek on the paused partition
             inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), 
anyLong());
             
inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
-            inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).poll(any(Duration.class));
             
inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
             
             reset(collectorMock);
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
index 961f6c564..a19bb9d26 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.hasKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -86,7 +87,7 @@ public class KafkaSpoutLogCompactionSupportTest {
             
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition,
 8, 2));
             records.put(partition, recordsForPartition);
 
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                     .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < recordsForPartition.size(); i++) {
@@ -102,13 +103,13 @@ public class KafkaSpoutLogCompactionSupportTest {
 
             // Advance time and then trigger first call to kafka consumer 
commit; the commit must progress to offset 9
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                     .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
             spout.nextTuple();
 
             InOrder inOrder = inOrder(consumerMock);
             inOrder.verify(consumerMock).commitSync(commitCapture.capture());
-            inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).poll(any(Duration.class));
 
             //verify that Offset 10 was last committed offset, since this is 
the offset the spout should resume at
             Map<TopicPartition, OffsetAndMetadata> commits = 
commitCapture.getValue();
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 386d46a19..44c5d0a03 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -85,14 +86,14 @@ public class KafkaSpoutMessagingGuaranteeTest {
             .build();
         KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-        when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
+        when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
1))));
 
         spout.nextTuple();
 
         //The spout should have emitted the tuple, and must have committed it 
before emit
         InOrder inOrder = inOrder(consumerMock, collectorMock);
-        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(consumerMock).poll(any(Duration.class));
         inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         
inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
 anyList());
 
@@ -105,7 +106,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     private void 
doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> 
spoutConfig) {
         KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-        when(consumerMock.poll(anyLong()))
+        when(consumerMock.poll(any(Duration.class)))
             .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
spoutConfig.getMaxUncommittedOffsets()))))
             .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
@@ -115,7 +116,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
             spout.nextTuple();
         }
 
-        verify(consumerMock, times(2)).poll(anyLong());
+        verify(consumerMock, times(2)).poll(any(Duration.class));
         verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 
2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
     }
 
@@ -140,7 +141,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> 
spoutConfig) {
         KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-        when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
+        when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
1))));
 
         spout.nextTuple();
@@ -189,7 +190,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         try (SimulatedTime ignored = new SimulatedTime()) {
             KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-            when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
+            when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
1))));
 
             spout.nextTuple();
@@ -203,7 +204,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + 
spoutConfig.getOffsetsCommitPeriodMs());
 
-            when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.emptyMap()));
+            when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.emptyMap()));
 
             spout.nextTuple();
 
@@ -222,7 +223,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         try (SimulatedTime ignored = new SimulatedTime()) {
             KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-            when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
+            when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
1))));
 
             spout.nextTuple();
@@ -256,7 +257,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         
         KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition);
 
-        when(consumerMock.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
+        when(consumerMock.poll(any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition,
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
1))));
 
         spout.nextTuple();
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index e1964864a..7a42b3c0f 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -101,7 +102,7 @@ public class KafkaSpoutRebalanceTest {
         when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        when(consumerMock.poll(anyLong()))
+        when(consumerMock.poll(any(Duration.class)))
             .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, 
SpoutWithMockedConsumerSetupHelper.createRecords(partitionThatWillBeRevoked, 0, 
1))))
             .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(assignedPartition, 
SpoutWithMockedConsumerSetupHelper.createRecords(assignedPartition, 0, 1))))
             .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 7982a5131..381796b73 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -19,7 +19,7 @@ import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutC
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,7 +85,7 @@ public class KafkaSpoutRetryLimitTest {
             int numRecords = lastOffset + 1;
             records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
             
-            when(consumerMock.poll(anyLong()))
+            when(consumerMock.poll(any(Duration.class)))
                 .thenReturn(new ConsumerRecords<>(records));
             
             for (int i = 0; i < numRecords; i++) {
@@ -104,7 +105,7 @@ public class KafkaSpoutRetryLimitTest {
             
             InOrder inOrder = inOrder(consumerMock);
             inOrder.verify(consumerMock).commitSync(commitCapture.capture());
-            inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).poll(any(Duration.class));
 
             //verify that offset 4 was committed for the given TopicPartition, 
since processing should resume at 4.
             assertTrue(commitCapture.getValue().containsKey(partition));
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index 77e8e40b3..1c9823702 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -18,7 +18,6 @@ package org.apache.storm.kafka.spout;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -27,6 +26,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -152,7 +152,7 @@ public class SpoutWithMockedConsumerSetupHelper {
             records.put(tp, tpRecords);
         }
 
-        when(consumerMock.poll(anyLong()))
+        when(consumerMock.poll(any(Duration.class)))
             .thenReturn(new ConsumerRecords<>(records));
 
         for (int i = 0; i < totalOffsets; i++) {

Reply via email to