DRPC requests
When should I get a request timeout, and when a request failed error, in DRPC?
[GitHub] storm pull request #2213: STORM-2633: Apply new code style to storm-sql-redi...
GitHub user vesense opened a pull request: https://github.com/apache/storm/pull/2213 STORM-2633: Apply new code style to storm-sql-redis Reduce the max allowed violation count to 0. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vesense/storm STORM-2633 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2213 commit 2dbe80a508c9b17ecf60fdc9256412841f4c0819 Author: Xin Wang Date: 2017-07-15T03:59:55Z STORM-2633: Apply new code style to storm-sql-redis --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #2212: STORM-2632: Apply new code style to storm-sql-kafk...
GitHub user vesense opened a pull request: https://github.com/apache/storm/pull/2212 STORM-2632: Apply new code style to storm-sql-kafka Reduce the max allowed violation count to 0. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vesense/storm STORM-2632 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2212.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2212 commit c2d2bd2fc997bc80b03c61ab8ced7cac8110990f Author: Xin Wang Date: 2017-07-15T03:52:10Z STORM-2632: Apply new code style to storm-sql-kafka
[GitHub] storm pull request #2211: STORM-2631: Apply new code style to storm-sql-mong...
GitHub user vesense opened a pull request: https://github.com/apache/storm/pull/2211 STORM-2631: Apply new code style to storm-sql-mongodb Reduce the max allowed violation count to 0. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vesense/storm STORM-2631 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2211.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2211 commit 38c91267278ecf40e14c5f2da9ddc53624c08c69 Author: Xin Wang Date: 2017-07-15T03:43:22Z STORM-2631: Apply new code style to storm-sql-mongodb
[GitHub] storm pull request #2210: STORM-2630: Apply new code style to storm-sql-hdfs
GitHub user vesense opened a pull request: https://github.com/apache/storm/pull/2210 STORM-2630: Apply new code style to storm-sql-hdfs Reduce the max allowed violation count to 0. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vesense/storm STORM-2630 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2210 commit 9a7cb048556db04eab4d75629c0bded04e62d8e8 Author: Xin Wang Date: 2017-07-15T03:37:15Z STORM-2630: Apply new code style to storm-sql-hdfs
[GitHub] storm issue #2201: STORM-2557: add tuple_population metric
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2201 @revans2 Isn't this patch related to STORM-2621 instead of STORM-2557? Just would like to clarify.
Re: New Committer/PMC Member: Stig Rohde Døssing
Congrats Stig!

- Xin

2017-07-15 9:13 GMT+08:00 Satish Duggana:
> Congrats Stig!
>
> ~Satish.
>
> On Sat, Jul 15, 2017 at 4:18 AM, Jungtaek Lim wrote:
>> Welcome Stig! Well deserved.
>>
>> - Jungtaek Lim (HeartSaVioR)
>>
>> On Sat, 15 Jul 2017 at 4:25 AM Hugo Da Cruz Louro <hlo...@hortonworks.com> wrote:
>>> Welcome Stig. Looking forward to collaborating with you.
>>>
>>> Hugo
>>>
>>>> On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz wrote:
>>>>
>>>> Please join me in congratulating the latest Committer and PMC Member, Stig Rohde Døssing.
>>>>
>>>> Stig has been very active contributing patches to Storm’s Kafka integration, participating in code reviews, and answering questions on the mailing lists.
>>>>
>>>> Welcome Stig!
>>>>
>>>> -Taylor
Re: New Committer/PMC Member: Stig Rohde Døssing
Congrats Stig! ~Satish.
Re: New Committer/PMC Member: Stig Rohde Døssing
Thanks. Happy to be part of the team :)
Re: New Committer/PMC Member: Stig Rohde Døssing
Welcome Stig! Well deserved. - Jungtaek Lim (HeartSaVioR)
[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2152 Yes, it looks like there are only lambdas in the test code for these two PRs.
[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...
Github user askprasanna commented on the issue: https://github.com/apache/storm/pull/2152 Will be happy to do that. Haven't checked fully and being a bit lazy here - we are using lambdas only in the unit tests, right?
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127544434
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer consumerMock;
+    private KafkaSpout spout;
+    private KafkaSpoutConfig spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+            new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+                    0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+                .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key"
[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2152 I forgot to mention it, but this seems applicable to the 1.x branch as well. We can't cherry pick this back since it uses a few lambdas, so would you be willing to make a PR for a backport? This would also be the case for https://github.com/apache/storm/pull/2153.
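The backport concern above can be illustrated with a small sketch. It assumes, as the message implies but does not state, that the 1.x branch has to compile at a Java language level without lambda support. `ConsumerFactory` and `LambdaBackportSketch` are hypothetical names made up for this illustration; they are not Storm types. The sketch shows the same single-method factory written once as a lambda and once as an anonymous inner class, which is the usual mechanical rewrite for such a backport.

```java
// Hypothetical single-method interface standing in for a factory type;
// it is NOT the real Storm KafkaConsumerFactory.
interface ConsumerFactory<C> {
    C createConsumer(String config);
}

class LambdaBackportSketch {

    // Java 8 style: the factory is a lambda that always returns the given instance.
    static <C> ConsumerFactory<C> withLambda(final C consumer) {
        return config -> consumer;
    }

    // Pre-Java 8 style: the same factory written as an anonymous inner class.
    static <C> ConsumerFactory<C> withAnonymousClass(final C consumer) {
        return new ConsumerFactory<C>() {
            @Override
            public C createConsumer(String config) {
                return consumer;
            }
        };
    }

    public static void main(String[] args) {
        Object consumer = new Object();
        ConsumerFactory<Object> a = withLambda(consumer);
        ConsumerFactory<Object> b = withAnonymousClass(consumer);
        // Prints whether both factories hand back the same instance.
        System.out.println(a.createConsumer("cfg") == b.createConsumer("cfg"));
    }
}
```

Both variants behave identically at run time, so a backport of this kind would only change the syntax of the test code, not what the tests check.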
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user askprasanna commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127543142 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user askprasanna commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127541772 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user askprasanna commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127540789
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
+//Set up a spout listening to 1 topic partition
--- End diff --
will remove this comment
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127539919 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127539677 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java --- @@ -0,0 +1,127 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; 
+import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; + +public class KafkaSpoutRetryLimitTest { + +private final long offsetCommitPeriodMs = 2_000; +private final TopologyContext contextMock = mock(TopologyContext.class); +private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); +private final Map conf = new HashMap<>(); +private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer consumerMock; +private KafkaSpout spout; +private KafkaSpoutConfig spoutConfig; + +public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = +new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), +0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + +private void setupSpoutWithNoRetry(Set assignedPartitions) { +spoutConfig = getKafkaSpoutConfigBuilder(-1) +.setOffsetCommitPeriodMs(offsetCommitPeriodMs) +.setRetry(ZERO_RETRIES_RETRY_SERVICE) +.build(); + +consumerMock = mock(KafkaConsumer.class); +KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + +//Set up a spout listening to 1 topic partition --- End diff -- Nit: This is only true if assignedPartitions only has one entry
[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2153#discussion_r127539975 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java --- @@ -0,0 +1,127 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; 
+import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; + +public class KafkaSpoutRetryLimitTest { + +private final long offsetCommitPeriodMs = 2_000; +private final TopologyContext contextMock = mock(TopologyContext.class); +private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); +private final Map conf = new HashMap<>(); +private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer consumerMock; +private KafkaSpout spout; +private KafkaSpoutConfig spoutConfig; + +public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = +new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), +0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + +private void setupSpoutWithNoRetry(Set assignedPartitions) { +spoutConfig = getKafkaSpoutConfigBuilder(-1) +.setOffsetCommitPeriodMs(offsetCommitPeriodMs) +.setRetry(ZERO_RETRIES_RETRY_SERVICE) +.build(); + +consumerMock = mock(KafkaConsumer.class); +KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + +//Set up a spout listening to 1 topic partition +spout = new KafkaSpout<>(spoutConfig, consumerFactory); + +spout.open(conf, contextMock, collectorMock); +spout.activate(); + +ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); +verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + +//Assign partitions to the spout +ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); +consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); +} + +@Test +public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { +//Spout should ack failed messages after they hit the retry limit +try (SimulatedTime simulatedTime = new SimulatedTime()) { 
+setupSpoutWithNoRetry(Collections.singleton(partition)); +Map>> records = new HashMap<>(); +List> recordsForPartition = new ArrayList<>(); +int lastOffset = 3; +for (int i = 0; i <= lastOffset; i++) { +recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key"
[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2152#discussion_r127538268 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java --- @@ -0,0 +1,146 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +public class KafkaSpoutCommitTest { + +private 
final long offsetCommitPeriodMs = 2_000; +private final TopologyContext contextMock = mock(TopologyContext.class); +private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); +private final Map conf = new HashMap<>(); +private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer consumerMock; +private KafkaSpout spout; +private KafkaSpoutConfig spoutConfig; + +@Captor +private ArgumentCaptor> commitCapture; + +private void setupSpout(Set assignedPartitions) { +MockitoAnnotations.initMocks(this); +spoutConfig = getKafkaSpoutConfigBuilder(-1) +.setOffsetCommitPeriodMs(offsetCommitPeriodMs) +.build(); + +consumerMock = mock(KafkaConsumer.class); +KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + +//Set up a spout listening to 1 topic partition +spout = new KafkaSpout<>(spoutConfig, consumerFactory); + +spout.open(conf, contextMock, collectorMock); +spout.activate(); + +ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); +verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + +//Assign partitions to the spout +ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); +consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); +} + +@Test +public void testCommitSuccessWithOffsetVoids() { +//Verify that the commit logic can handle offset voids +try (SimulatedTime simulatedTime = new SimulatedTime()) { +setupSpout(Collections.singleton(partition)); +Map>> records = new HashMap<>(); +List> recordsForPartition = new ArrayList<>(); +// Offsets emitted are 0,1,2,3,4,...,8,9 --- End diff -- Nit: This comment is a little confusing, the ... would usually imply that 5,6,7 are included.
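To make the nit above concrete, here is a minimal, self-contained Java sketch (not part of the pull request) that lists the offsets produced by the two loops appearing later in the same test: 0 to 4, then 8 to 9. The class name `OffsetVoidExample` and the method `emittedOffsets` are hypothetical, and plain `Long` values stand in for the `ConsumerRecord` objects used in the real test.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetVoidExample {

    // Mirrors the loop bounds in the quoted test: offsets 0..4, then 8..9.
    static List<Long> emittedOffsets() {
        List<Long> offsets = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            offsets.add(i);
        }
        for (long i = 8; i < 10; i++) {
            offsets.add(i);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3, 4, 8, 9]; offsets 5, 6 and 7 form the void.
        System.out.println(emittedOffsets());
    }
}
```

One possible rewording of the code comment, offered only as a suggestion, is `// Offsets emitted are 0,1,2,3,4,<void>,8,9`, which avoids implying that 5, 6, and 7 are present.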
[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2152#discussion_r127538620 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java --- @@ -0,0 +1,146 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +public class KafkaSpoutCommitTest { + +private 
final long offsetCommitPeriodMs = 2_000; +private final TopologyContext contextMock = mock(TopologyContext.class); +private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); +private final Map conf = new HashMap<>(); +private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer consumerMock; +private KafkaSpout spout; +private KafkaSpoutConfig spoutConfig; + +@Captor +private ArgumentCaptor> commitCapture; + +private void setupSpout(Set assignedPartitions) { +MockitoAnnotations.initMocks(this); +spoutConfig = getKafkaSpoutConfigBuilder(-1) +.setOffsetCommitPeriodMs(offsetCommitPeriodMs) +.build(); + +consumerMock = mock(KafkaConsumer.class); +KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + +//Set up a spout listening to 1 topic partition +spout = new KafkaSpout<>(spoutConfig, consumerFactory); + +spout.open(conf, contextMock, collectorMock); +spout.activate(); + +ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); +verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + +//Assign partitions to the spout +ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); +consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); +} + +@Test +public void testCommitSuccessWithOffsetVoids() { +//Verify that the commit logic can handle offset voids +try (SimulatedTime simulatedTime = new SimulatedTime()) { +setupSpout(Collections.singleton(partition)); +Map>> records = new HashMap<>(); +List> recordsForPartition = new ArrayList<>(); +// Offsets emitted are 0,1,2,3,4,...,8,9 +for (int i = 0; i < 5; i++) { +recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); +} +for (int i = 8; i < 10; i++) { +recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", 
"value")); +} +records.put(partition, recordsForPartition); + +when(consumerMock.poll(anyLong())) +.thenReturn(new ConsumerRecords(records)); + +for (int i = 0; i < r
[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2152#discussion_r127538410 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java --- @@ -0,0 +1,146 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +public class KafkaSpoutCommitTest { + +private 
final long offsetCommitPeriodMs = 2_000; +private final TopologyContext contextMock = mock(TopologyContext.class); +private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); +private final Map conf = new HashMap<>(); +private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer consumerMock; +private KafkaSpout spout; +private KafkaSpoutConfig spoutConfig; + +@Captor +private ArgumentCaptor> commitCapture; + +private void setupSpout(Set assignedPartitions) { +MockitoAnnotations.initMocks(this); +spoutConfig = getKafkaSpoutConfigBuilder(-1) +.setOffsetCommitPeriodMs(offsetCommitPeriodMs) +.build(); + +consumerMock = mock(KafkaConsumer.class); +KafkaConsumerFactory consumerFactory = (kafkaSpoutConfig) -> consumerMock; + +//Set up a spout listening to 1 topic partition +spout = new KafkaSpout<>(spoutConfig, consumerFactory); + +spout.open(conf, contextMock, collectorMock); +spout.activate(); + +ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); +verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); + +//Assign partitions to the spout +ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); +consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); +} + +@Test +public void testCommitSuccessWithOffsetVoids() { +//Verify that the commit logic can handle offset voids +try (SimulatedTime simulatedTime = new SimulatedTime()) { +setupSpout(Collections.singleton(partition)); +Map>> records = new HashMap<>(); +List> recordsForPartition = new ArrayList<>(); +// Offsets emitted are 0,1,2,3,4,...,8,9 +for (int i = 0; i < 5; i++) { +recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); +} +for (int i = 8; i < 10; i++) { +recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", 
"value")); +} +records.put(partition, recordsForPartition); + +when(consumerMock.poll(anyLong())) +.thenReturn(new ConsumerRecords(records)); + +for (int i = 0; i < r
Re: New Committer/PMC Member: Stig Rohde Døssing
Welcome Stig. Looking forward to collaborating with you. Hugo > On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz wrote: > > Please join me in congratulating the latest Committer and PMC Member, Stig > Rohde Døssing. > > Stig has been very active contributing patches to Storm’s Kafka integration, > participating in code reviews, and answering questions on the mailing lists. > > Welcome Stig! > > -Taylor >
New Committer/PMC Member: Stig Rohde Døssing
Please join me in congratulating the latest Committer and PMC Member, Stig Rohde Døssing. Stig has been very active contributing patches to Storm’s Kafka integration, participating in code reviews, and answering questions on the mailing lists. Welcome Stig! -Taylor
[GitHub] storm issue #2209: [STORM-2622] Add owner resource summary on storm UI
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2209 ![screen shot 2017-07-14 at 9 08 15 am](https://user-images.githubusercontent.com/14900612/28215594-2b6aa5b8-6874-11e7-808c-09395d0ffaa5.png) https://user-images.githubusercontent.com/14900612/28215602-313cb8e6-6874-11e7-8055-55d6c0b491c3.png
[GitHub] storm pull request #2209: [STORM-2622] Add owner resource summary on storm U...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2209#discussion_r127463843 --- Diff: storm-server/src/main/java/org/apache/storm/LocalCluster.java --- @@ -1095,6 +1096,12 @@ public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationE return ret; } } + +@Override +public List getOwnerResourceSummaries(String owner) throws AuthorizationException, TException { +// TODO Auto-generated method stub +throw new RuntimeException("NOT IMPLMENETED YET"); --- End diff -- Thanks. Fixed it.
[GitHub] storm issue #2207: [STORM-2626] Provided a template for drpc-auth-acl.yaml
Github user liu-zhaokun commented on the issue: https://github.com/apache/storm/pull/2207 @HeartSaVioR I am sorry to bother you. Do you have time to help me review it?
[GitHub] storm issue #2208: [STORM-2627] The annotation of storm.zookeeper.topology.a...
Github user liu-zhaokun commented on the issue: https://github.com/apache/storm/pull/2208 @HeartSaVioR I am sorry to bother you. Do you have time to help me review it?