[GitHub] storm issue #1826: STORM-2087 1.x
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1826 Thanks for picking this up @srdo! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 @hmcl thanks for the review. I believe I made all the requested changes here.
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r87907464
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.mockito.Mockito.*;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+public class SingleTopicKafkaSpoutTest {
--- End diff --
I addressed the initialization part. I think that abstracting the different test cases away algorithmically would add too much complexity. It is nice to have test cases that are easy to understand.
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 Hey, trying not to let this thread die. Could we get some help here from a committer? @HeartSaVioR @revans2 Thanks!!
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r82872195
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,31 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);// Seek to last committed offset
             }
         }
     }
     // emit =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
-        waitingToEmit.remove();
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
+            waitingToEmit.remove();
+        }
     }
-    // emits one tuple per record
-    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+
+    //Emits one tuple per record
+    //@return true if tuple was emitted
+    private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
         if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+            return false;
--- End diff --
I removed the return statements, @harshach.
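The idea behind the emit() change quoted above (skip records that were already acked and keep pulling from the buffer until one tuple is actually emitted) can be sketched in plain Java. This is a hypothetical, simplified model rather than the spout's code: offsets stand in for ConsumerRecords, "emitting" just records the offset, and the class and field names are assumptions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of the spout's emit step. Offsets stand in for
 * ConsumerRecords, and "emitting" just appends the offset to emittedOrder.
 */
class EmitLoopSketch {
    final Set<Long> acked = new HashSet<>();   // offsets already acked
    final Set<Long> emitted = new HashSet<>(); // offsets currently in flight
    final List<Long> emittedOrder = new ArrayList<>();

    /** Emits one tuple for the offset; returns true only if it was actually emitted. */
    boolean emitTupleIfNotEmitted(long offset) {
        if (acked.contains(offset) || emitted.contains(offset)) {
            return false; // already acked or still in flight: skip it
        }
        emitted.add(offset);
        emittedOrder.add(offset);
        return true;
    }

    /** Consumes buffered offsets until one is emitted or the buffer is exhausted. */
    void emit(Iterator<Long> waitingToEmit) {
        while (waitingToEmit.hasNext()) {
            long next = waitingToEmit.next();
            waitingToEmit.remove(); // every consumed offset leaves the buffer
            if (emitTupleIfNotEmitted(next)) {
                return; // at most one tuple per call
            }
        }
    }
}
```

With offsets 0 and 1 already acked and a buffer of [0, 1, 2, 3], one emit() call skips both acked offsets, emits offset 2, and leaves [3] buffered. The loop lets a single call make progress past a run of acked records, at the cost of doing more work per call.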
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 I bumped the logging level for unexpected offsets to WARN. Anyway, can we get this and the other PR merged, @srdo @hmcl? Admittedly there is still work to be done, but this PR is pretty messy at this point, and the changes here (and in the other PR) make the Kafka spout mostly usable with Kafka 0.10.
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 I am on version 0.10.0.1. Interesting, I didn't realize unclean leader election was enabled by default... So yes, I had it enabled. Turning it off now, though! Thanks @srdo for the tip, and hopefully that will make this problem go away!
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 @srdo @hmcl I tracked down one source of the "double acking". It looks like I am being switched to reading from an out-of-sync replica. Looking at my offsets:
GROUP    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
bla_5_2  bla_5  102        237919          50979           -186940  consumer-7_/123.456.789.101
The negative LAG means my committed position is about 187k offsets past this replica's log end. Compared with the logs of my other partitions, there is no way this log-end-offset should be at 50k; it should be around 200k-300k. When the connector sends the request for offset 237920, that offset is out of range, so the consumer resets the offset to whatever the start offset is: https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L580-L589 . For me this happened to be UNCOMMITTED_EARLIEST, so I got messages starting at position 1. This is a really unfortunate situation. We should probably make an effort to handle it better so that data loss and disruption are minimized. I kind of wonder whether this ever happened with Kafka 0.8.
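In the Kafka consumer, an out-of-range fetch offset is handled according to the auto.offset.reset setting (earliest, latest, or none). The following is a hypothetical, simplified model of that decision, not Kafka's Fetcher code; it also computes the LAG column from the table above as LOG-END-OFFSET minus CURRENT-OFFSET. The class, enum, and method names are assumptions.

```java
/**
 * Hypothetical model of how a consumer picks its next fetch offset when the
 * requested one is outside the partition's [logStart, logEnd] range.
 */
class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE } // mirrors the auto.offset.reset values

    /** LAG as shown in the consumer-group table: LOG-END-OFFSET minus CURRENT-OFFSET. */
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    static long nextFetchOffset(long requested, long logStart, long logEnd, ResetPolicy policy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // in range: fetch as asked
        }
        switch (policy) {
            case EARLIEST:
                return logStart; // silently rewinds, so old messages are read again
            case LATEST:
                return logEnd;   // silently skips ahead
            default:
                throw new IllegalStateException(
                        "Offset " + requested + " out of range [" + logStart + ", " + logEnd + "]");
        }
    }
}
```

With the numbers from the table, lag(237919, 50979) is -186940, and a request for offset 237920 against a log ending at 50979 falls back to the log start under EARLIEST. Either reset policy hides the problem from the spout, which is why a stale replica shows up as replays or skips rather than as an error.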
[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1696#discussion_r79973663
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -145,6 +154,10 @@ private void initialize(Collection partitions) {
             }
             retryService.retainAll(partitions);
+
+            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+            Set partitionsSet = new HashSet(partitions);
+            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
--- End diff --
This looks good. I think the same logic may be needed in onPartitionsRevoked as well. Also, I believe the message may need to be removed from the retryService too. Please correct me if I am wrong!
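The suggestion above (drop state for partitions this spout no longer owns, from both the emitted set and the retry bookkeeping) can be sketched as follows. The types are hypothetical stand-ins: a String replaces Kafka's TopicPartition, and a plain set replaces the retryService, whose real API is not shown in this thread.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical model: drop in-flight and retry state for partitions no longer assigned. */
class RevokedPartitionCleanupSketch {
    static final class MsgId {
        final String topicPartition; // stand-in for Kafka's TopicPartition
        final long offset;

        MsgId(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof MsgId)) {
                return false;
            }
            MsgId other = (MsgId) o;
            return offset == other.offset && topicPartition.equals(other.topicPartition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topicPartition, offset);
        }
    }

    final Set<MsgId> emitted = new HashSet<>();
    final Set<MsgId> retryQueue = new HashSet<>(); // stand-in for the retry service's state

    /** Keeps only the state that belongs to partitions this spout still owns. */
    void retainOnly(Collection<String> assignedPartitions) {
        Set<String> assigned = new HashSet<>(assignedPartitions);
        emitted.removeIf(id -> !assigned.contains(id.topicPartition));
        retryQueue.removeIf(id -> !assigned.contains(id.topicPartition));
    }
}
```

Copying the assignment into a HashSet first keeps each removeIf pass linear in the number of tracked messages. Cleaning both collections in one place avoids a failed message from a revoked partition being retried after the emitted entry is already gone.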
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 @srdo Alright, I think everything has been addressed here. I have actually been running this merged with your other PR for the last 12 hours, processing 100M tuples, and it has looked pretty good. The only issue has been "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.", which is unrelated to this PR.
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79956690
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);// Seek to last committed offset
             }
         }
     }
     // emit =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
+        //Keep trying to send a tuple when requested
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext())
--- End diff --
Not a problem. Performance/throughput is very important to me, so I am happy to go back to this!
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79956580
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);// Seek to last committed offset
--- End diff --
Not a problem. Performance/throughput is very important to me, so I am happy to go back to this!
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 That's fine, @srdo. Thanks again for the reviews. I also created STORM-2106 to keep track of the consequences of this change.
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79659246
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);// Seek to last committed offset
--- End diff --
@srdo Yes, when the failed tuple is not the first message in the batch since the last committed offset, things worked fine as they were previously. I agree. Honestly, the idea of auto-commit mode seems to go against the philosophy of Storm's processing guarantees. Keeping the offsets that need to be retried in memory isn't enough in the case of restarts. I propose we address auto-commit mode in a separate PR. Does that sound alright?
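The seek decision in the quoted hunk reduces to a small pure function. This is a hypothetical model that follows the diff's comments, not the spout's code: if something is ready to commit, seek just past it; otherwise seek to committedOffset + 1, so a failed tuple sitting right after the last commit is fetched again, whereas the old seekToEnd() jumped past it.

```java
/** Hypothetical model of the retry seek target before and after the change. */
class RetrySeekSketch {
    /**
     * New behaviour: nextCommitOffset is what findNextCommitOffset() would commit next
     * (null if nothing is committable); committedOffset is the last offset committed.
     */
    static long seekTarget(Long nextCommitOffset, long committedOffset) {
        if (nextCommitOffset != null) {
            return nextCommitOffset + 1; // resume just after the contiguous acked run
        }
        return committedOffset + 1;      // replay from the first uncommitted offset
    }

    /** Old behaviour: with nothing committable, seekToEnd() jumps to the log end. */
    static long oldSeekTarget(Long nextCommitOffset, long logEndOffset) {
        return nextCommitOffset != null ? nextCommitOffset + 1 : logEndOffset;
    }
}
```

For example, with offset 4 committed, offset 5 failed, and a log end of 10, the new rule seeks to 5 while the old rule seeks to 10 and never replays offset 5. That is exactly the case where the failed tuple is the first message after the last committed offset.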
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79662312
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -479,16 +487,17 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {// found the next offset to commit
+                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {// found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
                     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
+                    //Received a redundant ack. Ignore and continue processing.
--- End diff --
I saw this a couple of times before I figured it out. I have not taken the time to reproduce this in a toy/test case, but given the error message this is clearly a Storm or storm-kafka-client issue. I got to this state in about 30 minutes of running a topology processing 300k-800k tuples a minute with about 10s latency. The input to the topology was on the order of 2k-10k tuples per minute, with a bolt that split each input into multiple tuples. At startup there was a high number of failures after the split (I was making requests against an unwarmed ELB). I would guess that is enough to reproduce it with random data and failures.
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79657089
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -451,11 +454,11 @@ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
     /**
      * This class is not thread safe
      */
-    private class OffsetEntry {
+    class OffsetEntry {
         private final TopicPartition tp;
         private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
          * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
+        long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
--- End diff --
Unless there is a way I don't know about, using reflection is hard to follow and difficult to refactor, which makes it somewhat fragile. I am adding a protected getter and changing this back to private, which will hopefully address your concerns.
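The getter approach described above keeps the field private while still letting a test in the same package read it without reflection. A minimal sketch, using a hypothetical OffsetEntrySketch class rather than the real OffsetEntry:

```java
/** Hypothetical stand-in for OffsetEntry, exposing state through a getter instead of a widened field. */
class OffsetEntrySketch {
    private long committedOffset; // stays private, as in the original class

    OffsetEntrySketch(long initialFetchOffset) {
        // "Initially it is set to fetchOffset - 1"
        this.committedOffset = initialFetchOffset - 1;
    }

    /** Read-only view for tests in the same package; no reflection needed. */
    protected long getCommittedOffset() {
        return committedOffset;
    }

    void commit(long offset) {
        committedOffset = offset;
    }
}
```

Because the test only reads the value through a method, renaming or retyping the field later is a compile-checked refactor instead of a broken reflective lookup.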
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79540459
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+import static org.mockito.Mockito.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+
+    void populateTopicData(String topicName, int msgCount) {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        IntStream.range(0, msgCount).forEach(value -> {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+                    topicName, ((Integer)value).toString(),
--- End diff --
value is an int, so I changed the Integer casts to Integer.toString(value), which probably looks a bit nicer.
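The cast-free conversion mentioned above, in isolation: IntStream.range yields primitive ints, so Integer.toString(value) produces the same string as ((Integer) value).toString() without the boxing cast. The helper name below is hypothetical; it only mimics how the test generates its key/value strings.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical helper producing the key/value strings used to populate a test topic. */
class TopicDataSketch {
    static List<String> messageStrings(int msgCount) {
        // value is a primitive int here, so Integer.toString(value) needs no boxing cast
        return IntStream.range(0, msgCount)
                .mapToObj(value -> Integer.toString(value))
                .collect(Collectors.toList());
    }
}
```

For instance, messageStrings(3) returns ["0", "1", "2"], identical to what the casting version produces.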
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 Thanks for the review here @srdo! I will have a few more PRs headed your way in a week or two for this Kafka client to make it fully usable.
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79481211
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+import static org.mockito.Mockito.*;
+
+public class SingleTopicKafkaSpoutTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+
+    void populateTopicData(String topicName, int msgCount) {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        IntStream.range(0, msgCount).forEach(value -> {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
+                    topicName, ((Integer)value).toString(),
+                    ((Integer)value).toString());
+
+            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
+        });
+    }
+
+    private void assertOffset(int offset, KafkaSpout.OffsetEntry entry) {
+        boolean currentOffsetMatch = entry.committedOffset == offset;
+        OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
+        boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
+        assertTrue("Next offset: " +
+                        entry.findNextCommitOffset() +
+                        ", Current offset: " +
+                        entry.committedOffset +
+                        ", Desired offset: " +
+                        offset,
+                currentOffsetMatch | nextOffsetMatch);
+    }
+
+    @Test
+    public void shouldContinueWithSlowDoubleAcks() throws Exception {
+        int messageCount = 20;
+        populateTopicData(SingleTopicKafkaSpoutConfiguration.topic, messageCount);
+        int kafkaPort = kafkaUnitRule.getKafkaPort();
+
+        TopologyContext topology = mock(TopologyContext.class);
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map conf = mock(Map.class);
+
+        KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+        spout.open(conf, topology, collector);
+        spout.activate();
+
+        //play 1st tuple
+        ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount/2).forEach(value -> {
+            spout.nextTuple();
+        });
+
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        IntStream.range(0, messageCount).forEach(value -> {
+            spout.nextTuple();
+        });
+
+        ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class);
+
+        verify(collector, times(messageCount)).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.stream),
+                anyObject(),
+
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79480763
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -479,16 +487,17 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {// found the next offset to commit
+                if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {// found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
                 } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
                     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
                     break;
                 } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
+                    //Received a redundant ack. Ignore and continue processing.
--- End diff --
I added this because I was seeing acks for tuples that were behind the already committed offset. With that break statement in place, the result is a complete halt in processing. While this isn't pretty, it is the only solution I could see.
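The effect of turning the final else branch from a break into "ignore and continue" can be modelled with a sorted set of acked offsets. This is a hypothetical sketch, not OffsetEntry itself, and it returns an OptionalLong instead of an OffsetAndMetadata. With the old break, an acked offset at or below the committed offset (such as a late duplicate ack) sorts first and ends the scan before any committable run is found, which matches the halt described above.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.TreeSet;

/** Hypothetical model of findNextCommitOffset() over plain offsets. */
class NextCommitOffsetSketch {

    /**
     * Scans acked offsets in ascending order and returns the last offset of the
     * contiguous run that starts right after committedOffset, if there is one.
     */
    static OptionalLong findNextCommitOffset(NavigableSet<Long> ackedOffsets, long committedOffset) {
        long nextCommitOffset = committedOffset;
        boolean found = false;
        for (long offset : ackedOffsets) {
            if (offset == nextCommitOffset + 1) {
                nextCommitOffset = offset; // extends the committable run
                found = true;
            } else if (offset > nextCommitOffset + 1) {
                break; // gap: an earlier tuple is still pending, so stop here
            }
            // otherwise offset <= nextCommitOffset: a redundant ack, ignore it and keep scanning
        }
        return found ? OptionalLong.of(nextCommitOffset) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Offsets 2 and 3 are stale acks below the committed offset 4.
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(2L, 3L, 5L, 6L, 7L));
        System.out.println(findNextCommitOffset(acked, 4)); // prints OptionalLong[7]
    }
}
```

Skipping the stale entries rather than stopping on them means a duplicate ack can only waste a few loop iterations; it can no longer block every later commit for the partition.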
[GitHub] storm pull request #1679: STORM-2087: storm-kafka-client - tuples not always...
Github user jfenc91 commented on a diff in the pull request: https://github.com/apache/storm/pull/1679#discussion_r79480290
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -266,26 +266,32 @@ private void doSeekRetriableTopicPartitions() {
             if (offsetAndMeta != null) {
                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
             } else {
-                kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset
+                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);// Seek to last committed offset
             }
         }
     }
     // emit =
     private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
+        //Keep trying to send a tuple when requested
+        while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext())
--- End diff --
I was having issues with the spout reaching the max spout pending limit, although it is now apparent that there are other contributing factors. So I will go back to the simpler approach here; no need to overcomplicate it.
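Storm stops calling a spout's nextTuple() once topology.max.spout.pending tuples are in flight, so how many tuples each call actually emits affects how quickly the spout reaches that limit. The hypothetical model below counts the nextTuple() calls needed to reach the limit when some buffered records are skipped because they were already acked. It ignores acks arriving in the meantime and only illustrates the trade-off between the two emit() variants discussed here.

```java
import java.util.List;

/** Hypothetical model comparing the two emit() variants against a max-pending limit. */
class MaxPendingSketch {
    /**
     * alreadyAcked.get(i) is true when buffered record i was acked earlier and is skipped.
     * loopUntilEmit = true models "keep trying until a tuple is emitted"; false models
     * "consume exactly one buffered record per nextTuple() call".
     * Returns the number of calls needed to have maxPending tuples in flight, or -1 if
     * the buffer runs out first.
     */
    static int callsToReachMaxPending(List<Boolean> alreadyAcked, int maxPending, boolean loopUntilEmit) {
        int pending = 0;
        int calls = 0;
        int i = 0;
        while (pending < maxPending && i < alreadyAcked.size()) {
            calls++; // one nextTuple() invocation
            while (i < alreadyAcked.size()) {
                boolean skip = alreadyAcked.get(i++);
                if (!skip) {
                    pending++; // emitted one tuple
                    break;
                }
                if (!loopUntilEmit) {
                    break; // simple variant: this call emitted nothing
                }
            }
        }
        return pending >= maxPending ? calls : -1;
    }
}
```

With every other buffered record already acked and a limit of 3, the looping variant reaches the limit in 3 calls and the simple variant needs 6. The looping variant fills the pipeline faster, but each call can do an unbounded amount of skipping work.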
[GitHub] storm issue #1679: STORM-2087: storm-kafka-client - tuples not always being ...
Github user jfenc91 commented on the issue: https://github.com/apache/storm/pull/1679 A larger refactor is probably needed here to make this more performant. These changes understandably seem to make the spout struggle to get the number of in-flight tuples anywhere near the max spout pending limit.
[GitHub] storm pull request #1679: storm-kafka-client tests: tuples not being properl...
GitHub user jfenc91 opened a pull request: https://github.com/apache/storm/pull/1679
storm-kafka-client tests: tuples not being properly replayed
I have been working with Kafka 0.10 and Storm 1.0.2 using the storm-kafka-client in master, and I am having issues with topologies that fail tuples. The Kafka offsets seemed to get stuck, and the Kafka spout eventually halted even though the last committed offset was nowhere near the end of the queue. Here are a few unit tests that I believe replicate my situation. The last 2 are currently failing, which reflects what I am seeing in my topologies. Let me know if I missed anything! This seems like a pretty big oversight, so I am getting the feeling that something in the test is wrong. Thanks!
I added tests for the following cases:
- All tuples being acked.
- A tuple being failed in the order it was emitted.
- A tuple being failed out of the order it was emitted.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jfenc91/storm stormKafkaClientTests
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/1679.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1679
commit 94da00023dceb3a9281c536f92c052e4e969835b
Author: Jeff Fenchel <jfen...@gmail.com>
Date: 2016-09-11T01:44:35Z
Added unit tests to storm-kafka-client to cover reading from a single topic with:
- All tuples being acked.
- A tuple being failed in order it was emitted
- A tuple being failed out of the order it was emitted
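The three test cases listed above share one invariant: the committed offset may only advance over a contiguous run of acked offsets, and a failed offset must be emitted again before it can be committed. The following is a hypothetical single-partition model of that invariant, not the spout or its tests. It ignores retry back-off, and it drops acks at or below the committed offset.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeSet;

/** Hypothetical single-partition model of emit/ack/fail/commit bookkeeping. */
class ReplaySketch {
    private final long logEnd;                           // offsets 0..logEnd-1 exist
    private long nextToEmit = 0;                         // next never-emitted offset
    private long committed = -1;                         // highest contiguously acked offset
    private final TreeSet<Long> acked = new TreeSet<>(); // acked offsets above committed
    private final Deque<Long> toRetry = new ArrayDeque<>();

    ReplaySketch(long logEnd) {
        this.logEnd = logEnd;
    }

    /** Replays a failed offset first, otherwise emits a new one; returns -1 when nothing is left. */
    long nextTuple() {
        if (!toRetry.isEmpty()) {
            return toRetry.poll();
        }
        return nextToEmit < logEnd ? nextToEmit++ : -1;
    }

    void ack(long offset) {
        if (offset <= committed) {
            return; // redundant ack: already committed
        }
        acked.add(offset);
        while (acked.remove(committed + 1)) { // advance only over the contiguous run
            committed++;
        }
    }

    void fail(long offset) {
        toRetry.add(offset);
    }

    long committed() {
        return committed;
    }
}
```

In the "failed out of order" case, acking offsets 0 and 2 while offset 1 has failed keeps the committed offset at 0. Once offset 1 is replayed and acked, the committed offset jumps to 2, and acking everything that remains brings it to the last offset.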