DRPC requests

2017-07-14 Thread sam mohel
When should I get a "request timeout" versus a "request failed" in DRPC?
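For context on the question: in Storm, a DRPC request is typically reported as timed out when the topology does not return a result within the server-side limit `drpc.request.timeout.secs` (default 600 seconds), while "request failed" generally means the request was explicitly failed or errored before completing. A minimal sketch of raising that timeout in a topology config — the key is standard Storm configuration, but the surrounding map-building here is only illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class DrpcTimeoutConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // How long the DRPC server waits for the topology to return a result
        // before the request times out (Storm's default is 600 seconds).
        conf.put("drpc.request.timeout.secs", 90);
        System.out.println(conf.get("drpc.request.timeout.secs"));
    }
}
```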


[GitHub] storm pull request #2213: STORM-2633: Apply new code style to storm-sql-redi...

2017-07-14 Thread vesense
GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/2213

STORM-2633: Apply new code style to storm-sql-redis

Reduce the max allowed violation count to 0.
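For readers unfamiliar with how the violation ceiling is enforced: the maven-checkstyle-plugin exposes a `maxAllowedViolations` parameter, so "reduce the max allowed violation count to 0" presumably corresponds to a module configuration along these lines (illustrative fragment; the exact property names and placement in Storm's poms may differ):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
  <configuration>
    <!-- With a ceiling of 0, any checkstyle violation fails the build -->
    <maxAllowedViolations>0</maxAllowedViolations>
  </configuration>
</plugin>
```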

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm STORM-2633

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2213


commit 2dbe80a508c9b17ecf60fdc9256412841f4c0819
Author: Xin Wang 
Date:   2017-07-15T03:59:55Z

STORM-2633: Apply new code style to storm-sql-redis




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2212: STORM-2632: Apply new code style to storm-sql-kafk...

2017-07-14 Thread vesense
GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/2212

STORM-2632: Apply new code style to storm-sql-kafka

Reduce the max allowed violation count to 0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm STORM-2632

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2212


commit c2d2bd2fc997bc80b03c61ab8ced7cac8110990f
Author: Xin Wang 
Date:   2017-07-15T03:52:10Z

STORM-2632: Apply new code style to storm-sql-kafka






[GitHub] storm pull request #2211: STORM-2631: Apply new code style to storm-sql-mong...

2017-07-14 Thread vesense
GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/2211

STORM-2631: Apply new code style to storm-sql-mongodb

Reduce the max allowed violation count to 0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm STORM-2631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2211


commit 38c91267278ecf40e14c5f2da9ddc53624c08c69
Author: Xin Wang 
Date:   2017-07-15T03:43:22Z

STORM-2631: Apply new code style to storm-sql-mongodb






[GitHub] storm pull request #2210: STORM-2630: Apply new code style to storm-sql-hdfs

2017-07-14 Thread vesense
GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/2210

STORM-2630: Apply new code style to storm-sql-hdfs

Reduce the max allowed violation count to 0.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm STORM-2630

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2210.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2210


commit 9a7cb048556db04eab4d75629c0bded04e62d8e8
Author: Xin Wang 
Date:   2017-07-15T03:37:15Z

STORM-2630: Apply new code style to storm-sql-hdfs






[GitHub] storm issue #2201: STORM-2557: add tuple_population metric

2017-07-14 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2201
  
@revans2 Isn't this patch related to STORM-2621 instead of STORM-2557? Just 
would like to clarify.




Re: New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread Xin Wang
Congrats Stig!

- Xin

2017-07-15 9:13 GMT+08:00 Satish Duggana :

> Congrats Stig!
>
> ~Satish.
>
> On Sat, Jul 15, 2017 at 4:18 AM, Jungtaek Lim  wrote:
>
> > Welcome Stig! Well deserved.
> >
> > - Jungtaek Lim (HeartSaVioR)
> > On Sat, 15 Jul 2017 at 4:25 AM Hugo Da Cruz Louro <
> hlo...@hortonworks.com>
> > wrote:
> >
> > > Welcome Stig. Looking forward to collaborating with you.
> > >
> > > Hugo
> > >
> > > > On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz 
> > wrote:
> > > >
> > > > Please join me in congratulating the latest Committer and PMC Member,
> > > Stig Rohde Døssing.
> > > >
> > > > Stig has been very active contributing patches to Storm’s Kafka
> > > integration, participating in code reviews, and answering questions on
> > the
> > > mailing lists.
> > > >
> > > > Welcome Stig!
> > > >
> > > > -Taylor
> > > >
> > >
> > >
> >
>


Re: New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread Satish Duggana
Congrats Stig!

~Satish.

On Sat, Jul 15, 2017 at 4:18 AM, Jungtaek Lim  wrote:

> Welcome Stig! Well deserved.
>
> - Jungtaek Lim (HeartSaVioR)
> On Sat, 15 Jul 2017 at 4:25 AM Hugo Da Cruz Louro 
> wrote:
>
> > Welcome Stig. Looking forward to collaborating with you.
> >
> > Hugo
> >
> > > On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz 
> wrote:
> > >
> > > Please join me in congratulating the latest Committer and PMC Member,
> > Stig Rohde Døssing.
> > >
> > > Stig has been very active contributing patches to Storm’s Kafka
> > integration, participating in code reviews, and answering questions on
> the
> > mailing lists.
> > >
> > > Welcome Stig!
> > >
> > > -Taylor
> > >
> >
> >
>


Re: New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread Stig Rohde Døssing
Thanks. Happy to be part of the team :)

2017-07-15 0:48 GMT+02:00 Jungtaek Lim :

> Welcome Stig! Well deserved.
>
> - Jungtaek Lim (HeartSaVioR)
> On Sat, 15 Jul 2017 at 4:25 AM Hugo Da Cruz Louro 
> wrote:
>
> > Welcome Stig. Looking forward to collaborating with you.
> >
> > Hugo
> >
> > > On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz 
> wrote:
> > >
> > > Please join me in congratulating the latest Committer and PMC Member,
> > Stig Rohde Døssing.
> > >
> > > Stig has been very active contributing patches to Storm’s Kafka
> > integration, participating in code reviews, and answering questions on
> the
> > mailing lists.
> > >
> > > Welcome Stig!
> > >
> > > -Taylor
> > >
> >
> >
>


Re: New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread Jungtaek Lim
Welcome Stig! Well deserved.

- Jungtaek Lim (HeartSaVioR)
On Sat, 15 Jul 2017 at 4:25 AM Hugo Da Cruz Louro 
wrote:

> Welcome Stig. Looking forward to collaborating with you.
>
> Hugo
>
> > On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz  wrote:
> >
> > Please join me in congratulating the latest Committer and PMC Member,
> Stig Rohde Døssing.
> >
> > Stig has been very active contributing patches to Storm’s Kafka
> integration, participating in code reviews, and answering questions on the
> mailing lists.
> >
> > Welcome Stig!
> >
> > -Taylor
> >
>
>


[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

2017-07-14 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2152
  
Yes, it looks like there are only lambdas in the test code for these two 
PRs.




[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

2017-07-14 Thread askprasanna
Github user askprasanna commented on the issue:

https://github.com/apache/storm/pull/2152
  
Will be happy to do that. I haven't checked fully, and I'm being a bit lazy here - 
we are using lambdas only in the unit tests, right?




[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127544434
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key"

[GitHub] storm issue #2152: [STORM-2505] OffsetManager should account for offset void...

2017-07-14 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2152
  
I forgot to mention it, but this seems applicable to the 1.x branch as 
well. We can't cherry pick this back since it uses a few lambdas, so would you 
be willing to make a PR for a backport? This would also be the case for 
https://github.com/apache/storm/pull/2153.
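    
    As an aside, the conversion such a backport needs is mechanical: each lambda 
    becomes an anonymous inner class on the Java-7-compatible 1.x branch. A sketch 
    using a hypothetical stand-in for the factory interface (the real one is 
    org.apache.storm.kafka.spout.internal.KafkaConsumerFactory):

```java
public class LambdaBackportSketch {
    // Hypothetical stand-in for KafkaConsumerFactory, used only to
    // illustrate the lambda-to-anonymous-class conversion for Java 7.
    interface ConsumerFactory<T> {
        T createConsumer(String config);
    }

    public static void main(String[] args) {
        // master branch (Java 8): a lambda
        ConsumerFactory<String> lambdaFactory = config -> "consumer-for-" + config;

        // 1.x branch (Java 7): the equivalent anonymous inner class
        ConsumerFactory<String> anonFactory = new ConsumerFactory<String>() {
            @Override
            public String createConsumer(String config) {
                return "consumer-for-" + config;
            }
        };

        // Both forms produce the same result
        System.out.println(lambdaFactory.createConsumer("test")
                .equals(anonFactory.createConsumer("test")));
    }
}
```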




[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread askprasanna
Github user askprasanna commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127543142
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i

[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread askprasanna
Github user askprasanna commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127541772
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i

[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread askprasanna
Github user askprasanna commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127540789
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
--- End diff --

will remove this comment




[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127539919
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key"

[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127539677
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
--- End diff --

Nit: This is only true if assignedPartitions has only one entry
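
[Editor's note] A minimal, self-contained sketch of the zero-retries semantics the test above relies on. The helper below is hypothetical and is not the storm-kafka-client KafkaSpoutRetryService API; it only illustrates why, with a retry limit of 0, a tuple's first failure already exhausts the limit, so the spout can ack it and commit past it instead of re-emitting it.

```java
public class ZeroRetryPolicySketch {

    // Hypothetical stand-in for the retry decision made by a retry
    // service configured with a maximum retry count of maxRetries.
    static boolean isScheduledForRetry(int failCount, int maxRetries) {
        // A tuple is rescheduled only while its failure count has not
        // exceeded the allowed number of retries.
        return failCount <= maxRetries;
    }

    public static void main(String[] args) {
        // With zero retries, the first failure is already over the limit,
        // so the tuple is treated as complete rather than re-emitted.
        System.out.println(isScheduledForRetry(1, 0)); // prints "false"
        System.out.println(isScheduledForRetry(1, 1)); // prints "true"
    }
}
```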


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2153: [STORM-2544] Fixing issue in acking of tuples that...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2153#discussion_r127539975
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpoutWithNoRetry(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            int lastOffset = 3;
+            for (int i = 0; i <= lastOffset; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key"

[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2152#discussion_r127538268
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 ---
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+public class KafkaSpoutCommitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpout(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,...,8,9
--- End diff --

Nit: This comment is a little confusing; the "..." would usually imply that 5, 6, and 7 are included.


---


[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2152#discussion_r127538620
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 ---
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+public class KafkaSpoutCommitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpout(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,...,8,9
+            for (int i = 0; i < 5; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            for (int i = 8; i < 10; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(records));
+
+            for (int i = 0; i < r

[GitHub] storm pull request #2152: [STORM-2505] OffsetManager should account for offs...

2017-07-14 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2152#discussion_r127538410
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 ---
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+public class KafkaSpoutCommitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpout(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,...,8,9
+            for (int i = 0; i < 5; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            for (int i = 8; i < 10; i++) {
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(records));
+
+            for (int i = 0; i < r

Re: New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread Hugo Da Cruz Louro
Welcome Stig. Looking forward to collaborating with you.

Hugo

> On Jul 14, 2017, at 11:44 AM, P. Taylor Goetz  wrote:
> 
> Please join me in congratulating the latest Committer and PMC Member, Stig 
> Rohde Døssing.
> 
> Stig has been very active contributing patches to Storm’s Kafka integration, 
> participating in code reviews, and answering questions on the mailing lists.
> 
> Welcome Stig!
> 
> -Taylor
> 



New Committer/PMC Member: Stig Rohde Døssing

2017-07-14 Thread P. Taylor Goetz
Please join me in congratulating the latest Committer and PMC Member, Stig 
Rohde Døssing.

Stig has been very active contributing patches to Storm’s Kafka integration, 
participating in code reviews, and answering questions on the mailing lists.

Welcome Stig!

-Taylor

[GitHub] storm issue #2209: [STORM-2622] Add owner resource summary on storm UI

2017-07-14 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2209
  
![screen shot 2017-07-14 at 9 08 15 am](https://user-images.githubusercontent.com/14900612/28215594-2b6aa5b8-6874-11e7-808c-09395d0ffaa5.png)
![screen shot](https://user-images.githubusercontent.com/14900612/28215602-313cb8e6-6874-11e7-8055-55d6c0b491c3.png)



---


[GitHub] storm pull request #2209: [STORM-2622] Add owner resource summary on storm U...

2017-07-14 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2209#discussion_r127463843
  
--- Diff: storm-server/src/main/java/org/apache/storm/LocalCluster.java ---
@@ -1095,6 +1096,12 @@ public TopologyHistoryInfo getTopologyHistory(String 
user) throws AuthorizationE
 return ret;
 }
 }
+
+@Override
+public List getOwnerResourceSummaries(String 
owner) throws AuthorizationException, TException {
+// TODO Auto-generated method stub
+throw new RuntimeException("NOT IMPLMENETED YET");
--- End diff --

Thanks. Fixed it.


---


[GitHub] storm issue #2207: [STORM-2626] Provided a template for drpc-auth-acl.yaml

2017-07-14 Thread liu-zhaokun
Github user liu-zhaokun commented on the issue:

https://github.com/apache/storm/pull/2207
  
@HeartSaVioR 
I am so sorry to bother you. Do you have time to help me review it?


---


[GitHub] storm issue #2208: [STORM-2627] The annotation of storm.zookeeper.topology.a...

2017-07-14 Thread liu-zhaokun
Github user liu-zhaokun commented on the issue:

https://github.com/apache/storm/pull/2208
  
@HeartSaVioR 
I am so sorry to bother you. Do you have time to help me review it?


---