[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r157104947 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -442,10 +435,13 @@ private boolean isEmitTuple(List tuple) { return tuple != null || kafkaSpoutConfig.isEmitNullTuples(); } -private void commitOffsetsForAckedTuples() { -// Find offsets that are ready to be committed for every topic partition +private void commitOffsetsForAckedTuples(Set assignedPartitions) { +// Find offsets that are ready to be committed for every assigned topic partition +final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream() +.filter(entry -> assignedPartitions.contains(entry.getKey())) +.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); --- End diff -- An empty line before this var would make the code easier to read ---
[GitHub] storm issue #2454: STORM-2847: Ensure spout can handle being activated and d...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2454 @srdo Can you explain how the drawback scenario you describe in your [comment](https://github.com/apache/storm/pull/2454#issuecomment-351428500) can happen? When activate happens, refresh partitions will be called; onPartitionsRevoked will commit only for the partitions that are now assigned to the consumer (so acks that make commits eligible for other partitions won't matter), and onPartitionsReassigned will remove the offset managers for the partitions that are no longer assigned to this spout instance. ---
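The filtering step in the diff above keeps only the offset managers whose partition is still assigned to this consumer. It can be sketched with plain Java collections. This is a minimal, hypothetical sketch rather than the spout's code: it is generic over the partition and manager types so it runs without Kafka on the classpath, and `AssignedOffsetsSketch`/`retainAssigned` are illustrative names.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; P stands in for TopicPartition and M for OffsetManager.
final class AssignedOffsetsSketch {
    static <P, M> Map<P, M> retainAssigned(Map<P, M> offsetManagers, Set<P> assignedPartitions) {
        // Keep only entries whose partition is still assigned to this consumer,
        // so no commit is attempted for a partition that was revoked in a rebalance.
        return offsetManagers.entrySet().stream()
                .filter(entry -> assignedPartitions.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

One design note: `Collectors.toMap` throws a `NullPointerException` on null values and an `IllegalStateException` on duplicate keys. Neither should occur when filtering an existing map that stores no null values.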
[GitHub] storm issue #2460: STORM-2851 1.x
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2460 +1 ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157842966 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java --- @@ -55,12 +58,12 @@ public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAc manager.addToAckMsgs(getMessageId(initialFetchOffset + 2)); manager.addToAckMsgs(getMessageId(initialFetchOffset + 6)); -assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3)); +assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3)); manager.addToAckMsgs(getMessageId(initialFetchOffset + 5)); assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", -manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7)); +manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 7)); --- End diff -- OK. ---
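The two assertions in this test can be reproduced with a simplified model of how a next commit offset might be computed. This is a hypothetical reconstruction, not `OffsetManager`'s implementation. It assumes `initialFetchOffset` is 0, that offsets 0 to 2 are acked, and that offsets 3 and 4 were never emitted. The test itself only says that "the messages were not emitted".

```java
import java.util.NavigableSet;

// Hypothetical model: 'acked' holds acked offsets, 'pending' holds emitted but not yet acked offsets.
final class NextCommitOffsetSketch {
    /**
     * Returns one past the last offset of the run of acked offsets starting at committedOffset.
     * A gap is skipped only if no offset inside it is still pending; otherwise the scan stops.
     */
    static long findNextCommitOffset(long committedOffset, NavigableSet<Long> acked, NavigableSet<Long> pending) {
        long next = committedOffset;
        for (long ackedOffset : acked.tailSet(committedOffset, true)) {
            if (ackedOffset != next) {
                // Gap [next, ackedOffset): stop if any offset in it was emitted and is still pending.
                Long firstPending = pending.ceiling(next);
                if (firstPending != null && firstPending < ackedOffset) {
                    break;
                }
            }
            next = ackedOffset + 1;
        }
        return next;
    }
}
```

With `pending = {5}` the scan stops at 3. Once 5 is acked, the gap 3..4 contains no pending offset, so the scan continues to 7, which matches both assertions.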
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157831079 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) { } /** - * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer - * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. + * Defines the offset used by the {@link KafkaSpout} in the first poll to Kafka broker. The choice of this parameter will affect + * the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. --- End diff -- OK. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157831049 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) { } /** - * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer - * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. + * Defines the offset used by the {@link KafkaSpout} in the first poll to Kafka broker. The choice of this parameter will affect + * the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. * - * EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous - * commits - * LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of - * previous commits + * EARLIEST when the topology is first deployed the kafka spout polls records starting in the first offset of the --- End diff -- OK. However, this statement is correct: if a topology is deployed again, it is a new topology, because it has a new id and creates new object instances. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157831266 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -204,33 +226,70 @@ private void initialize(Collection partitions) { /** * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. */ -private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { -if (committedOffset != null) { // offset was committed for this TopicPartition -if (firstPollOffsetStrategy.equals(EARLIEST)) { - kafkaConsumer.seekToBeginning(Collections.singleton(tp)); -} else if (firstPollOffsetStrategy.equals(LATEST)) { -kafkaConsumer.seekToEnd(Collections.singleton(tp)); +private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { +LOG.trace("Seeking offset for topic-partition {} with {} and {}", newTp, firstPollOffsetStrategy, committedOffset); --- End diff -- OK ---
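The removed lines in this diff show how the seek depended on the first poll strategy when an offset had been committed. The logic can be modeled as a pure function. This is a simplified sketch: `beginningOffset` and `endOffset` stand in for the positions that `seekToBeginning` and `seekToEnd` would produce. The no-commit branch (UNCOMMITTED_EARLIEST behaves like EARLIEST, UNCOMMITTED_LATEST like LATEST) is an assumption that is not visible in the truncated diff. The sketch also does not model this PR's change, which, per the updated javadoc, applies EARLIEST only when the topology is first deployed.

```java
// Hypothetical model of the pre-change seek decision; not the spout's implementation.
final class FirstPollSeekSketch {
    enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /** committedOffset is null when nothing has been committed for the partition. */
    static long fetchOffset(FirstPollOffsetStrategy strategy, Long committedOffset,
                            long beginningOffset, long endOffset) {
        if (committedOffset != null) {
            switch (strategy) {
                case EARLIEST:
                    return beginningOffset;   // ignores the previous commit
                case LATEST:
                    return endOffset;         // ignores the previous commit
                default:
                    return committedOffset;   // UNCOMMITTED_*: resume from the committed offset
            }
        }
        // Assumed behavior when no commit exists: *EARLIEST starts at the beginning, *LATEST at the end.
        return (strategy == FirstPollOffsetStrategy.EARLIEST
                || strategy == FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) ? beginningOffset : endOffset;
    }
}
```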
[GitHub] storm issue #2466: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2466 @srdo this is the master version. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157930206 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/OffsetAndMetadataMocks.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.CommitMetadata; +import org.apache.storm.task.TopologyContext; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class OffsetAndMetadataMocks { +public static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"thread\":\"Thread-20\"}"; + +public static OffsetAndMetadata createMocksTree(KafkaConsumer<String, String> consumerMock, --- End diff -- Done ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157930210 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/OffsetAndMetadataMocks.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.CommitMetadata; +import org.apache.storm.task.TopologyContext; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class OffsetAndMetadataMocks { +public static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"thread\":\"Thread-20\"}"; + +public static OffsetAndMetadata createMocksTree(KafkaConsumer<String, String> consumerMock, +TopologyContext topologyContext, +TopicPartition topicPartition) throws java.io.IOException { + +OffsetAndMetadata oam = mock(OffsetAndMetadata.class); +when(consumerMock.committed(topicPartition)) +.thenReturn(oam); + +when(oam.metadata()) +.thenReturn(COMMIT_METADATA); + +when(topologyContext.getStormId()).thenReturn("tp1"); + +ObjectMapper om = mock(ObjectMapper.class); --- End diff -- Done ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157930231 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java --- @@ -71,6 +75,10 @@ 1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute .build(); private KafkaSpout<String, String> spout; +private final TopicPartition topicPartition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); +private KafkaConsumer<String, String> consumerMock; --- End diff -- Done ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo can you please do one last review? Thanks. ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo it should be good now. Can you please take a look? Thanks. ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo I am working on making the unit tests pass, and then I will squash and create the master PR. Can you take another look in the meantime? Thanks. ---
[GitHub] storm pull request #2428: STORM-2826: Set key/value deserializer fields when...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2428#discussion_r151886343 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -292,17 +292,21 @@ private Builder(String bootstrapServers, SerializableDeserializer keyDes, Cla this.subscription = subscription; this.translator = new DefaultRecordTranslator<>(); -if (keyDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); -} -if (keyDes != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); -} -if (valueDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); +if (!this.kafkaProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { --- End diff -- @srdo can you please clarify what you are trying to do? What happens if this if statement is false? Won't it cause kafkaProps to keep whatever value it already has, while the field keyClassDeserializer holds something else? What are the implications of that? As for the two ifs below, on lines 296 and 299, I think they can both be true, with different values, if you are dealing with subtypes. If so, what happens in that case? ---
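The following sketch makes the question about the two independent ifs concrete. It shows what happens when a deserializer class and a deserializer instance of a subtype are both supplied: both branches run, and the later `put` silently wins. Everything here is hypothetical. It uses a plain `Map` and made-up deserializer classes, and `configureStrict` is only one possible way to surface the conflict, not what the PR does.

```java
import java.util.HashMap;
import java.util.Map;

final class DeserializerPropsSketch {
    // Same string as ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG.
    static final String KEY_DESERIALIZER = "key.deserializer";

    // Hypothetical stand-ins for a deserializer base type and a subtype.
    static class BaseDeserializer { }
    static class SubDeserializer extends BaseDeserializer { }

    /** Mirrors the shape of the two independent ifs: both may fire, and the later put wins. */
    static Map<String, Object> configure(Class<?> keyDesClazz, Object keyDes) {
        Map<String, Object> kafkaProps = new HashMap<>();
        if (keyDesClazz != null) {
            kafkaProps.put(KEY_DESERIALIZER, keyDesClazz);
        }
        if (keyDes != null) {
            kafkaProps.put(KEY_DESERIALIZER, keyDes.getClass()); // silently overwrites the class set above
        }
        return kafkaProps;
    }

    /** Hypothetical stricter variant: reject conflicting input instead of letting the last write win. */
    static Map<String, Object> configureStrict(Class<?> keyDesClazz, Object keyDes) {
        if (keyDesClazz != null && keyDes != null && !keyDesClazz.equals(keyDes.getClass())) {
            throw new IllegalArgumentException(
                    "Conflicting key deserializers: " + keyDesClazz + " vs " + keyDes.getClass());
        }
        return configure(keyDesClazz, keyDes);
    }
}
```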
[GitHub] storm pull request #2426: STORM-2825: Fix ClassCastException when storm-kafk...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2426#discussion_r151887752 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -718,7 +718,13 @@ private static void setAutoCommitMode(Builder builder) { + " This will be treated as an error in the next major release." + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases."); -final boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); +Object enableAutoCommitValue = builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); --- End diff -- @srdo I believe that this logic can be simplified to:

```java
final boolean isAutoCommitEnabled = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
```

or, if you prefer it more explicit:

```java
final Object autoCommitConf = builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
final boolean isAutoCommitEnabled = Boolean.parseBoolean(autoCommitConf.toString());
```

---
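The suggested `Boolean.parseBoolean(...toString())` form accepts both `Boolean` and `String` values. However, calling `toString()` on a missing property throws a `NullPointerException`. If the property can be absent at that point, a null-tolerant helper along these lines would work. That absence is an assumption, since the surrounding branch may already guarantee the property is set. The class and method names are hypothetical.

```java
import java.util.Map;

final class AutoCommitParseSketch {
    // Same string as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /**
     * Accepts a Boolean or a String value and returns defaultValue when the property is absent,
     * instead of throwing a NullPointerException from toString() on a missing value.
     */
    static boolean isAutoCommitEnabled(Map<String, Object> kafkaProps, boolean defaultValue) {
        Object value = kafkaProps.get(ENABLE_AUTO_COMMIT);
        if (value == null) {
            return defaultValue;
        }
        return Boolean.parseBoolean(value.toString());
    }
}
```

`Boolean.parseBoolean` returns `false` for any string other than "true" (ignoring case). A typo such as "ture" is therefore silently treated as disabled.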
[GitHub] storm issue #2427: MINOR: Use booleans instead of strings for 'enable.auto.c...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2427 +1 ---
[GitHub] storm issue #2426: STORM-2825: Fix ClassCastException when storm-kafka-clien...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2426 +1. Thanks @srdo. Can you please squash the commits before merging? Thanks. ---
[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2438 @arunmahadevan @HeartSaVioR @srdo this patch has been merged into master and 1.x-branch ---
[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...
Github user hmcl closed the pull request at: https://github.com/apache/storm/pull/2438 ---
[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2438#discussion_r155007745 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -242,9 +242,7 @@ public void nextTuple() { } } -if (waitingToEmit()) { -emit(); -} +emitIfWaitingNotEmitted(); --- End diff -- Done ---
[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2409 @HeartSaVioR there is not a single major project that does not require contributors to squash their commits. It takes a few minutes, and it makes a world of difference in making the git log easy to understand and, most importantly, easy to cherry-pick. ---
[GitHub] storm issue #2451: STORM-2850: Make ManualPartitionSubscription call rebalan...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2451 +1 ---
[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2428 @arunmahadevan I am looking into this now. Thanks. ---
[GitHub] storm issue #2448: Quick fix: correcting markdown format
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2448 @Ethanlm can you please replace "Quick fix" with "MINOR: "? Thanks. +1 after fixing the commit message. ---
[GitHub] storm issue #2448: MINOR: correcting markdown format
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2448 @Ethanlm yeah, I noticed it right after my comment. Somehow I had not refreshed my PR view. If it is easy, I will change the commit message, since I am about to merge something. Otherwise we will just leave it. ---
[GitHub] storm pull request #2428: STORM-2826: Set key/value deserializer fields when...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2428#discussion_r155909342 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -359,7 +356,7 @@ private Builder(Builder builder, SerializableDeserializer keyDes, Class */ @Deprecated public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) { --- End diff -- @srdo do you know why this method returns a new builder object? I can't figure out a reason for it to do so. I suspect the only reasons are that the fields of the builder class are final (e.g. keyDesClassClazz) and that it makes the generics work. There is no benefit in having the fields of the builder class be final, and the code snippet below also fixes the generics problem. Is there any reason not to get rid of the builder's copy constructor completely and make this method like this:

```java
public Builder<K, V> setKey(Class<? extends Deserializer<K>> clazz) {
    this.keyDesClazz = clazz;
    if (keyDesClazz != null) {
        this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
    }
    return this;
}
```

We should do something similar for the other 3 methods. In my opinion the code has become a bit confusing, and I believe this is one of the last few opportunities we have to make it better. Please let me know your thoughts. Thanks. ---
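The trade-off behind the copy-constructing setter can be shown with a minimal builder. `SketchBuilder` is hypothetical, not Storm's class, and `Function<byte[], K>` stands in for a deserializer. A setter that keeps the key type can mutate the builder and return `this`, as suggested above. A setter that changes the key type cannot return `this` without an unchecked cast, and the deprecated `setKey` returns `Builder<NK, V>` per the diff. That is presumably why it builds a new object.

```java
import java.util.function.Function;

// Hypothetical minimal builder: K is the key type, V the value type.
final class SketchBuilder<K, V> {
    private Function<byte[], K> keyDeserializer;
    private Function<byte[], V> valueDeserializer;

    SketchBuilder(Function<byte[], K> keyDeserializer, Function<byte[], V> valueDeserializer) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    // Same type parameters in and out: mutate in place and return this.
    SketchBuilder<K, V> setKey(Function<byte[], K> newKeyDeserializer) {
        this.keyDeserializer = newKeyDeserializer;
        return this;
    }

    // Changing K to NK: 'this' is statically a SketchBuilder<K, V>, so returning it as
    // SketchBuilder<NK, V> would need an unchecked cast. Copying into a new builder avoids that.
    <NK> SketchBuilder<NK, V> withKeyType(Function<byte[], NK> newKeyDeserializer) {
        return new SketchBuilder<>(newKeyDeserializer, this.valueDeserializer);
    }

    K key(byte[] raw) {
        return keyDeserializer.apply(raw);
    }
}
```

With a `Builder<K, V>` return type, the method can no longer change the key type. Callers that relied on the type-changing form would need to choose the key type up front.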
[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2428 @srdo, aiming to get this PR merged more quickly, I created a [PR](https://github.com/srdo/storm/pull/1) with a suggested fix off your branch. If you agree with the fix, can you please incorporate it, squash the commits, and push it again here? I will then review it right away. Thanks. ---
[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156223919 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -186,12 +182,13 @@ private void initialize(Collection partitions) { for (TopicPartition tp : newPartitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); +LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]", --- End diff -- LOL, my bad :). I was reviewing this in the diff view and got confused. ---
[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156220249 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) { } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { -if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) { +if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) { --- End diff -- Is this change to address https://issues.apache.org/jira/browse/STORM-2844 ? ---
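For the `>=` versus `>` question, recall that in Kafka the committed offset of a partition is, by convention, the offset of the next record the group should consume, not the last record processed. A minimal sketch of that boundary check follows. It compares against a record's own offset, whereas the diff compares against `kafkaConsumer.position(tp)`, so it only illustrates the off-by-one semantics and is not a drop-in replacement. The names are hypothetical.

```java
// Hypothetical helper illustrating Kafka's "committed offset = next offset to read" convention.
final class CommittedCheckSketch {
    /** A record is already covered by a commit only if its offset is strictly below the committed offset. */
    static boolean isAlreadyCommitted(Long committedOffset, long recordOffset) {
        return committedOffset != null && committedOffset > recordOffset;
    }
}
```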
[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156208523 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -186,12 +182,13 @@ private void initialize(Collection partitions) { for (TopicPartition tp : newPartitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); +LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]", --- End diff -- Isn't this log message useful? Unless this information is logged elsewhere, I would suggest we keep this message. If you want it at a lower priority, we can log it at TRACE level. ---
[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156208684 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -223,29 +220,24 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { @Override public void nextTuple() { try { -if (initialized) { --- End diff -- Why is this flag no longer necessary? ---
[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2454#discussion_r156224126 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) { } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { -if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) { +if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) { --- End diff -- OK, I agree. I initially thought that this could be a potential fix, but then found out that it wouldn't work, so I was wondering if I had missed anything. Thanks. ---
[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2428 @srdo although some of these methods have been deprecated for 2.0, customers that are currently on a 1.x.y version will likely use this version for a few years. We will have to maintain this codebase for quite a long time, and therefore I am in favor of making the code at least a bit more readable. I had quite a hard time understanding what the existing code is doing. I have another suggestion, which I also shared with you in a [PR](https://github.com/srdo/storm/pull/1). I will leave it up to you which one to pick, and I am +1 after that so that we can move forward. Thanks. ---
[GitHub] storm issue #2393: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2393 @HeartSaVioR for 1.x-branch ---
[GitHub] storm pull request #2394: 1.x branch storm 2787 ks init flag
GitHub user hmcl opened a pull request: https://github.com/apache/storm/pull/2394

1.x branch storm 2787 ks init flag

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 1.x-branch_STORM-2787_KSInitFlag

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2394.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2394

commit ac16fe1ee9d974af64a30769819561c0abae23af
Author: Hugo Louro <hmclo...@gmail.com>
Date: 2017-10-23T00:44:54Z

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees
- Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
- Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
- Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
- Update documentation to reflect these changes

commit 7d4ac07684a7405d6539a3bd0cb7da985736bac7
Author: Hugo Louro <hmclo...@gmail.com>
Date: 2017-10-25T06:52:54Z

STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees
---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147012506 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance + the ack gets lost. + +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted + to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it + won't retry tuples that fail or timeout after the commit to Kafka has been done. + +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties + "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens + it cannot give any message processing guarantees, i.e. 
a message may be processed 0, 1 or more times. + This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown. + +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows: -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g. ```java KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .builder(String bootstrapServers, String ... topics) .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) ``` -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter. +# Tuple Tracking + +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI, +or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING. -If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g. ```java KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .builder(String bootstrapServers, String ... topics) .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) - .setForceEnableTupleTracking(true) + .setTupleTrackingEnforced(true) ``` -Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantees where tuple tracking is required and therefore always enabled. --- End diff -- Done ---
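The rules described in this documentation diff can be summarized in a small Java sketch. The sketch is hypothetical. Its enum mirrors the three values named in the text but is not the library's `KafkaSpoutConfig.ProcessingGuarantee`. The exception type thrown for NONE with auto-commit disabled is an assumption, because the text only says that an exception will be thrown.

```java
import java.util.Map;

// Hypothetical summary of the documented rules; not the storm-kafka-client implementation.
final class ProcessingGuaranteeSketch {
    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /** AT_LEAST_ONCE and AT_MOST_ONCE: the spout commits. NONE: the Kafka consumer auto-commits. */
    static boolean spoutControlsCommits(ProcessingGuarantee guarantee) {
        return guarantee != ProcessingGuarantee.NONE;
    }

    /** Tuples are always tracked for AT_LEAST_ONCE; otherwise only when tracking is enforced. */
    static boolean isTupleTracked(ProcessingGuarantee guarantee, boolean tupleTrackingEnforced) {
        return guarantee == ProcessingGuarantee.AT_LEAST_ONCE || tupleTrackingEnforced;
    }

    /** NONE relies on Kafka auto-commit, so an explicit enable.auto.commit=false is rejected. */
    static void validate(ProcessingGuarantee guarantee, Map<String, Object> kafkaProps) {
        Object autoCommit = kafkaProps.get(ENABLE_AUTO_COMMIT);
        boolean explicitlyDisabled = autoCommit != null && !Boolean.parseBoolean(autoCommit.toString());
        if (guarantee == ProcessingGuarantee.NONE && explicitlyDisabled) {
            // Exception type is an assumption made for this sketch.
            throw new IllegalArgumentException("Processing guarantee NONE requires enable.auto.commit=true");
        }
    }
}
```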
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147012458 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance + the ack gets lost. + +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147012467 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance + the ack gets lost. + +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted + to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147012486 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance + the ack gets lost. + +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted + to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it + won't retry tuples that fail or timeout after the commit to Kafka has been done. --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r146997432 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked --- End diff -- Maybe we can come up with a bit better wording for this, but I really don't think that we should say that an offset is marked as processed. Offsets are committed, not processed. ConsumerRecords, wrapped by Tuples are processed. ---
[GitHub] storm issue #2385: YSTORM-2727: Generic Resource Aware Scheduling
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2385 @govind-menon why is YSTORM-2725 prefixed by Y? Should it be STORM-2725? If so, can you please fix the typo. Thanks. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r146996683 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked --- End diff -- Well, this is tricky because Storm does not process offsets, Storm processes tuples. More precisely, it processes tuples that contain ConsumerRecords. The offset is just part of the ConsumerRecord, which also contains the key, value, etc. We commit the offset, but by committing it we are effectively marking the tuple as processed, because even if the tuple fails, it won't be retried (processed again). ---
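To make the point about "committing an offset marks the tuple as processed" concrete, here is a minimal sketch of a single Kafka partition. It assumes only the standard Kafka convention that a committed offset is the next offset to consume; `PartitionModel` and its methods are hypothetical and are not part of the spout or the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Kafka partition; not the KafkaSpout or KafkaConsumer API.
final class PartitionModel {
    private final long logEndOffset;    // offsets 0 .. logEndOffset-1 exist in the log
    private long committedOffset = 0;   // Kafka convention: the NEXT offset to consume

    PartitionModel(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    // Committing offset N tells Kafka that every record below N is done.
    void commit(long nextOffsetToConsume) {
        committedOffset = nextOffsetToConsume;
    }

    // Offsets a restarted consumer would fetch again, i.e. the only ones that can still be (re)processed.
    List<Long> replayableAfterRestart() {
        List<Long> offsets = new ArrayList<>();
        for (long o = committedOffset; o < logEndOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }
}
```

With 10 records and a commit of offset 6, a tuple for offset 5 that fails afterwards is never fetched again, so from Kafka's point of view it has been processed even though the topology never finished it.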
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r146996955 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r146997051 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147012706 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -78,17 +78,17 @@ private transient KafkaSpoutRetryService retryService; // Handles tuple events (emit, ack etc.) private transient KafkaTupleListener tupleListener; -// timer == null for modes other than at-least-once +// timer == null if processing guarantee is other than at-least-once --- End diff -- Done ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147013419 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -78,17 +78,17 @@ private transient KafkaSpoutRetryService retryService; // Handles tuple events (emit, ack etc.) private transient KafkaTupleListener tupleListener; -// timer == null for modes other than at-least-once +// timer == null if processing guarantee is other than at-least-once private transient Timer commitTimer; // Flag indicating that the spout is still undergoing initialization process. private transient boolean initialized; // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, -//or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode. +// or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee. private transient Map<TopicPartition, OffsetManager> offsetManagers; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. -// Always empty if not using at-least-once mode. +// Always empty if processing guarantee is other than at-least-once. --- End diff -- Done. Removed the double negation. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147013385 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -78,17 +78,17 @@ private transient KafkaSpoutRetryService retryService; // Handles tuple events (emit, ack etc.) private transient KafkaTupleListener tupleListener; -// timer == null for modes other than at-least-once +// timer == null if processing guarantee is other than at-least-once private transient Timer commitTimer; // Flag indicating that the spout is still undergoing initialization process. private transient boolean initialized; // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, -//or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode. +// or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee. --- End diff -- Done. Removed the double negation. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147013717 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() { } private boolean commit() { -return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode +return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();// timer != null for non auto commit mode } private boolean poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); final int readyMessageCount = retryService.readyMessageCount(); final boolean poll = !waitingToEmit() -//Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit -//Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, -//and prevents locking up the spout when there are too many retriable tuples -&& (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets -|| !isAtLeastOnce()); +// Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit. --- End diff -- Done. ---
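The `poll()` condition in this diff packs three rules into one boolean expression. The sketch below restates it as a standalone method, assuming the inputs map one-to-one onto `waitingToEmit()`, `isAtLeastOnceProcessing()`, `numUncommittedOffsets`, `retryService.readyMessageCount()` and `getMaxUncommittedOffsets()`; `PollGate` itself is hypothetical.

```java
// Hypothetical standalone restatement of the poll() gating condition quoted in the diff.
final class PollGate {
    static boolean shouldPoll(boolean waitingToEmit,
                              boolean atLeastOnce,
                              int numUncommittedOffsets,
                              int readyMessageCount,
                              int maxUncommittedOffsets) {
        if (waitingToEmit) {
            return false; // buffered records must be emitted before polling again
        }
        if (!atLeastOnce) {
            return true; // the uncommitted-offsets limit only applies when commits depend on acks
        }
        // Tuples that are ready for retry are not counted against the limit,
        // so a large number of failed tuples cannot lock up the spout.
        return numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets;
    }
}
```

This is logically equivalent to `!waitingToEmit && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || !atLeastOnce)`; splitting it into guard clauses simply makes each rule visible on its own line.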
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147013620 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -125,8 +125,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC tupleListener = kafkaSpoutConfig.getTupleListener(); -if (isAtLeastOnce()) { -// Only used if the spout commits offsets for acked tuples +if (isAtLeastOnceProcessing()) { +// Only used if the spout should commit to Kafka an offset only after its tuple has been acked. --- End diff -- Done. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147014032 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -336,22 +335,25 @@ private void emit() { private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset()); + if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); -} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail +} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { -Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp), -"The spout is about to emit a message that has already been committed." -+ " This should never occur, and indicates a bug in the spout"); +if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) { --- End diff -- Validate throws an IllegalArgumentException, whereas the correct exception here is IllegalStateException. Furthermore, in my opinion Validate has a confusing API: Validate.isTrue(condition) throws an exception if the condition is false. I find that misleading. I would rather leave it like this. ---
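The explicit check argued for above can be sketched as a small helper. It assumes the committed offset is passed as a nullable `Long` (mirroring `kafkaConsumer.committed(tp)` returning null when nothing has been committed) and reuses the message text from the removed `Validate.isTrue` call; `CommitCheck` is hypothetical, not the code that was merged.

```java
// Hypothetical helper mirroring the explicit check in emitTupleIfNotEmitted.
// committedOffset is null when nothing has been committed yet for the partition.
final class CommitCheck {
    static void ensureNotAlreadyCommitted(Long committedOffset, long position) {
        // Stating the failure condition directly avoids Validate.isTrue's inverted reading,
        // and IllegalStateException signals a broken spout state rather than a bad argument.
        if (committedOffset != null && committedOffset >= position) {
            throw new IllegalStateException("The spout is about to emit a message that has already been committed."
                + " This should never occur, and indicates a bug in the spout");
        }
    }
}
```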
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147015173 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() { // Ack === @Override public void ack(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of acked tuples if commits are done based on acks -return; -} - +// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; -if (!emitted.contains(msgId)) { -if (msgId.isEmitted()) { -LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " -+ "came from a topic-partition that this consumer group instance is no longer tracking " -+ "due to rebalance/partition reassignment. No action taken.", msgId); +if (isAtLeastOnceProcessing()) { --- End diff -- I disagree. Any method that has only one if condition and nothing else should be ```java if (condition == true) do_action; ``` IMHO it is unnatural to have code like ```java if (condition == false) do_nothing; else do_action; ``` which is basically what the early return is doing. There are also lengthier reasons related to the semantics of OOP, but I just think that, in general, wherever possible one should have code like if (condition == true) do_action. ---
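The disagreement is purely about style; both forms behave identically. The sketch below places them side by side, using hypothetical `atLeastOnce` and `acked` fields in place of the spout's real state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example; 'atLeastOnce' and 'acked' stand in for the spout's real state.
final class AckStyles {
    final boolean atLeastOnce;
    final List<Object> acked = new ArrayList<>();

    AckStyles(boolean atLeastOnce) {
        this.atLeastOnce = atLeastOnce;
    }

    // Guard-clause style (the original code): return early when there is nothing to do.
    void ackWithEarlyReturn(Object msgId) {
        if (!atLeastOnce) {
            return;
        }
        acked.add(msgId);
    }

    // Positive-condition style (the style proposed in the comment): act only when the condition holds.
    void ackWithPositiveCondition(Object msgId) {
        if (atLeastOnce) {
            acked.add(msgId);
        }
    }
}
```

Guard clauses tend to pay off when a method has several preconditions, because they keep nesting shallow; with a single condition, as here, the wrapped form reads just as well.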
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147015407 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() { // Ack === @Override public void ack(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of acked tuples if commits are done based on acks -return; -} - +// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; -if (!emitted.contains(msgId)) { -if (msgId.isEmitted()) { -LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " -+ "came from a topic-partition that this consumer group instance is no longer tracking " -+ "due to rebalance/partition reassignment. No action taken.", msgId); +if (isAtLeastOnceProcessing()) { +if (!emitted.contains(msgId)) { +if (msgId.isEmitted()) { +LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " ++ "came from a topic-partition that this consumer group instance is no longer tracking " ++ "due to rebalance/partition reassignment. No action taken.", msgId); +} else { +LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); +} } else { -LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); +Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." ++ " This should never occur barring errors in the RetryService implementation or the spout code."); + offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); +emitted.remove(msgId); } -} else { -Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." 
-+ " This should never occur barring errors in the RetryService implementation or the spout code."); - offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); -emitted.remove(msgId); +tupleListener.onAck(msgId); } -tupleListener.onAck(msgId); } // Fail === @Override public void fail(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of failed tuples if commits are done based on acks -return; -} +// Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received +if (isAtLeastOnceProcessing()) { --- End diff -- Same comment as above. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147015438 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() { // Ack === @Override public void ack(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of acked tuples if commits are done based on acks -return; -} - +// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received --- End diff -- Done. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147016997 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() { // Ack === @Override public void ack(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of acked tuples if commits are done based on acks -return; -} - +// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; -if (!emitted.contains(msgId)) { -if (msgId.isEmitted()) { -LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " -+ "came from a topic-partition that this consumer group instance is no longer tracking " -+ "due to rebalance/partition reassignment. No action taken.", msgId); +if (isAtLeastOnceProcessing()) { +if (!emitted.contains(msgId)) { +if (msgId.isEmitted()) { +LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " ++ "came from a topic-partition that this consumer group instance is no longer tracking " ++ "due to rebalance/partition reassignment. No action taken.", msgId); +} else { +LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); +} } else { -LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); +Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." ++ " This should never occur barring errors in the RetryService implementation or the spout code."); + offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); +emitted.remove(msgId); } -} else { -Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." 
-+ " This should never occur barring errors in the RetryService implementation or the spout code."); - offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); -emitted.remove(msgId); +tupleListener.onAck(msgId); } -tupleListener.onAck(msgId); } // Fail === @Override public void fail(Object messageId) { -if (!isAtLeastOnce()) { -// Only need to keep track of failed tuples if commits are done based on acks -return; -} +// Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received --- End diff -- Done. ---
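The ack bookkeeping in this diff reduces to a few rules: track nothing unless the guarantee is at-least-once; an ack for a tuple still in `emitted` makes its offset eligible for commit; an ack for a tuple no longer tracked (for example after a rebalance) is ignored. The sketch below is a simplified, hypothetical model: message ids are `"partition:offset"` strings and a map of acked offsets stands in for `offsetManagers`. It does not reproduce the real `KafkaSpoutMessageId` or `OffsetManager` classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the ack() bookkeeping; not the real spout classes.
final class AckBookkeeping {
    final boolean atLeastOnce;
    final Set<String> emitted = new HashSet<>();                      // emitted, pending ack or fail
    final Map<String, Set<Long>> ackedByPartition = new HashMap<>();  // stands in for offsetManagers

    AckBookkeeping(boolean atLeastOnce) {
        this.atLeastOnce = atLeastOnce;
    }

    void emit(String partition, long offset) {
        if (atLeastOnce) {
            emitted.add(partition + ":" + offset);
        }
    }

    void ack(String partition, long offset) {
        if (atLeastOnce) {
            // Only an ack for a tuple that is still tracked makes its offset eligible for commit.
            if (emitted.remove(partition + ":" + offset)) {
                ackedByPartition.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
            }
            // Otherwise the partition was revoked by a rebalance (or it was a null tuple): no action taken.
        }
    }

    // Simulates a rebalance taking a partition away from this spout instance.
    void revoke(String partition) {
        emitted.removeIf(id -> id.startsWith(partition + ":"));
        ackedByPartition.remove(partition);
    }
}
```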
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147022902 --- Diff: docs/storm-kafka-client.md --- @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been * max.uncommitted.offsets = 1000 -# Messaging reliability modes +# Processing Guarantees -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when +the commit happens. When the guarantee is NONE Kafka controls when the commit happens. + +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once) + and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance + the ack gets lost. + +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted + to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it + won't retry tuples that fail or timeout after the commit to Kafka has been done. + +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties + "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens + it cannot give any message processing guarantees, i.e. 
a message may be processed 0, 1 or more times. + This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown. + +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows: -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g. ```java KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .builder(String bootstrapServers, String ... topics) .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) ``` -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter. +# Tuple Tracking + +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI, --- End diff -- Done ---
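The documentation in this diff states four rules: who controls the commit, when the spout commits under each guarantee, that NONE requires `enable.auto.commit=true`, and that tuple tracking is always on for AT_LEAST_ONCE but otherwise only when enforced. The sketch below encodes exactly those rules. The enum mirrors the names of `KafkaSpoutConfig.ProcessingGuarantee`, but `GuaranteeRules` and its methods are hypothetical, and the exception type in `validate` is an assumption (the doc only says "an exception will be thrown").

```java
// Hypothetical model; the enum mirrors KafkaSpoutConfig.ProcessingGuarantee names,
// but nothing below is part of the storm-kafka-client API.
enum Guarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

final class GuaranteeRules {
    // Who decides when an offset is committed.
    static String commitController(Guarantee g) {
        return g == Guarantee.NONE ? "kafka" : "spout";
    }

    // When the spout commits an offset; null when Kafka auto-commit is in charge.
    static String commitPoint(Guarantee g) {
        switch (g) {
            case AT_LEAST_ONCE: return "after ack";
            case AT_MOST_ONCE:  return "after poll, before emit";
            default:            return null;
        }
    }

    // Only the rule stated in the doc is modelled; the real exception type is an assumption.
    static void validate(Guarantee g, boolean enableAutoCommit) {
        if (g == Guarantee.NONE && !enableAutoCommit) {
            throw new IllegalArgumentException("NONE requires enable.auto.commit=true");
        }
    }

    // Tracking is required for AT_LEAST_ONCE; otherwise only if tuple tracking is enforced.
    static boolean tracksTuples(Guarantee g, boolean tupleTrackingEnforced) {
        return g == Guarantee.AT_LEAST_ONCE || tupleTrackingEnforced;
    }
}
```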
[GitHub] storm pull request #2387: STORM-2787: storm-kafka-client KafkaSpout method o...
GitHub user hmcl opened a pull request: https://github.com/apache/storm/pull/2387 STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2787_KSInitFlag Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2387.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2387 commit 95d10fc60669fdf6b28f28fd9a5536a905a9a26e Author: Hugo Louro <hmclo...@gmail.com> Date: 2017-10-23T00:44:54Z STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message - Update documentation to reflect these changes commit 12473ed944a0f1499a2ba61987a223bd861e9768 Author: Hugo Louro <hmclo...@gmail.com> Date: 2017-10-25T06:52:54Z STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees ---
[GitHub] storm pull request #2393: STORM-2781: Refactor storm-kafka-client KafkaSpout...
GitHub user hmcl opened a pull request: https://github.com/apache/storm/pull/2393 STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message - Update documentation to reflect these changes You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcl/storm-apache 1.x-branch_STORM-2781_KSProcGtees Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2393.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2393 commit ac16fe1ee9d974af64a30769819561c0abae23af Author: Hugo Louro <hmclo...@gmail.com> Date: 2017-10-23T00:44:54Z STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message - Update documentation to reflect these changes ---
[GitHub] storm issue #2394: 1.x branch storm 2787 ks init flag
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2394 @HeartSaVioR for 1.x-branch ---
[GitHub] storm pull request #2464: STORM-2847 1.x
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2464#discussion_r158205889 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java --- @@ -0,0 +1,145 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import 
org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSpoutReactivationTest { + +@Rule +public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + +@Captor +private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + +private final TopologyContext topologyContext = mock(TopologyContext.class); +private final Map<String, Object> conf = new HashMap<>(); +private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); +private final long commitOffsetPeriodMs = 2_000; +private KafkaConsumer<String, String> consumerSpy; +private KafkaConsumer<String, String> postReactivationConsumerSpy; +private KafkaSpout<String, String> spout; +private final int maxPollRecords = 10; + +@Before +public void setUp() { +KafkaSpoutConfig<String, String> spoutConfig = +SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( +KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), +SingleTopicKafkaSpoutConfiguration.TOPIC)) +.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) +.setOffsetCommitPeriodMs(commitOffsetPeriodMs) +.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) +.build(); +KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>(); +this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class); --- End diff -- why do you need consumerFactory and consumerFactoryMock? 
---
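A plausible answer to the question above, assuming the truncated `setUp()` goes on to stub `consumerFactoryMock.createConsumer(...)` so that it returns `consumerSpy` first and `postReactivationConsumerSpy` second: the real `KafkaConsumerFactoryDefault` builds actual consumers that can be wrapped in spies, while the mock only hands those exact spies to the spout, in order, so the test can verify calls on the consumer used before and after reactivation. The sketch below shows that pattern without Mockito; `ConsumerFactory` and `SequencedConsumerFactory` are hypothetical names, not Storm classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for Storm's KafkaConsumerFactory (the real one takes a KafkaSpoutConfig).
interface ConsumerFactory<C> {
    C createConsumer();
}

// Hypothetical stand-in for the mocked factory: it builds nothing itself and
// hands out instances created elsewhere, in the order they were supplied.
final class SequencedConsumerFactory<C> implements ConsumerFactory<C> {
    private final Deque<C> queued = new ArrayDeque<>();

    @SafeVarargs
    SequencedConsumerFactory(C... instances) {
        for (C instance : instances) {
            queued.addLast(instance);
        }
    }

    @Override
    public C createConsumer() {
        C next = queued.pollFirst();
        if (next == null) {
            throw new IllegalStateException("No pre-built consumer left to hand out");
        }
        return next;
    }
}
```

In the test's terms, the real factory corresponds to `KafkaConsumerFactoryDefault` (whose results are wrapped by `spy(...)`), and the sequenced factory to `consumerFactoryMock`. A single factory could not do both jobs unless the test gave up holding references to the consumers it later verifies.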
[GitHub] storm issue #2464: STORM-2847 1.x
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2464 Can you please squash the commits? You can merge afterwards. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r186253569 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Translation from `storm-kafka` to `storm-kafka-client` spout properties + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and +[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html) +and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html). + +| SpoutConfig | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage | --- End diff -- I suggest removing "Name", as ConsumerConfig is self-explanatory. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r186253597 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Translation from `storm-kafka` to `storm-kafka-client` spout properties + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and +[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html) +and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html). + +| SpoutConfig | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | **Setting:** [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Setting:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. 
The buffer size which consumer has for pulling data from producer **Default:** `1MB`| **Setting:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG) The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** || +| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Setting:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG) **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** `latest`. Exception: `earliest` if [`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r186253552 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Translation from `storm-kafka` to `storm-kafka-client` spout properties --- End diff -- NIT: Rename "Translation" to "Mapping"; this is not really a translation. ---
[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2637 @srishtyagrawal Thank you for the code review. It is much better now. Besides my two comments above, I still wonder whether it would be better to point the links describing the Kafka properties to the top of the [New Consumer Configs](http://kafka.apache.org/10/documentation.html#newconsumerconfigs) table. The Javadocs contain no description of each property; an entry is basically the property name written in capital letters with underscores, which won't be very helpful to the user. It's better to point users to the table, where they will know to search for the property name. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r186253611 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Translation from `storm-kafka` to `storm-kafka-client` spout properties + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and +[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html) +and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html). + +| SpoutConfig | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | **Setting:** [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Setting:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. 
The buffer size which consumer has for pulling data from producer **Default:** `1MB`| **Setting:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG) The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** || +| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Setting:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG) **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** `latest`. Exception: `earliest` if [`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht
[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2667#discussion_r186887493 --- Diff: pom.xml --- @@ -1275,6 +1270,25 @@ true + +org.apache.maven.plugins +maven-enforcer-plugin + + +enforce-maven --- End diff -- Since there is only one execution, is 'id' really necessary? ---
[GitHub] storm issue #2667: STORM-3063: Fix minor pom issues
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2667 +1 ---
[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2667#discussion_r187100160 --- Diff: pom.xml --- @@ -1275,6 +1270,25 @@ true + +org.apache.maven.plugins +maven-enforcer-plugin + + +enforce-maven --- End diff -- Agree. Would 'enforce-maven-version' be more descriptive? ---
[GitHub] storm issue #2637: STORM-3060: Map of Spout configurations from storm-kafka ...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2637 +1 @srishtyagrawal thank you for your nice and helpful contribution. It will benefit a lot of users. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185160807 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG) The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` || --- End diff -- Instead of "Discontinued", I would put **N/A**. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185168182 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| --- End diff -- the (**max?**) number of bytes to attempt ... ---
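The "(max?)" question can be made concrete. In the Kafka consumer, `max.partition.fetch.bytes` is documented as the maximum amount of data per partition that the server will return, so it is an upper bound rather than a target, and the total for one fetch can scale with the number of partitions it covers. The rough arithmetic sketch below illustrates this; `FetchSizeBound` and its method are illustrative names only, and the bound deliberately ignores `fetch.max.bytes` and the rule in newer Kafka versions that lets an oversized first record batch through.

```java
// Sketch: max.partition.fetch.bytes caps data per partition, so the amount a
// single fetch may carry grows with the number of partitions it covers.
final class FetchSizeBound {
    // Kafka's documented default for max.partition.fetch.bytes: 1 MiB.
    static final long DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024L * 1024L;

    private FetchSizeBound() {
    }

    // Naive upper bound if every partition returns its full per-partition limit.
    // Ignores fetch.max.bytes and the oversized-first-batch exception in newer Kafka.
    static long naiveUpperBound(int partitions, long maxPartitionFetchBytes) {
        if (partitions < 0 || maxPartitionFetchBytes < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        return Math.multiplyExact((long) partitions, maxPartitionFetchBytes);
    }
}
```

With the 1 MiB default and four partitions, the naive bound is 4 MiB (4,194,304 bytes), which is why "max" describes the setting better than "the number of bytes to attempt to fetch".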
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185163739 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` --- End diff -- Do you really mean migration, or rather a parallel between the names and meanings of the properties in storm-kafka and storm-kafka-client? "Migrating" may imply that there is a way to migrate, whereas this is not really a migration but rather a way to specify the same behavior in the old and new spouts. ---
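Read as "same behavior in the old and new spouts", the first table row can be sketched as a small function. This assumes the helper table pairs the settings according to the row's own description: `forceFromStart=true` ignores any committed offset (`EARLIEST`/`LATEST`), while `forceFromStart=false` resumes from a committed offset when one exists (`UNCOMMITTED_*`). `FirstPollStrategy` and `LegacyOffsetMapping` are hypothetical names mirroring the enum values referenced in the table; `-2`/`-1` are the values of the old `kafka.api.OffsetRequest.EarliestTime()`/`LatestTime()` constants.

```java
// Hypothetical mirror of the FirstPollOffsetStrategy values referenced in the table.
enum FirstPollStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

final class LegacyOffsetMapping {
    // Values returned by the old kafka.api.OffsetRequest.EarliestTime() / LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    private LegacyOffsetMapping() {
    }

    static FirstPollStrategy fromLegacy(long startOffsetTime, boolean forceFromStart) {
        final boolean fromEarliest;
        if (startOffsetTime == EARLIEST_TIME) {
            fromEarliest = true;
        } else if (startOffsetTime == LATEST_TIME) {
            fromEarliest = false;
        } else {
            // Arbitrary timestamps have no counterpart among these four values.
            throw new IllegalArgumentException("No direct mapping for startOffsetTime=" + startOffsetTime);
        }
        if (forceFromStart) {
            // Ignore committed offsets, as forceFromStart ignored the ZooKeeper offset.
            return fromEarliest ? FirstPollStrategy.EARLIEST : FirstPollStrategy.LATEST;
        }
        // Resume from a committed offset if present, otherwise start at earliest/latest.
        return fromEarliest ? FirstPollStrategy.UNCOMMITTED_EARLIEST : FirstPollStrategy.UNCOMMITTED_LATEST;
    }
}
```

With the `storm-kafka` defaults (`EarliestTime`, `false`), the function returns `UNCOMMITTED_EARLIEST`, which matches the default listed for `FirstPollOffsetStrategy` in the table.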
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185168807 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG) The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` || +| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Kafka config:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185161496 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| --- End diff -- Are the "Import package:" entries throughout necessary? The ConsumerConfig strings all come from the same Kafka package, and the KafkaSpoutConfig package already has to be imported wherever the set* methods are called in the code. It also seems that most "Usage:" web links are broken. Wouldn't it be better to simply paste the method signature and link to that same signature? ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185168952 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| --- End diff -- I suggest omitting "Kafka config:". The link is self-explanatory. ---
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185168669 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | +| - | | --- | +| **Setting:** `startOffsetTime` **Default:** `EarliestTime` **Setting:** `forceFromStart` **Default:** `false` `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST` [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.` **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)| +| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG) The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)| +| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` || +| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Kafka config:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache
[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2637#discussion_r185167619 --- Diff: docs/storm-kafka-client.md --- @@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig .setTupleTrackingEnforced(true) ``` -Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. \ No newline at end of file +Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled. + +# Migrating a `storm-kafka` spout to use `storm-kafka-client` + +This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6 +[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and +[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java). +`storm-kafka-client` spout configurations were taken from Storm 1.0.6 +[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java). + +| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help | --- End diff -- The Storm version is already specified above and could be omitted from the table. If possible, I would suggest presenting the table like this:
```
| SpoutConfig | KafkaSpoutConfig |
---
| prop | desc | default | prop | desc | default |
```
---
[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2637 @erikdw I am reviewing this now. Sorry, I was away for the last few days. ---
[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...
GitHub user hmcl opened a pull request: https://github.com/apache/storm/pull/2380 STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message - Update documentation to reflect these changes You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2781_KSProcGtees Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2380.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2380 commit 6a5c3c4b043a8ddd1224fb14f55a512b810d05b7 Author: Hugo Louro <hmclo...@gmail.com> Date: 2017-10-23T00:44:54Z STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message - Update documentation to reflect these changes ---
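The three guarantees and the exception change described in this PR can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the storm-kafka-client API: `ProcessingGuarantee` here is a local enum, `CommittedOffsetGuard` is an invented helper, and the one-line descriptions of when each guarantee commits are a simplified reading of the feature. It only shows the two rules stated above: tuple tracking is always on for AT_LEAST_ONCE, and emitting an already committed offset is a state error.

```java
// Hypothetical model of the three guarantees; not the storm-kafka-client API.
enum ProcessingGuarantee {
    AT_LEAST_ONCE, // offsets are committed only after the tuple is acked
    AT_MOST_ONCE,  // offsets are committed right after polling, before emitting
    NONE;          // offsets are committed periodically, independent of acks

    // Tuple tracking is mandatory for AT_LEAST_ONCE; for the other guarantees
    // it is on only when explicitly enforced (cf. setTupleTrackingEnforced).
    boolean tracksTuples(boolean trackingEnforced) {
        return this == AT_LEAST_ONCE || trackingEnforced;
    }
}

// Hypothetical helper guarding against re-emitting committed records.
class CommittedOffsetGuard {
    // Kafka semantics: the committed offset is the next offset to consume,
    // so every offset below it has already been committed.
    private long committedOffset;

    void commit(long nextOffsetToConsume) {
        committedOffset = nextOffsetToConsume;
    }

    // Emitting an already committed record means the spout's own state is
    // inconsistent, which is why IllegalStateException fits better than
    // IllegalArgumentException here.
    void checkEmittable(long offset) {
        if (offset < committedOffset) {
            throw new IllegalStateException(
                "Attempting to emit offset " + offset + " below committed offset " + committedOffset);
        }
    }
}
```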
[GitHub] storm pull request #2381: STORM-2784: storm-kafka-client KafkaTupleListener ...
GitHub user hmcl opened a pull request: https://github.com/apache/storm/pull/2381 STORM-2784: storm-kafka-client KafkaTupleListener method onPartitionsReassigned() should be called after initialization is complete You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2784_KTLOPR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2381.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2381 commit ae827dac08fa67997c5eb8e6c23828f190de2851 Author: Hugo Louro <hmclo...@gmail.com> Date: 2017-10-23T04:35:29Z STORM-2784: storm-kafka-client KafkaTupleListener method onPartitionsReassigned() should be called after initialization is complete ---
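The ordering this PR asks for can be sketched as follows. `PartitionListener` is a hypothetical stand-in for KafkaTupleListener, and `RebalanceHandler` is an invented class, not the spout's real rebalance listener. The point is only that the listener callback runs as the last step, after the spout's per-partition state has been rebuilt, so a listener can never observe half-initialized state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for KafkaTupleListener; only the callback of interest.
interface PartitionListener {
    void onPartitionsReassigned(Collection<String> partitions);
}

// Hypothetical handler; not the spout's actual rebalance listener.
class RebalanceHandler {
    private final PartitionListener listener;
    private final List<String> assigned = new ArrayList<>();
    private boolean initialized;

    RebalanceHandler(PartitionListener listener) {
        this.listener = listener;
    }

    boolean isInitialized() {
        return initialized;
    }

    // Invoked when the consumer receives a new assignment.
    void onPartitionsAssigned(Collection<String> partitions) {
        initialized = false;
        assigned.clear();
        // Stands in for seeking to start offsets and creating offset managers.
        assigned.addAll(partitions);
        initialized = true;
        // Notify last, so the listener never observes half-initialized state.
        listener.onPartitionsReassigned(partitions);
    }
}
```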
[GitHub] storm issue #2387: STORM-2787: storm-kafka-client KafkaSpout method onPartit...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2387 @srdo @HeartSaVioR I have incorporated the code review changes from the dependent patch. It should be good to merge. Thanks. ---
[GitHub] storm issue #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2380 I have squashed the commits and addressed the early return issue. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120269 --- Diff: external/storm-solr/README.md --- @@ -97,6 +97,29 @@ field separates each value with the token % instead of the default | . To use th .setMultiValueFieldToken("%").build(); ``` +##Working with Kerberized Solr +If your topology is going to interact with kerberized Solr, your bolts/states need to be authenticated by Solr Server. We can enable +authentication by distributing keytabs for solr user on all worker hosts. We can configure the solr bolt to use keytabs by setting +SolrConfig.enableKerberos config property. + +On worker hosts the bolt/trident-state code will use the keytab file with principal provided in the jaas config to authenticate with +Solr. You need to specify a Kerberos principal for the client and a corresponding keytab in the JAAS client configuration file. +Also make sure the provided principal is configured with required permissions to access solr collections. + +Here's an example JAAS config: + +`SolrJClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + keyTab="/keytabs/foo.keytab" --- End diff -- Should this be /keytabs/solr.keytab? Perhaps we could use an entry here that matches what Ambari typically creates. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159121022 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java --- @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.solr.schema.builder; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; +import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.schema.SchemaRepresentation; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.schema.CopyField; +import org.apache.storm.solr.schema.Field; +import org.apache.storm.solr.schema.FieldType; +import org.apache.storm.solr.schema.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Class that builds the {@link Schema} object from the schema returned by the SchemaRequest + */ +public class RestJsonSchemaBuilderV2 implements SchemaBuilder { +private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class); +private Schema schema = new Schema(); +private SolrConfig solrConfig; +private String collection; + +public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) { +this.solrConfig = solrConfig; +this.collection = collection; +} + +@Override +public void buildSchema() throws IOException { +SolrClient solrClient = null; +try { +if (solrConfig.enableKerberos()) +HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + +solrClient = new CloudSolrClient(solrConfig.getZkHostString()); --- End diff -- The initial code was building the schema from the JSON representation. The other class has been deprecated, which means that JSON is no longer supported. 
Is there a reason to support both? If so, there should probably be a factory that builds one or the other depending on the configuration (e.g. Kerberos). The goal of using JSON was to avoid all of this programmatic setup. ---
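A factory like the one suggested above could be as small as the following sketch. All names here (`SolrSchemaBuilder`, `JsonSchemaBuilder`, `SolrJSchemaBuilder`, `SchemaBuilderFactory`) are hypothetical simplifications, not the storm-solr classes, and the builders only report which path they would take. The design point is that the configuration-to-implementation decision lives in one place, so callers never branch on Kerberos themselves.

```java
// Simplified, hypothetical stand-ins for the two schema builders discussed above.
interface SolrSchemaBuilder {
    String source();
}

// Builds the schema from the REST JSON representation (no extra client setup).
class JsonSchemaBuilder implements SolrSchemaBuilder {
    public String source() {
        return "rest-json";
    }
}

// Builds the schema through SolrJ, which is where Kerberos client setup would go.
class SolrJSchemaBuilder implements SolrSchemaBuilder {
    public String source() {
        return "solrj";
    }
}

final class SchemaBuilderFactory {
    private SchemaBuilderFactory() {}

    // Single place that maps configuration to an implementation.
    static SolrSchemaBuilder create(boolean kerberosEnabled) {
        return kerberosEnabled ? new SolrJSchemaBuilder() : new JsonSchemaBuilder();
    }
}
```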
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120398 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java --- @@ -153,4 +160,15 @@ private void failQueuedTuples(List failedTuples) { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } +@Override +public void cleanup() { +if (solrClient != null) { +try { +solrClient.close(); +} catch (IOException e) { +LOG.debug("Error while closing solrClient", e); --- End diff -- Should this be logged at error level? ---
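If the close failure is reported at error level, the cleanup could look like the sketch below. It uses `java.util.logging` (`Level.SEVERE`) only as a self-contained stand-in for the SLF4J `LOG.error(...)` the bolt would actually use, and `BoltCleanup.closeClient` is a hypothetical helper. The rationale assumed here is that a failed close may leak connections, which would go unnoticed at debug level.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; java.util.logging stands in for SLF4J.
class BoltCleanup {
    private static final Logger LOG = Logger.getLogger(BoltCleanup.class.getName());

    // Closes the client, reporting failure at error (SEVERE) level rather than
    // debug, so a leaked connection is visible in default log configurations.
    // Returns true if there was nothing to close or the close succeeded.
    static boolean closeClient(Closeable client) {
        if (client == null) {
            return true;
        }
        try {
            client.close();
            return true;
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Error while closing solrClient", e);
            return false;
        }
    }
}
```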
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120315 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java --- @@ -88,9 +92,11 @@ private int capacity() { @Override protected void process(Tuple tuple) { try { +LOG.debug("Processing Tuple: {}", tuple); --- End diff -- Storm already emits these debug log messages when the topology Config has setDebug(true), so we don't usually add this kind of logging. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120320 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java --- @@ -128,6 +134,7 @@ private void fail(Tuple tuple, Exception e) { List failedTuples = getQueuedTuples(); failQueuedTuples(failedTuples); } +LOG.debug("Failed Tuple: {}", tuple, e); --- End diff -- Same as above ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120430 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java --- @@ -70,13 +73,22 @@ public String toString() { /** * Initiates class containing all the information relating fields with their types. * This information is parsed from the schema - * @param schema SolrSchema containing the information about fields and types + * @param schemaBuilder schemaBuilder to build the information about fields and types * */ -public SolrFieldTypeFinder(Schema schema) { -if (schema == null) { -throw new IllegalArgumentException("Schema object is null"); +public SolrFieldTypeFinder(SchemaBuilder schemaBuilder) { +this.schemaBuilder = schemaBuilder; +} + +public void initialize() { +if (schemaBuilder == null) { +throw new IllegalArgumentException("schemaBuilder object is null"); --- End diff -- Should this IllegalArgumentException validation be done in the constructor, so that the exception cannot occur later at runtime? If it is intended to be done here, perhaps it should be an IllegalStateException. ---
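The split the reviewer suggests (argument checks in the constructor, state checks in `initialize()`) might look like this sketch. `SchemaLoader` and `FieldTypeFinder` are hypothetical simplifications of SchemaBuilder and SolrFieldTypeFinder. Rejecting calling `initialize()` twice is an invented example of a state error, used only to show where IllegalStateException would belong.

```java
// Hypothetical stand-in for the SchemaBuilder dependency.
interface SchemaLoader {
    void load();
}

// Hypothetical simplification of SolrFieldTypeFinder.
class FieldTypeFinder {
    private final SchemaLoader loader;
    private boolean initialized;

    // A null dependency is a caller error, so it is rejected immediately,
    // typically while the topology is being built rather than on a worker.
    FieldTypeFinder(SchemaLoader loader) {
        if (loader == null) {
            throw new IllegalArgumentException("schemaBuilder object is null");
        }
        this.loader = loader;
    }

    // Deferred work (e.g. contacting Solr) happens here. A problem at this
    // point concerns the object's state, not an argument, hence IllegalStateException.
    void initialize() {
        if (initialized) {
            throw new IllegalStateException("already initialized");
        }
        loader.load();
        initialized = true;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```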
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120287 --- Diff: external/storm-solr/README.md --- @@ -171,7 +194,7 @@ Querying Solr for these patterns, you will see the values that have been indexe curl -X GET -H "Content-type:application/json" -H "Accept:application/json" http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true -curl -X GET -H "Content-type: application/json" -H "Accept: application/json" http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true +curl -X GET -H "Content-type:application/json" -H "Accept:application/json" http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*json_test_val*=json=true --- End diff -- Was this a bug? ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120311 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java --- @@ -88,9 +92,11 @@ private int capacity() { @Override protected void process(Tuple tuple) { try { +LOG.debug("Processing Tuple: {}", tuple); SolrRequest request = solrMapper.toSolrRequest(tuple); solrClient.request(request, solrMapper.getCollection()); ack(tuple); +LOG.debug("Acked Tuple: {}", tuple); --- End diff -- Same as above ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r159120406 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java --- @@ -54,4 +65,7 @@ public int getTickTupleInterval() { return tickTupleInterval; } +public boolean enableKerberos() { --- End diff -- Nit: isKerberosEnabled() or isEnableKerberos()? ---
[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2480#discussion_r159119896 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java --- @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout.metrics; + +import com.google.common.base.Supplier; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class KafkaOffsetMetric implements IMetric { + +private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); +private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; +private final Supplier consumerSupplier; + +public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) { +this.offsetManagerSupplier = offsetManagerSupplier; +this.consumerSupplier = consumerSupplier; +} + +@Override +public Object getValueAndReset() { + +Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get(); +KafkaConsumer kafkaConsumer = consumerSupplier.get(); + +if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { +LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null."); +return null; +} + +Map<String,TopicMetrics> topicMetricsMap = new HashMap<>(); +Set topicPartitions = offsetManagers.keySet(); + +Map<TopicPartition, Long> beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions); +Map<TopicPartition, Long> endOffsets= kafkaConsumer.endOffsets(topicPartitions); +Map<String, Long> result = new HashMap<>(); --- End diff -- it would be useful to have a comment saying what is in this result map ---
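The kind of comment requested above is easiest to write next to a concrete computation. The sketch below is hypothetical: partitions are keyed by a plain "topic/partition" string instead of Kafka's TopicPartition, the offset maps are passed in rather than fetched with `beginningOffsets`/`endOffsets`, and the metric names (`earliestOffset`, `latestOffset`, `committedOffset`, `spoutLag`) are illustrative, not the names used in the PR. Lag is computed as log end offset minus committed offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified lag computation; metric names are illustrative only.
final class LagMetrics {
    private LagMetrics() {}

    /**
     * Builds a flat metric map keyed "topic/partition/metric", where metric is one of:
     *   earliestOffset  - first offset still retained in the partition
     *   latestOffset    - log end offset (offset the next produced record will get)
     *   committedOffset - next offset to consume, as committed by the spout
     *   spoutLag        - latestOffset - committedOffset
     * Partitions missing from either offset map are skipped.
     */
    static Map<String, Long> compute(Map<String, Long> beginningOffsets,
                                     Map<String, Long> endOffsets,
                                     Map<String, Long> committedOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : committedOffsets.entrySet()) {
            String partition = entry.getKey();
            Long earliest = beginningOffsets.get(partition);
            Long latest = endOffsets.get(partition);
            if (earliest == null || latest == null) {
                continue;
            }
            long committed = entry.getValue();
            result.put(partition + "/earliestOffset", earliest);
            result.put(partition + "/latestOffset", latest);
            result.put(partition + "/committedOffset", committed);
            result.put(partition + "/spoutLag", latest - committed);
        }
        return result;
    }
}
```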
[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2480#discussion_r159119706 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -739,4 +764,9 @@ public boolean shouldPoll() { return !this.pollablePartitions.isEmpty(); } } + +@VisibleForTesting +KafkaOffsetMetric getKafkaOffsetMetric() { --- End diff -- If we start adding a lot of these test methods, we would be better off creating a class in the test package called KafkaSpoutTest that extends KafkaSpout, and using that class in the tests. All of these methods should go in that class. We don't want KafkaSpout to become bloated. ---
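The test-subclass approach suggested above can be sketched with two hypothetical classes: `Spout` stands in for KafkaSpout and `SpoutForTest` for the proposed KafkaSpoutTest. The assumption is that the test subclass lives under src/test in the same package, so fields that are protected or package-private (rather than private) are readable without adding @VisibleForTesting getters to the production class.

```java
// Hypothetical production class standing in for KafkaSpout.
class Spout {
    // Protected rather than private so a test-side subclass can read it.
    protected String offsetMetric;

    void open() {
        offsetMetric = "kafkaOffset"; // stands in for registering the real metric
    }
}

// Would live under src/test in the same package; collects test-only accessors
// so the production class does not accumulate @VisibleForTesting methods.
class SpoutForTest extends Spout {
    String getOffsetMetric() {
        return offsetMetric;
    }
}
```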
[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2480#discussion_r159119938 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java --- @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.metrics; + +import com.google.common.base.Supplier; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class KafkaOffsetMetric implements IMetric { + +private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); +private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; +private final Supplier consumerSupplier; + +public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) { --- End diff -- What's the reasoning behind passing Supplier rather than the actual object? ---
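One plausible reason for passing a Supplier is that the metric is registered once, while the spout may later replace the objects it reads (for example, a new offset-manager map or consumer after reassignment or deactivate/activate). This is an assumption, not confirmed by the PR. The sketch uses `java.util.function.Supplier` in place of Guava's `Supplier`, and `SpoutState`/`PartitionCountMetric` are hypothetical. A Supplier is resolved on every tick, whereas a captured object reference stays stale.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical spout state whose map is replaced as a whole.
class SpoutState {
    private Map<String, Long> offsetManagers = new HashMap<>();

    void replaceOffsetManagers(Map<String, Long> newManagers) {
        offsetManagers = newManagers;
    }

    Map<String, Long> offsetManagers() {
        return offsetManagers;
    }
}

// Hypothetical metric that reads the spout state through a Supplier.
class PartitionCountMetric {
    private final Supplier<Map<String, Long>> offsetManagers;

    PartitionCountMetric(Supplier<Map<String, Long>> offsetManagers) {
        this.offsetManagers = offsetManagers;
    }

    // Resolved on every tick, so the metric sees whatever map the spout holds now.
    int valueAndReset() {
        return offsetManagers.get().size();
    }
}
```

With `state::offsetManagers` the metric tracks replacements, while a lambda returning a captured map keeps reporting the old one.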
[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2480#discussion_r159119878 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java --- @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout.metrics; + +import com.google.common.base.Supplier; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class KafkaOffsetMetric implements IMetric { + +private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); +private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; +private final Supplier consumerSupplier; + +public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) { +this.offsetManagerSupplier = offsetManagerSupplier; +this.consumerSupplier = consumerSupplier; +} + +@Override +public Object getValueAndReset() { + +Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get(); +KafkaConsumer kafkaConsumer = consumerSupplier.get(); + +if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { +LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null."); --- End diff -- Should this be INFO level? Is this going to print this message periodically? ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158573100
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaSpoutAbstractTest {
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    @Captor
+    ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    final TopologyContext topologyContext = mock(TopologyContext.class);
+    final Map<String, Object> conf = new HashMap<>();
+    final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    final long commitOffsetPeriodMs = 2_000;
+    final int maxRetries = 3;
+    KafkaConsumer<String, String> consumerSpy;
+    KafkaConsumerFactory<String, String> consumerFactory;
+    KafkaSpout<String, String> spout;
+    final int maxPollRecords = 10;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        final KafkaSpoutConfig<String, String> spoutConfig = createSpoutConfig();
+
+        consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
+
+        consumerFactory = new KafkaConsumerFactory<String, String>() {
+            @Override
+            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+                return consumerSpy;
+            }
+
+        };
+
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+    }
+
+
+    abstract KafkaSpoutConfig<String, String> createSpoutConfig();
+
+    void prepareSpout(int messageCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+    }
+
+    /**
+     * Helper method to in sequence do:
+     *
+     * spout.nexTuple()
+     * verify messageId
+     * spout.ack(msgId)
+     * reset(collector) to be able to reuse mock
+     *
+     *
+     * @param offset offset of message to be verified
+     * @return {@link ArgumentCaptor} of the messageId verified
+     */
+    ArgumentCaptor nextTuple_verifyEmitted_ack_resetCollectorMock(int offs
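The anonymous KafkaConsumerFactory in setUp() exists so the spout uses a consumer that the test itself created (a Mockito spy) and still holds a reference to for verification. A minimal, library-free sketch of that injection pattern follows; FakeConsumer, ConsumerFactory, Spout and FactoryInjectionDemo are hypothetical stand-ins, not Storm or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka consumer: it only records the offsets committed to it.
class FakeConsumer {
    final List<Long> committedOffsets = new ArrayList<>();

    void commit(long offset) {
        committedOffsets.add(offset);
    }
}

// Hypothetical factory interface, playing the role KafkaConsumerFactory plays in the diff.
interface ConsumerFactory {
    FakeConsumer createConsumer();
}

// Hypothetical component that obtains its consumer only through the injected factory,
// similar to how KafkaSpout is constructed with a KafkaConsumerFactory in setUp().
class Spout {
    private final ConsumerFactory factory;
    private FakeConsumer consumer;

    Spout(ConsumerFactory factory) {
        this.factory = factory;
    }

    void open() {
        // The consumer is created inside the component; without the factory,
        // the caller would have no reference to it.
        consumer = factory.createConsumer();
    }

    void commit(long offset) {
        consumer.commit(offset);
    }
}

class FactoryInjectionDemo {
    public static void main(String[] args) {
        // The test creates the consumer itself and keeps a handle to it...
        FakeConsumer consumerHandle = new FakeConsumer();
        // ...then injects a factory that always returns that same instance.
        Spout spout = new Spout(() -> consumerHandle);
        spout.open();
        spout.commit(42L);
        // The test can now inspect what the component did to "its" consumer.
        System.out.println(consumerHandle.committedOffsets); // prints [42]
    }
}
```

In the real test the injected object is a Mockito spy wrapping a real KafkaConsumer, so real calls still reach Kafka while the test can presumably check commits with verify and the commitCapture captor.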
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo done. Pls check and I will squash the commits right away. I would like to try to merge this in today. I will update the master PR with everything squashed already. Thanks. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158572508
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+            .build();
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
--- End diff --
Done ---
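The test name encodes the behavior being checked: per the storm-kafka-client documentation, FirstPollOffsetStrategy.EARLIEST only takes effect on topology deployment, so after a deactivate/activate cycle the spout should continue from its committed offsets rather than rewind to the start of the partition. The sketch below models only that decision. FirstPollModel, its Strategy enum and its methods are hypothetical; the real spout obtains committed offsets from Kafka, not from an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of how a spout picks the offset to start reading a
// partition from. It is not Storm's implementation; it only captures the rule that
// EARLIEST applies on the first assignment after deployment, while a later
// deactivate/activate cycle resumes from what was already committed.
class FirstPollModel {
    enum Strategy { EARLIEST, LATEST }

    private final Strategy strategy;
    // Next offset to read per partition, as committed during the current deployment.
    private final Map<Integer, Long> committedThisDeployment = new HashMap<>();

    FirstPollModel(Strategy strategy) {
        this.strategy = strategy;
    }

    void commit(int partition, long nextOffset) {
        committedThisDeployment.put(partition, nextOffset);
    }

    long startOffset(int partition, long beginningOffset, long endOffset) {
        Long committed = committedThisDeployment.get(partition);
        if (committed != null) {
            // This deployment already made progress on the partition: resume, do not rewind.
            return committed;
        }
        // First poll for this partition since deployment: apply the configured strategy.
        return strategy == Strategy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        FirstPollModel model = new FirstPollModel(Strategy.EARLIEST);
        // Topology deployed: nothing committed yet, so EARLIEST starts at the beginning.
        long onDeploy = model.startOffset(0, 0L, 100L);
        // Offsets 0..9 are emitted and acked; 10 is committed as the next offset to read.
        model.commit(0, 10L);
        // Topology deactivated, then activated again: resume at 10 instead of 0.
        long onReactivate = model.startOffset(0, 0L, 100L);
        System.out.println(onDeploy + " -> " + onReactivate); // prints "0 -> 10"
    }
}
```

Keeping the "already committed in this deployment" check ahead of the strategy is what prevents reactivation from replaying the whole topic.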
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo It should be OK now. If you have further requests, let's file a refactoring JIRA and include in it the refactoring of some of the unit tests for better code reuse. ---