[GitHub] storm pull request #1873: Kafka spout - no duplicates on leader changes

2017-01-12 Thread ernisv
GitHub user ernisv opened a pull request:

https://github.com/apache/storm/pull/1873

Kafka spout - no duplicates on leader changes

The Kafka spout currently emits duplicate tuples whenever the leader of a Kafka topic partition changes.
When an exception is caused by a leader change, the PartitionManagers are simply recreated, losing the state about which tuples were already emitted, so the new PartitionManager emits them again.

This is acceptable because at-least-once delivery is still fulfilled, but it would be better not to emit duplicate data when it can be avoided.
Moreover, this is easy to avoid by moving the state related to emitted tuples from the old PartitionManager to the new one.
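The state transfer described above can be sketched as follows. This is a minimal, hypothetical model, not storm-kafka's actual PartitionManager: the class `SimplePartitionManager`, its fields and the copying constructor are assumptions chosen to mirror the state named in this thread (`_waitingToEmit`, `_pending`, `_emittedToOffset`, `_committedTo`). It shows that a manager recreated for the same partition, but pointing at the new leader, continues where the old one stopped instead of re-emitting from the last committed offset.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

public class PartitionStateTransfer {

    // Hypothetical, heavily simplified stand-in for storm-kafka's PartitionManager.
    // Only the bookkeeping that decides what gets (re-)emitted is modelled.
    static final class SimplePartitionManager {
        final String broker;                                 // leader host:port this manager fetches from
        final int partition;
        LinkedList<Long> waitingToEmit = new LinkedList<>(); // fetched, not yet emitted
        Set<Long> pending = new HashSet<>();                 // emitted, not yet acked
        long emittedToOffset;                                // next offset to fetch
        long committedTo;                                    // last offset committed to ZK

        // Fresh manager: resumes from the last committed offset.
        SimplePartitionManager(String broker, int partition, long committedTo) {
            this.broker = broker;
            this.partition = partition;
            this.committedTo = committedTo;
            this.emittedToOffset = committedTo;
        }

        // Recreated manager for the same partition: takes over the previous
        // manager's in-flight state instead of starting again from committedTo.
        SimplePartitionManager(String newBroker, SimplePartitionManager previous) {
            this.broker = newBroker;
            this.partition = previous.partition;
            this.waitingToEmit = previous.waitingToEmit;
            this.pending = previous.pending;
            this.emittedToOffset = previous.emittedToOffset;
            this.committedTo = previous.committedTo;
        }

        // Emits the next buffered offset, fetching one more if the buffer is empty.
        long emitNext() {
            if (waitingToEmit.isEmpty()) {
                waitingToEmit.add(emittedToOffset++);
            }
            long offset = waitingToEmit.removeFirst();
            pending.add(offset);
            return offset;
        }
    }

    public static void main(String[] args) {
        SimplePartitionManager old = new SimplePartitionManager("broker1:9092", 0, 100L);
        for (int i = 0; i < 5; i++) {
            old.emitNext(); // emits offsets 100..104, none acked yet
        }
        // Leader moves to broker2: recreate the manager, carrying the state over.
        SimplePartitionManager recreated = new SimplePartitionManager("broker2:9093", old);
        System.out.println("next offset after recreate: " + recreated.emitNext());
    }
}
```

Running `main` prints `next offset after recreate: 105`, whereas a fresh manager created with `committedTo = 100` would start again at offset 100. Handing over the same collection instances, rather than copying their contents, keeps a single record of in-flight offsets; that is the kind of identity the test later in this thread checks with `assertSame`.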

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ernisv/storm kafka_spout_no_dup_on_leader_changes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1873


commit 9b1e336ed03a320b25e5799655eb64571ce21c48
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T14:54:59Z

Move state from old PartitionManager when recreating manager for same partition

commit a1a7cef9c84941ef8a1909fd4db10c85fe509e0e
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T15:39:51Z

Test to check if old PartitionManager's state is moved to new manager during manager recreate

commit c8c6ee83d69cf76d8aaeb9d5ccaedbd5946d4c9b
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T15:57:46Z

Include _emittedToOffset when copying state during PartitionManager recreate
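Why `_emittedToOffset` has to travel with the rest of the state can be shown with a small worked example. Assuming, as in a simplified model, that a manager without inherited state resumes fetching at the last offset committed to ZooKeeper, every offset in `[committedTo, emittedToOffset)` is fetched and emitted a second time. The class and method names below are hypothetical.

```java
public class ReEmitWindow {

    // Number of offsets a recreated manager would emit a second time.
    // Simplified model (assumption): the old manager had already emitted
    // [committedTo, emittedToOffset); a manager that does not inherit
    // emittedToOffset starts fetching again at committedTo.
    static long reEmittedCount(long committedTo, long emittedToOffset, boolean stateCarriedOver) {
        if (stateCarriedOver) {
            return 0L; // the new manager continues fetching at emittedToOffset
        }
        return Math.max(0L, emittedToOffset - committedTo);
    }

    public static void main(String[] args) {
        // Last commit at offset 1000, old manager emitted up to 1250 (exclusive).
        System.out.println(reEmittedCount(1000L, 1250L, false)); // 250
        System.out.println(reEmittedCount(1000L, 1250L, true));  // 0
    }
}
```

With the last commit at 1000 and the old manager having emitted up to 1250, 250 tuples would be emitted twice; carrying the offset over brings that to 0.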




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1873: Kafka spout - no duplicates on leader changes

2017-01-16 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1873
  
Created JIRA issue: https://issues.apache.org/jira/browse/STORM-2296




[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes 1 ...

2017-01-19 Thread ernisv
GitHub user ernisv opened a pull request:

https://github.com/apache/storm/pull/1888

STORM-2296 Kafka spout no dup on leader changes 1 x

The Kafka spout currently emits duplicate tuples whenever the leader of a Kafka topic partition changes.
When an exception is caused by a leader change, the PartitionManagers are simply recreated, losing the state about which tuples were already emitted, so the new PartitionManager emits them again.

This is acceptable because at-least-once delivery is still fulfilled, but it would be better not to emit duplicate data when it can be avoided.
Moreover, this is easy to avoid by moving the state related to emitted tuples from the old PartitionManager to the new one.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ernisv/storm kafka_spout_no_dup_on_leader_changes_1_x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1888


commit fb9c3073f5babc35828abdcf897db31846cabecc
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T14:54:59Z

Move state from old PartitionManager when recreating manager for same partition

commit aefd80a5404b726d1dd538018b4f7f0bca119627
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T15:39:51Z

Test to check if old PartitionManager's state is moved to new manager during manager recreate

commit c744e4b7dcbca5082243a691d97f12cd4b1151c3
Author: Ernestas Vaiciukevicius 
Date:   2017-01-12T15:57:46Z

Include _emittedToOffset when copying state during PartitionManager recreate






[GitHub] storm issue #1873: STORM-2296 Kafka spout - no duplicates on leader changes

2017-01-19 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1873
  
Created another pull request for the 1.x-branch:

https://github.com/apache/storm/pull/1888




[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

2017-02-03 Thread ernisv
Github user ernisv commented on a diff in the pull request:

https://github.com/apache/storm/pull/1888#discussion_r99294012
  
--- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreate() throws Exception {
+        final int totalTasks = 2;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
--- End diff --
--- End diff --

Right, there's the same error in another test that got copy-pasted here :)
Fixed in both places.




[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

2017-02-03 Thread ernisv
Github user ernisv commented on a diff in the pull request:

https://github.com/apache/storm/pull/1888#discussion_r99294074
  
--- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreate() throws Exception {
+        final int totalTasks = 2;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            List<PartitionManager> partitionManagersAfter = iterator.next();
+            for (PartitionManager before : partitionManagersBefore)
--- End diff --
--- End diff --

Right, simplified the test.




[GitHub] storm pull request #1888: STORM-2296 Kafka spout no dup on leader changes, 1...

2017-02-03 Thread ernisv
Github user ernisv commented on a diff in the pull request:

https://github.com/apache/storm/pull/1888#discussion_r99295805
  
--- Diff: external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---
@@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreate() throws Exception {
+        final int totalTasks = 2;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            List<PartitionManager> partitionManagersAfter = iterator.next();
+            for (PartitionManager before : partitionManagersBefore)
+                for (PartitionManager after: partitionManagersAfter)
+                    if (before.getPartition().partition == after.getPartition().partition)
+                        assertStateIsTheSame(before, after);
+        }
+    }
+
+    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
+        // check if state was actually moved from old PartitionManager
+        assertNotSame(managerBefore, managerAfter);
+        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
--- End diff --
--- End diff --

I've added checks for the _emittedToOffset and _committedTo fields.
However, _failedMsgRetryManager and _pending are private, so we either don't check them or widen their visibility to package-private. Do you think that warrants a visibility change?
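One alternative to widening `_failedMsgRetryManager` and `_pending` to package visibility is to keep the fields private and expose package-private accessors that only same-package tests call. The sketch below is hypothetical (the `Manager` class and the `...ForTest()` method names are made up for illustration) and mirrors the identity checks of `assertStateIsTheSame`: the recreated manager is a different instance, but it holds the very same state objects.

```java
import java.util.HashSet;
import java.util.Set;

public class VisibilityForTests {

    // Hypothetical manager: fields stay private, while package-private
    // accessors let a test in the same package check that state objects
    // were moved (same identity) rather than recreated.
    static final class Manager {
        private final Set<Long> pending;
        private final Object failedMsgRetryManager;

        Manager() {
            this(new HashSet<>(), new Object());
        }

        // "Recreate" constructor: reuses the previous manager's state objects.
        Manager(Manager previous) {
            this(previous.pending, previous.failedMsgRetryManager);
        }

        private Manager(Set<Long> pending, Object failedMsgRetryManager) {
            this.pending = pending;
            this.failedMsgRetryManager = failedMsgRetryManager;
        }

        // Package-private: visible to same-package tests, not to other packages.
        Set<Long> pendingForTest() {
            return pending;
        }

        Object failedMsgRetryManagerForTest() {
            return failedMsgRetryManager;
        }
    }

    // Same idea as assertStateIsTheSame: new instance, same state objects.
    static boolean stateWasMoved(Manager before, Manager after) {
        return before != after
                && before.pendingForTest() == after.pendingForTest()
                && before.failedMsgRetryManagerForTest() == after.failedMsgRetryManagerForTest();
    }

    public static void main(String[] args) {
        Manager before = new Manager();
        Manager after = new Manager(before);
        System.out.println(stateWasMoved(before, after));         // true
        System.out.println(stateWasMoved(before, new Manager())); // false
    }
}
```

Either way the objects become reachable from the package; the accessor variant just leaves the fields private and makes the test-only access explicit in the method name.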




[GitHub] storm issue #1888: STORM-2296 Kafka spout no dup on leader changes, 1.x-bran...

2017-02-03 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1888
  
Ok, I've squashed all the changes into a single commit.




[GitHub] storm pull request #1873: STORM-2296 Kafka spout - no duplicates on leader c...

2017-02-09 Thread ernisv
Github user ernisv closed the pull request at:

https://github.com/apache/storm/pull/1873




[GitHub] storm issue #1873: STORM-2296 Kafka spout - no duplicates on leader changes

2017-02-09 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1873
  
That's fine, I'll close this PR.




[GitHub] storm pull request #1940: STORM-2361 Kafka spout - after leader change, it s...

2017-02-14 Thread ernisv
GitHub user ernisv opened a pull request:

https://github.com/apache/storm/pull/1940

STORM-2361 Kafka spout - after leader change, it stops committing offsets to ZK

Try to route acks/fails to the recreated partition manager if no manager is found for the exact (broker, partition, topic) combination.
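The fallback lookup described above can be sketched as: try the exact (broker, topic, partition) key first and, if the leader has moved since the tuple was emitted, route the ack to whichever manager now owns the same topic and partition number. This is a hypothetical model; `AckRouting`, `PartitionKey`, `Manager` and `findManager` are illustrative names, not the storm-kafka API, and `fail` would use the same lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class AckRouting {

    // Hypothetical key mirroring the (broker, partition, topic) identity.
    static final class PartitionKey {
        final String broker;
        final String topic;
        final int partition;

        PartitionKey(String broker, String topic, int partition) {
            this.broker = broker;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PartitionKey)) {
                return false;
            }
            PartitionKey k = (PartitionKey) o;
            return partition == k.partition && broker.equals(k.broker) && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(broker, topic, partition);
        }
    }

    // Hypothetical manager that just records the offsets it was asked to ack.
    static final class Manager {
        final PartitionKey key;
        final List<Long> acked = new ArrayList<>();

        Manager(PartitionKey key) {
            this.key = key;
        }

        void ack(long offset) {
            acked.add(offset);
        }
    }

    final Map<PartitionKey, Manager> managers = new HashMap<>();

    // Exact lookup first; if the leader (broker) changed since the tuple was
    // emitted, fall back to any manager for the same topic and partition number.
    Manager findManager(PartitionKey id) {
        Manager exact = managers.get(id);
        if (exact != null) {
            return exact;
        }
        for (Manager m : managers.values()) {
            if (m.key.partition == id.partition && m.key.topic.equals(id.topic)) {
                return m;
            }
        }
        return null; // the partition is no longer assigned to this task
    }

    void ack(PartitionKey id, long offset) {
        Manager m = findManager(id);
        if (m != null) {
            m.ack(offset);
        }
    }

    public static void main(String[] args) {
        AckRouting spout = new AckRouting();
        PartitionKey oldLeader = new PartitionKey("broker1:9092", "events", 0);
        PartitionKey newLeader = new PartitionKey("broker2:9093", "events", 0);
        Manager recreated = new Manager(newLeader);
        spout.managers.put(newLeader, recreated);
        // Tuple was emitted while broker1 led partition 0; the ack arrives after the change.
        spout.ack(oldLeader, 42L);
        System.out.println(recreated.acked); // [42]
    }
}
```

Without the fallback, the ack for offset 42 would be dropped because no manager is registered under the old leader's key. In a model where the committed offset can only advance past acked offsets, such lost acks keep it stuck, which is consistent with the symptom in the PR title.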

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ernisv/storm kafka-spout-leader-change-bug-2361

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1940.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1940


commit fe141fd79a0c7f1ec9a191359f9ccd6eb7a34f4b
Author: Ernestas Vaiciukevicius 
Date:   2017-02-14T15:41:02Z

Try to route acks/fails to recreated partition managers if manager is not found by exact (broker, partition, topic) combination






[GitHub] storm issue #1940: STORM-2361 Kafka spout - after leader change, it stops co...

2017-03-06 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1940
  
Sure - braces added.




[GitHub] storm pull request #1940: STORM-2361 Kafka spout - after leader change, it s...

2017-03-09 Thread ernisv
Github user ernisv commented on a diff in the pull request:

https://github.com/apache/storm/pull/1940#discussion_r105104946
  
--- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---
@@ -172,6 +187,12 @@ public void fail(Object msgId) {
 PartitionManager m = _coordinator.getManager(id.partition);
 if (m != null) {
 m.fail(id.offset);
+} {
--- End diff --

Right - fixed.




[GitHub] storm issue #1940: STORM-2361 Kafka spout - after leader change, it stops co...

2017-03-09 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1940
  
Squashed the commits.
About moving this to STORM-2296: I've added the links/relations to STORM-2296.
But since that issue is already in the "resolved" state, I'm not sure how the patch could be added to it.
Isn't registering the relation enough?




[GitHub] storm issue #1940: STORM-2361 Kafka spout - after leader change, it stops co...

2017-03-09 Thread ernisv
Github user ernisv commented on the issue:

https://github.com/apache/storm/pull/1940
  
Ok, changed the title

