This is an automated email from the ASF dual-hosted git repository. rabreu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 68810c79e KafkaSpout: moving to poll(Duration) due to deprecation of poll(long) new c93795ca9 Merge pull request #3612 from reiabreu/master 68810c79e is described below commit 68810c79e6dbb836e98093f98f67f6e874f1b0a8 Author: Rui Abreu <rui.ab...@gmail.com> AuthorDate: Wed Dec 20 16:18:46 2023 +0000 KafkaSpout: moving to poll(Duration) due to deprecation of poll(long) --- .../java/org/apache/storm/kafka/spout/KafkaSpout.java | 4 +++- .../apache/storm/kafka/spout/KafkaSpoutEmitTest.java | 13 +++++++------ .../spout/KafkaSpoutLogCompactionSupportTest.java | 9 +++++---- .../kafka/spout/KafkaSpoutMessagingGuaranteeTest.java | 19 ++++++++++--------- .../storm/kafka/spout/KafkaSpoutRebalanceTest.java | 3 ++- .../storm/kafka/spout/KafkaSpoutRetryLimitTest.java | 7 ++++--- .../spout/SpoutWithMockedConsumerSetupHelper.java | 4 ++-- 7 files changed, 33 insertions(+), 26 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 89c021d7f..aeff50cc1 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -24,6 +24,8 @@ import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_E import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; import com.google.common.annotations.VisibleForTesting; + +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -355,7 +357,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains); try { consumer.pause(pausedPartitions); - final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final ConsumerRecords<K, V> consumerRecords = consumer.poll(Duration.ofMillis(kafkaSpoutConfig.getPollTimeoutMs())); ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka", diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 2c442c6cd..4d69633f1 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -80,7 +81,7 @@ public class KafkaSpoutEmitTest { Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); @@ -100,7 +101,7 @@ public class KafkaSpoutEmitTest { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords)); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < numRecords; i++) { @@ -132,7 +133,7 @@ public class KafkaSpoutEmitTest { } InOrder inOrder = inOrder(consumerMock); inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0)); - inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).poll(any(Duration.class)); } } @@ -148,7 +149,7 @@ public class KafkaSpoutEmitTest { records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1)); int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1; - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < numMessages; i++) { @@ -175,7 +176,7 @@ public class KafkaSpoutEmitTest { reset(collectorMock); Time.advanceTime(50); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, failedMessageIdPartitionOne.get().offset(), 1)))); spout.nextTuple(); @@ -187,7 +188,7 @@ public class KafkaSpoutEmitTest { //Should not seek on the paused partition inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong()); inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo)); - inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).poll(any(Duration.class)); inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo)); reset(collectorMock); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java index 961f6c564..a19bb9d26 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -31,6 +31,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -86,7 +87,7 @@ public class KafkaSpoutLogCompactionSupportTest { recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); records.put(partition, recordsForPartition); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { @@ -102,13 +103,13 @@ public class KafkaSpoutLogCompactionSupportTest { // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); spout.nextTuple(); InOrder inOrder = inOrder(consumerMock); inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).poll(any(Duration.class)); //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 386d46a19..44c5d0a03 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -85,14 +86,14 @@ public class KafkaSpoutMessagingGuaranteeTest { .build(); KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); //The spout should have emitted the tuple, and must have committed it before emit InOrder inOrder = inOrder(consumerMock, collectorMock); - inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).poll(any(Duration.class)); inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); @@ -105,7 +106,7 @@ public class KafkaSpoutMessagingGuaranteeTest { private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())))) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, @@ -115,7 +116,7 @@ public class KafkaSpoutMessagingGuaranteeTest { spout.nextTuple(); } - verify(consumerMock, times(2)).poll(anyLong()); + verify(consumerMock, times(2)).poll(any(Duration.class)); verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); } @@ -140,7 +141,7 @@ public class KafkaSpoutMessagingGuaranteeTest { private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); @@ -189,7 +190,7 @@ public class KafkaSpoutMessagingGuaranteeTest { try (SimulatedTime ignored = new SimulatedTime()) { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); @@ -203,7 +204,7 @@ public class KafkaSpoutMessagingGuaranteeTest { Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); spout.nextTuple(); @@ -222,7 +223,7 @@ public class KafkaSpoutMessagingGuaranteeTest { try (SimulatedTime ignored = new SimulatedTime()) { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); @@ -256,7 +257,7 @@ public class KafkaSpoutMessagingGuaranteeTest { KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); - when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + when(consumerMock.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index e1964864a..7a42b3c0f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -101,7 +102,7 @@ public class KafkaSpoutRebalanceTest { when(consumerMock.assignment()).thenReturn(assignedPartitions); //Make the consumer return a single message for each partition - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.createRecords(partitionThatWillBeRevoked, 0, 1)))) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.createRecords(assignedPartition, 0, 1)))) .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 7982a5131..381796b73 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -19,7 +19,7 @@ import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutC import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,7 +85,7 @@ public class KafkaSpoutRetryLimitTest { int numRecords = lastOffset + 1; records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords)); - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < numRecords; i++) { @@ -104,7 +105,7 @@ public class KafkaSpoutRetryLimitTest { InOrder inOrder = inOrder(consumerMock); inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).poll(any(Duration.class)); //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4. assertTrue(commitCapture.getValue().containsKey(partition)); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index 77e8e40b3..1c9823702 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -18,7 +18,6 @@ package org.apache.storm.kafka.spout; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -27,6 +26,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -152,7 +152,7 @@ public class SpoutWithMockedConsumerSetupHelper { records.put(tp, tpRecords); } - when(consumerMock.poll(anyLong())) + when(consumerMock.poll(any(Duration.class))) .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < totalOffsets; i++) {