[GitHub] storm issue #1826: STORM-2087 1.x

2016-12-18 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1826
  
Thanks for picking this up @srdo!




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-11-14 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
@hmcl thanks for the review. I believe I made all the requested changes 
here. 




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-11-14 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r87907464
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.mockito.Mockito.*;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+public class SingleTopicKafkaSpoutTest {
--- End diff --

I addressed the initialization part. I think that abstracting the different 
test cases away algorithmically would add too much complexity. It is nice to 
have test cases that are easy to understand. 




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-10-27 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
Hey, trying not to let this thread die. Could we get some help here from a 
committer? @HeartSaVioR @revans2 Thanks!!  




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-10-11 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r82872195
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,31 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
             }
         }
     }
 
     //  emit  =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
-        waitingToEmit.remove();
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+            waitingToEmit.remove();
+        }
     }
 
-    // emits one tuple per record
-    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+
+    //Emits one tuple per record
+    //@return true if tuple was emitted
+    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
 
         if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+            return false;
--- End diff --

I removed the return statements @harshach 




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-10-07 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
I bumped the logging level for unexpected offsets to WARN. Anyway, can 
we get this and the other PR merged, @srdo @hmcl? Admittedly there is still work 
that needs to be done, but this PR is pretty messy at this point, and the 
changes here (and in the other PR) make the kafka spout mostly usable for 
Kafka 0.10. 




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-24 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
I am on version 0.10.0.1. Interesting, I didn't realize unclean leader 
election was a default... so yeah, I had that enabled. Turning that off now 
though! Thanks @srdo for the tip, and hopefully that will make this problem go 
away! 
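
For reference, a minimal sketch of the broker setting in question (an assumption on my part: a Kafka 0.10 broker, where unclean.leader.election.enable defaults to true):

    # server.properties (broker config) -- illustrative excerpt only
    # Refuse to elect an out-of-sync replica as leader; trades some
    # availability for consistency and avoids offset truncation.
    unclean.leader.election.enable=false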




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-24 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
@srdo @hmcl I tracked down one source of the "double acking".

It looks like I am being switched to reading from an out-of-sync replica. 
Looking at my offsets:

GROUP    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       OWNER
bla_5_2  bla_5  10         2237919         50979           -2186940  consumer-7_/123.456.789.101

Comparing against my other partition logs, there is no way this 
log-end-offset should be at 50k. It should be around 200k-300k. When the 
consumer sends the request for offset 2237920 it is out of range, and that resets 
the offset to whatever the start offset is: 
https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L580-L589
 . For me the strategy happened to be UNCOMMITTED_EARLIEST, so I got messages starting at 
position 1. 

This is a really unfortunate situation. We should probably make an effort 
to handle it better so data loss/disruption is minimized. I kind of wonder if 
this ever happened with the Kafka 0.8 spout. 
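
One possible mitigation (my suggestion, not something in this PR): configure the consumer so an out-of-range fetch fails loudly instead of silently rewinding. A minimal sketch, assuming raw java.util.Properties passed through to a 0.10.x consumer:

    // Sketch only. With "none", an out-of-range fetch raises
    // OffsetOutOfRangeException instead of resetting to the start offset,
    // so the spout could at least surface the problem.
    Properties props = new Properties();
    props.put("auto.offset.reset", "none");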





[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

2016-09-21 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1696#discussion_r79973663
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -145,6 +154,10 @@ private void initialize(Collection<TopicPartition> partitions) {
         }
 
         retryService.retainAll(partitions);
+
+        //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+        Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+        emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
--- End diff --

This looks good. I think this same logic may be needed in 
onPartitionsRevoked as well. I also believe the message may need to be removed 
from the retryService. Please correct me if I am wrong! 
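
For illustration, a minimal sketch of what that cleanup might look like in the rebalance listener (a hypothetical method body, not code from this PR; assumes the same emitted set as in the diff above):

    // Sketch: drop emitted message ids for revoked partitions, since this
    // spout instance can no longer ack them after the rebalance.
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        final Set<TopicPartition> revoked = new HashSet<>(partitions);
        emitted.removeIf(msgId -> revoked.contains(msgId.getTopicPartition()));
    }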




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-21 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
@srdo Alright, I think everything has been addressed here. I have actually 
been running this merged with your other PR for the last 12 hours, processing 
100M tuples, and it's looked pretty good. The only issue has been: 
"org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member.", which is unrelated to this PR.  
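
For anyone hitting the same CommitFailedException, a hedged sketch of the usual consumer-side tuning (illustrative values, not settings from this PR; both are standard Kafka 0.10 consumer properties):

    // Sketch: give the consumer more headroom between polls so the group
    // coordinator does not assume the member died and trigger a rebalance.
    Properties props = new Properties();
    props.put("session.timeout.ms", "30000"); // longer failure-detection window
    props.put("max.poll.records", "500");     // smaller batches, faster poll loop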




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-21 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79956690
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
             }
         }
     }
 
     //  emit  =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
+        //Keep trying to send a tuple when requested
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext())
--- End diff --

Not a problem. Performance/throughput is very important to me so I am happy 
to go back to this!




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-21 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79956580
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
--- End diff --

Not a problem. Performance/throughput is very important to me so I am happy 
to go back to this! 




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-20 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
That's fine, @srdo. Thanks again for the reviews. I also created STORM-2106 
to keep track of the consequences of this change. 




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-20 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79659246
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
--- End diff --

@srdo Yes, in the case that the failed tuple is not the first message in 
the batch since the last committed offset, things worked fine as they were 
previously. 

I agree. Honestly, the idea of auto commit mode seems to go against the 
philosophy of Storm's processing guarantees. Putting the offsets that need to 
be retried in memory isn't enough in the case of restarts. I propose we address 
auto commit mode in a separate PR. Sound alright?
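
As a starting point, a minimal sketch of what turning auto commit off looks like at the consumer level (an assumption: plain consumer properties, before any KafkaSpoutConfig plumbing):

    // Sketch: with auto commit disabled, offsets are committed only after
    // tuples are acked, which matches Storm's at-least-once guarantee.
    Properties props = new Properties();
    props.put("enable.auto.commit", "false");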





[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-20 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79662312
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -479,16 +487,17 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
 
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {            // found the next offset to commit
+                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
                     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
+                    //Received a redundant ack. Ignore and continue processing.
--- End diff --

I saw this a couple of times before I figured it out. I have not taken the 
time to reproduce this in a toy/test case, but given the error message this is 
clearly a Storm or storm-kafka-client issue. I got to this state in about 30 
minutes of running a topology processing 300k-800k tuples a minute with about 
10s latency. The input to the topology was on the order of 2k-10k tuples per 
minute, with a bolt that separated each input into multiple tuples. At startup 
there was a high rate of failures after the separation (I was making requests 
against an unwarmed ELB). I would guess that that is enough to reproduce with 
random data/failures. 
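
In case it helps anyone try, a hypothetical reproduction sketch of that scenario (my reading of it, not code from this topology): a bolt that fans each input out into several anchored tuples and fails a fraction at random, similar to the unwarmed-ELB failures.

    import java.util.Map;
    import java.util.Random;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class SplitAndRandomFailBolt extends BaseRichBolt {
        private OutputCollector collector;
        private final Random random = new Random();

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            if (random.nextDouble() < 0.2) { // simulate the flaky downstream calls
                collector.fail(input);
                return;
            }
            for (int i = 0; i < 5; i++) {    // separate each input into multiple tuples
                collector.emit(input, new Values(input.getString(0) + "-" + i));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("part"));
        }
    }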




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-20 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79657089
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -451,11 +454,11 @@ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
     /**
      * This class is not thread safe
      */
-    private class OffsetEntry {
+    class OffsetEntry {
         private final TopicPartition tp;
         private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
                                                  * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
+        long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
--- End diff --

Unless there is a way I don't know about, using reflection is hard to 
follow and difficult to refactor, making it somewhat fragile. I am adding a 
protected get method and changing this back to private, which hopefully addresses 
your concerns.
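
Concretely, a minimal sketch of the accessor approach (hypothetical shape of the change; the field lives in KafkaSpout.OffsetEntry):

    // Field stays private; a package-visible, read-only accessor lets tests
    // in the same package check the value without reflection.
    private long committedOffset;

    long getCommittedOffset() {
        return committedOffset;
    }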




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-19 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79540459
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+import static org.mockito.Mockito.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+
+    void populateTopicData(String topicName, int msgCount) {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        IntStream.range(0, msgCount).forEach(value -> {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+                    topicName, ((Integer)value).toString(),
--- End diff --

value is an int, so I changed the Integer casts to Integer.toString(value); 
that probably looks a bit nicer. 
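
For comparison, the two forms (value being the int loop index from the diff above):

    // before: box the int just to call toString()
    String key = ((Integer) value).toString();
    // after: the static helper says the same thing directly
    String key = Integer.toString(value);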




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-19 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
Thanks for the review here @srdo! I will have a few more PRs headed your 
way in a week or two for this kafka client to make it fully usable.  




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-19 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79481211
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+import static org.mockito.Mockito.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+
+    void populateTopicData(String topicName, int msgCount) {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        IntStream.range(0, msgCount).forEach(value -> {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+                    topicName, ((Integer)value).toString(),
+                    ((Integer)value).toString());
+
+            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
+        });
+    }
+
+    private void assertOffset(int offset, KafkaSpout.OffsetEntry entry) {
+        boolean currentOffsetMatch = entry.committedOffset == offset;
+        OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
+        boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
+        assertTrue("Next offset: " +
+                entry.findNextCommitOffset() +
+                ", Current offset: " +
+                entry.committedOffset +
+                ", Desired offset: " +
+                offset,
+                currentOffsetMatch | nextOffsetMatch);
+    }
+
+    @Test
+    public void shouldContinueWithSlowDoubleAcks() throws Exception {
+        int messageCount = 20;
+        populateTopicData(SingleTopicKafkaSpoutConfiguration.topic, messageCount);
+        int kafkaPort = kafkaUnitRule.getKafkaPort();
+
+        TopologyContext topology = mock(TopologyContext.class);
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map conf = mock(Map.class);
+
+        KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+        spout.open(conf, topology, collector);
+        spout.activate();
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount/2).forEach(value -> {
+            spout.nextTuple();
+        });
+
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount).forEach(value -> {
+            spout.nextTuple();
+        });
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+
+        verify(collector, times(messageCount)).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.stream),
+                anyObject(),
+  

[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-19 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79480763
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -479,16 +487,17 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
 
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {            // found the next offset to commit
+                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
                     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
+                    //Received a redundant ack. Ignore and continue processing.
--- End diff --

So I added this because I was seeing acks on tuples that were behind the 
already-committed offset. With that break statement in place, the result is a 
complete halt in processing. While this isn't pretty, it is the only solution 
I could see.  




[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...

2016-09-19 Thread jfenc91
Github user jfenc91 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1679#discussion_r79480290
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset
             }
         }
     }
 
     //  emit  =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
+        //Keep trying to send a tuple when requested
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext())
--- End diff --

I was having issues with the spout reaching the max spout pending limit, 
although it is apparent now that there are other contributing factors. So I 
will go back to the simpler way here. No need to overcomplicate. 




[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...

2016-09-12 Thread jfenc91
Github user jfenc91 commented on the issue:

https://github.com/apache/storm/pull/1679
  
A larger refactor here is probably needed to make this more performant. 
These changes understandably seem to make the spout struggle to get the 
in-flight tuple count anywhere near the max spout pending limit. 




[GitHub] storm pull request #1679: storm-kafka-client tests: tuples not being properl...

2016-09-10 Thread jfenc91
GitHub user jfenc91 opened a pull request:

https://github.com/apache/storm/pull/1679

storm-kafka-client tests: tuples not being properly replayed

I have been working with Kafka 0.10 and Storm 1.0.2 using the 
storm-kafka-client in master, and am having issues with topologies that fail 
tuples. The kafka offsets seemed to get stuck, and the kafka spout eventually 
halted even though the last committed offset was nowhere near the end of the 
queue. 

Here are a few unit tests that I believe replicate my situation. The last two 
are currently failing, which reflects what I am seeing in my topologies. 
Let me know if I missed anything! This seems like a pretty big oversight, so I 
am getting the feeling that something in the test is wrong. Thanks!

I added tests for the following cases:
  - All tuples being acked.
  - A tuple being failed in order it was emitted
  - A tuple being failed out of the order it was emitted


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jfenc91/storm stormKafkaClientTests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1679


commit 94da00023dceb3a9281c536f92c052e4e969835b
Author: Jeff Fenchel <jfen...@gmail.com>
Date:   2016-09-11T01:44:35Z

Added unit tests to storm-kafka-client to cover reading from a single
topic with:
  - All tuples being acked.
  - A tuple being failed in order it was emitted
  - A tuple being failed out of the order it was emitted



