[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-14 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r157104947
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -442,10 +435,13 @@ private boolean isEmitTuple(List<Object> tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
-private void commitOffsetsForAckedTuples() {
-// Find offsets that are ready to be committed for every topic partition
+private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
+// Find offsets that are ready to be committed for every assigned topic partition
+final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
+.filter(entry -> assignedPartitions.contains(entry.getKey()))
+.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
 final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
--- End diff --

An empty line before this var would make the code easier to read


---


[GitHub] storm issue #2454: STORM-2847: Ensure spout can handle being activated and d...

2017-12-14 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2454
  
@srdo Can you explain how the drawback scenario you describe in your [comment](https://github.com/apache/storm/pull/2454#issuecomment-351428500) can happen? When activate happens, the partition refresh will be called, onPartitionsRevoked will commit only for the partitions that are currently assigned to the consumer (so acks that make commits eligible for other partitions won't matter), and onPartitionsAssigned will remove the offset managers for the partitions that are no longer assigned to this spout instance.
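
To make the intent concrete, here is a minimal sketch of the filtering step being discussed, assuming `offsetManagers` maps `TopicPartition` to `OffsetManager` as in the diff above; the stand-in types are illustrative, not the spout's actual classes:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CommitFilterSketch {
    // Hypothetical stand-ins for the Kafka client and spout types in the PR.
    static class TopicPartition { }
    static class OffsetManager { }

    private Map<TopicPartition, OffsetManager> offsetManagers;

    // Only partitions still assigned to this consumer are eligible to commit,
    // so acks that made commits eligible for revoked partitions are ignored.
    Map<TopicPartition, OffsetManager> assignedOffsetManagers(Set<TopicPartition> assignedPartitions) {
        return offsetManagers.entrySet().stream()
            .filter(entry -> assignedPartitions.contains(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```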


---


[GitHub] storm issue #2460: STORM-2851 1.x

2017-12-15 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2460
  
+1


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157842966
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java ---
@@ -55,12 +58,12 @@ public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAc
 manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
 manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
 
-assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
+assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
 
 manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
 
 assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", 
-manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 7));
--- End diff --

OK.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157831079
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
 }
 
 /**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
- * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. 
+ * Defines the offset used by the {@link KafkaSpout} in the first poll to Kafka broker. The choice of this parameter will affect
+ * the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. 
--- End diff --

OK.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157831049
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
 }
 
 /**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
- * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. 
+ * Defines the offset used by the {@link KafkaSpout} in the first poll to Kafka broker. The choice of this parameter will affect
+ * the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. 
  * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. 
  * 
- * EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
- * commits
- * LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
- * previous commits
+ * EARLIEST when the topology is first deployed the kafka spout polls records starting in the first offset of the
--- End diff --

OK. However, this statement is correct: if a topology is deployed again it is a new topology, because it has a new id and creates new object instances.
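
A minimal sketch of why a redeploy behaves like a new topology here, assuming commit metadata shaped like the JSON constant used in the tests below; the class and helper names are illustrative, not the PR's exact code:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class FirstPollCheckSketch {
    // Illustrative mirror of the commit metadata serialized by the spout.
    public static class CommitMetadata {
        public String topologyId;
        public int taskId;
        public String thread;
    }

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // A redeployed topology gets a new id, so metadata committed by a previous
    // deployment will not match, and the first poll offset strategy applies.
    public static boolean committedByCurrentTopology(String metadataJson, String currentTopologyId) throws IOException {
        CommitMetadata metadata = MAPPER.readValue(metadataJson, CommitMetadata.class);
        return currentTopologyId.equals(metadata.topologyId);
    }
}
```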


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157831266
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -204,33 +226,70 @@ private void initialize(Collection<TopicPartition> partitions) {
 /**
  * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
  */
-private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
-if (committedOffset != null) { // offset was committed for this TopicPartition
-if (firstPollOffsetStrategy.equals(EARLIEST)) {
-kafkaConsumer.seekToBeginning(Collections.singleton(tp));
-} else if (firstPollOffsetStrategy.equals(LATEST)) {
-kafkaConsumer.seekToEnd(Collections.singleton(tp));
+private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
+LOG.trace("Seeking offset for topic-partition {} with {} and {}", newTp, firstPollOffsetStrategy, committedOffset);
--- End diff --

OK


---


[GitHub] storm issue #2466: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-19 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2466
  
@srdo this is the master version.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157930206
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/OffsetAndMetadataMocks.java ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.task.TopologyContext;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class OffsetAndMetadataMocks {
+public static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"thread\":\"Thread-20\"}";
+
+public static OffsetAndMetadata createMocksTree(KafkaConsumer<String, String> consumerMock,
--- End diff --

Done


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157930210
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/OffsetAndMetadataMocks.java ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.task.TopologyContext;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class OffsetAndMetadataMocks {
+public static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"thread\":\"Thread-20\"}";
+
+public static OffsetAndMetadata createMocksTree(KafkaConsumer<String, String> consumerMock,
+TopologyContext topologyContext,
+TopicPartition topicPartition) throws java.io.IOException {
+
+OffsetAndMetadata oam = mock(OffsetAndMetadata.class);
+when(consumerMock.committed(topicPartition))
+.thenReturn(oam);
+
+when(oam.metadata())
+.thenReturn(COMMIT_METADATA);
+
+when(topologyContext.getStormId()).thenReturn("tp1");
+
+ObjectMapper om = mock(ObjectMapper.class);
--- End diff --

Done


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157930231
  
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
@@ -71,6 +75,10 @@
 1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
 .build();
 private KafkaSpout<String, String> spout;
+private final TopicPartition topicPartition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+private KafkaConsumer<String, String> consumerMock;
--- End diff --

Done


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-19 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo can you please do one last review? Thanks.


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-18 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo it should be good now. Can you please take a look? Thanks.


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-16 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo I am working on making the unit tests pass and will then squash and create the master PR. Can you take another look in the meantime? Thanks.


---


[GitHub] storm pull request #2428: STORM-2826: Set key/value deserializer fields when...

2017-11-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2428#discussion_r151886343
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -292,17 +292,21 @@ private Builder(String bootstrapServers, SerializableDeserializer keyDes, Cla
 this.subscription = subscription;
 this.translator = new DefaultRecordTranslator<>();
 
-if (keyDesClazz != null) {
-this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
-}
-if (keyDes != null) {
-this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
-}
-if (valueDesClazz != null) {
-this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
+if (!this.kafkaProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
--- End diff --

@srdo can you please clarify what you are trying to do? What happens if this if statement is false? Won't it cause kafkaProps to keep whatever value the user has set, while the keyClassDeserializer fields hold something else? What are the implications of that?

The two ifs below, on lines 296 and 299, can, I think, possibly both be true with different values if you are dealing with subtypes. If so, what happens in that case?
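
To make the concern concrete, here is a small self-contained sketch of the guard pattern being questioned; the class, method, and field names are stand-ins mirroring the diff, not the actual builder:

```java
import java.util.HashMap;
import java.util.Map;

public class DeserializerGuardSketch {
    // Stand-in for ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG.
    static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";

    final Map<String, Object> kafkaProps = new HashMap<>();
    Class<?> keyDesClazz;

    void applyKeyDeserializer(Class<?> keyDesClazz) {
        this.keyDesClazz = keyDesClazz;
        // The guard from the diff: if the user already set a deserializer in
        // kafkaProps, that value is kept, and the field can now disagree with
        // the map entry - which is exactly the divergence asked about above.
        if (!kafkaProps.containsKey(KEY_DESERIALIZER_CLASS_CONFIG)) {
            kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
        }
    }
}
```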


---


[GitHub] storm pull request #2426: STORM-2825: Fix ClassCastException when storm-kafk...

2017-11-19 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2426#discussion_r151887752
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -718,7 +718,13 @@ private static void setAutoCommitMode(Builder builder) {
 + " This will be treated as an error in the next major release."
 + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases.");
 
-final boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+Object enableAutoCommitValue = builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
--- End diff --

@srdo I believe that this logic can be simplified to:

``` java
final boolean isAutoCommitEnabled = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
```
or if you prefer it more explicit:

``` java
final Object autoCommitConf = builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
final boolean isAutoCommitEnabled = Boolean.parseBoolean(autoCommitConf.toString());
```
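
For what it's worth, the reason toString() plus parseBoolean is enough is that the property may arrive as either a Boolean or a String; a tiny self-contained illustration (the values are made up):

```java
public class AutoCommitParseDemo {
    public static void main(String[] args) {
        // The map value may be stored as a Boolean or a String; calling
        // toString() first makes parseBoolean work for both, avoiding the
        // ClassCastException a direct (boolean) cast would throw on a String.
        Object asBoolean = Boolean.TRUE;
        Object asString = "true";
        System.out.println(Boolean.parseBoolean(asBoolean.toString())); // true
        System.out.println(Boolean.parseBoolean(asString.toString()));  // true
    }
}
```

One caveat: if the property can be absent, get(...) returns null and toString() would throw, so a null check would still be needed first.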


---


[GitHub] storm issue #2427: MINOR: Use booleans instead of strings for 'enable.auto.c...

2017-11-19 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2427
  
+1


---


[GitHub] storm issue #2426: STORM-2825: Fix ClassCastException when storm-kafka-clien...

2017-11-21 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2426
  
+1. Thanks @srdo. Can you please squash the commits before merging?


---


[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

2017-12-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2438
  
@arunmahadevan @HeartSaVioR @srdo this patch has been merged into master 
and 1.x-branch


---


[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

2017-12-07 Thread hmcl
Github user hmcl closed the pull request at:

https://github.com/apache/storm/pull/2438


---


[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

2017-12-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2438#discussion_r155007745
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -242,9 +242,7 @@ public void nextTuple() {
 }
 }
 
-if (waitingToEmit()) {
-emit();
-}
+emitIfWaitingNotEmitted();
--- End diff --

Done


---


[GitHub] storm issue #2409: STORM-2796: Flux: Provide means for invoking static facto...

2017-12-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2409
  
@HeartSaVioR there is not one major project that does not require contributors to squash their commits. It takes a few minutes, and it makes a world of difference in terms of keeping the git log easy to understand and, most importantly, easy to cherry-pick.


---


[GitHub] storm issue #2451: STORM-2850: Make ManualPartitionSubscription call rebalan...

2017-12-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2451
  
+1


---


[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...

2017-12-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2428
  
@arunmahadevan I am looking into this now. Thanks.


---


[GitHub] storm issue #2448: Quick fix: correcting markdown format

2017-12-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2448
  
@Ethanlm can you please replace "Quick fix:" with "MINOR: " in the title? Thanks.
+1 after fixing the commit message.


---


[GitHub] storm issue #2448: MINOR: correcting markdown format

2017-12-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2448
  
@Ethanlm yeah, I noticed it right after my comment. Somehow I had not 
refreshed my PRs view. If it is easy I will change the commit message since I 
am about to merge something. Otherwise we will just leave it.


---


[GitHub] storm pull request #2428: STORM-2826: Set key/value deserializer fields when...

2017-12-08 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2428#discussion_r155909342
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -359,7 +356,7 @@ private Builder(Builder builder, SerializableDeserializer keyDes, Class
  */
 @Deprecated
 public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
--- End diff --

@srdo do you know why this method returns a new builder object? I can't figure out a reason for it to do so. I suspect the only reason is that the fields of the builder class are final (e.g. keyDesClazz), and to make the generics work. There is no benefit in having the fields inside the builder class be final. The code snippet below also fixes the generics problem. Any reason not to get rid of the builder copy constructor completely and make this method like this:

```java
public Builder<K, V> setKey(Class<? extends Deserializer<K>> clazz) {
    this.keyDesClazz = clazz;
    if (keyDesClazz != null) {
        this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
    }
    return this;
}
```

We should do something similar for the other 3 methods. In my opinion the code has become a bit confusing, and I believe this is one of the last few opportunities we have to make it better. Please let me know your thoughts. Thanks.
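
If we go that route, the value-side method would presumably follow the same shape. A sketch under the same assumption that the builder fields are made non-final; this is not the PR's code:

```java
// Sketch only: assumes valueDesClazz is made non-final, mirroring setKey above.
public Builder<K, V> setValue(Class<? extends Deserializer<V>> clazz) {
    this.valueDesClazz = clazz;
    if (valueDesClazz != null) {
        this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
    }
    return this;
}
```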


---


[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...

2017-12-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2428
  
@srdo aiming to get this PR merged more quickly, I created a [PR](https://github.com/srdo/storm/pull/1) with a suggested fix off your branch. If you agree with the fix, can you please incorporate it, squash the commits, and push again here? I will then review it right away. Thanks.


---


[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-11 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r156223919
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -186,12 +182,13 @@ private void initialize(Collection<TopicPartition> partitions) {
 for (TopicPartition tp : newPartitions) {
 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
 final long fetchOffset = doSeek(tp, committedOffset);
+LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]",
--- End diff --

LOL, my bad :). I was reviewing this in the diff view and got confused.


---


[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-11 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r156220249
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
 } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
-if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
--- End diff --

Is this change meant to address https://issues.apache.org/jira/browse/STORM-2844?


---


[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-11 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r156208523
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -186,12 +182,13 @@ private void initialize(Collection<TopicPartition> partitions) {
 for (TopicPartition tp : newPartitions) {
 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
 final long fetchOffset = doSeek(tp, committedOffset);
+LOG.debug("Set consumer position to [{}] for topic-partition [{}], based on strategy [{}] and committed offset [{}]",
--- End diff --

Isn't this log message useful? I would suggest that unless this info is available elsewhere, we leave this message in. If you want it at lower priority, we can put it at TRACE level.


---


[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-11 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r156208684
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -223,29 +220,24 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 @Override
 public void nextTuple() {
 try {
-if (initialized) { 
--- End diff --

Why is this flag no longer necessary?


---


[GitHub] storm pull request #2454: STORM-2847: Ensure spout can handle being activate...

2017-12-11 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2454#discussion_r156224126
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -395,7 +387,7 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
 } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
-if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() > kafkaConsumer.position(tp))) {
--- End diff --

OK, I agree. Initially I thought that this could be a potential fix, but then found out that it wouldn't work. I was wondering if I had missed anything. Thanks.


---


[GitHub] storm issue #2428: STORM-2826: Set key/value deserializer fields when using ...

2017-12-11 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2428
  
@srdo although some of these methods have been deprecated for 2.0, customers that are currently on a 1.x.y version will likely use that version for a few years. We will have to maintain this codebase for quite a long time, and therefore I am in favor of making at least the code a bit more readable. I had quite a hard time understanding what the existing code is doing. I have another suggestion, which I also shared with you on a [PR](https://github.com/srdo/storm/pull/1).

I will leave it up to you which one to pick, and I am +1 after that, so that we can move forward. Thanks.



---


[GitHub] storm issue #2393: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...

2017-10-29 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2393
  
@HeartSaVioR for 1.x-branch


---


[GitHub] storm pull request #2394: 1.x branch storm 2787 ks init flag

2017-10-29 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2394

1.x branch storm 2787 ks init flag



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 1.x-branch_STORM-2787_KSInitFlag

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2394


commit ac16fe1ee9d974af64a30769819561c0abae23af
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-23T00:44:54Z

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
 - Update documentation to reflect these changes

commit 7d4ac07684a7405d6539a3bd0cb7da985736bac7
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-25T06:52:54Z

STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees




---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147012506
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
+ the ack gets lost.
+
+* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+ to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
+ won't retry tuples that fail or timeout after the commit to Kafka has been done.
+
+* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+ "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
+ it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+ This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
+
+To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
 
-To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
+# Tuple Tracking
+
+By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm features such as showing complete latency in the UI,
+or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
 
-If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-  .setForceEnableTupleTracking(true)
+  .setTupleTrackingEnforced(true)
 ```
 
-Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantees where tuple tracking is required and therefore always enabled.
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147012458
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
+ the ack gets lost.
+
+* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147012467
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
+ the ack gets lost.
+
+* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+ to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147012486
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
+ the ack gets lost.
+
+* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+ to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
+ won't retry tuples that fail or timeout after the commit to Kafka has been done.
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r146997432
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
--- End diff --

Maybe we can come up with a bit better wording for this, but I really don't think that we should say that an offset is marked as processed. Offsets are committed, not processed. ConsumerRecords, wrapped by tuples, are processed.


---


[GitHub] storm issue #2385: YSTORM-2727: Generic Resource Aware Scheduling

2017-10-25 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2385
  
@govind-menon why is YSTORM-2725 prefixed by Y? Should it be STORM-2725? If so, can you please fix the typo? Thanks.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r146996683
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
--- End diff --

Well, this is tricky, because Storm does not process offsets, Storm processes tuples. More exactly, it processes tuples that contain ConsumerRecords. The offset is just part of the ConsumerRecord, which also contains key, val, etc... We commit the offset, but by committing the offset we are technically marking that the tuple was processed, because even if the tuple fails, it won't be retried (processed again).


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r146996955
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r146997051
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147012706
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -78,17 +78,17 @@
 private transient KafkaSpoutRetryService retryService;
 // Handles tuple events (emit, ack etc.)
 private transient KafkaTupleListener tupleListener;
-// timer == null for modes other than at-least-once
+// timer == null if processing guarantee is other than at-least-once
--- End diff --

Done


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147013419
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -78,17 +78,17 @@
 private transient KafkaSpoutRetryService retryService;
 // Handles tuple events (emit, ack etc.)
 private transient KafkaTupleListener tupleListener;
-// timer == null for modes other than at-least-once
+// timer == null if processing guarantee is other than at-least-once
 private transient Timer commitTimer;
 // Flag indicating that the spout is still undergoing initialization process.
 private transient boolean initialized;
 // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
 // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
-//or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
+// or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
 private transient Map<TopicPartition, OffsetManager> offsetManagers;
 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
-// Always empty if not using at-least-once mode.
+// Always empty if processing guarantee is other than at-least-once.
--- End diff --

Done. Removed the double negation.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147013385
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -78,17 +78,17 @@
 private transient KafkaSpoutRetryService retryService;
 // Handles tuple events (emit, ack etc.)
 private transient KafkaTupleListener tupleListener;
-// timer == null for modes other than at-least-once
+// timer == null if processing guarantee is other than at-least-once
 private transient Timer commitTimer;
 // Flag indicating that the spout is still undergoing initialization process.
 private transient boolean initialized;
 // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
 // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
-//or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
+// or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
--- End diff --

Done. Removed the double negation.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147013717
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() {
 }
 
 private boolean commit() {
-return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
 }
 
 private boolean poll() {
 final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
 final int readyMessageCount = retryService.readyMessageCount();
 final boolean poll = !waitingToEmit()
-//Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
-//Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
-//and prevents locking up the spout when there are too many retriable tuples
-&& (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
-|| !isAtLeastOnce());
+// Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
--- End diff --

Done.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147013620
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -125,8 +125,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
 
 tupleListener = kafkaSpoutConfig.getTupleListener();
 
-if (isAtLeastOnce()) {
-// Only used if the spout commits offsets for acked tuples
+if (isAtLeastOnceProcessing()) {
+// Only used if the spout should commit to Kafka an offset only after its tuple has been acked.
--- End diff --

Done.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147014032
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -336,22 +335,25 @@ private void emit() {
 private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
 final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
 final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
+
 if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
-} else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+} else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
-Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
-"The spout is about to emit a message that has already been committed."
-+ " This should never occur, and indicates a bug in the spout");
+if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
--- End diff --

Validate throws an IllegalArgumentException, whereas the correct exception here is IllegalStateException. Furthermore, Validate in my opinion has a confusing API - Validate.isTrue(condition) throws an exception if the condition is false. It is misleading to me. I would rather leave it like this.
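
For illustration, the explicit guard being argued for here might look like the following. A sketch only; the condition and message mirror the diff above rather than the final code:

```java
public class EmitGuardSketch {
    // An explicit check states the failing condition and the exception type
    // (IllegalStateException) directly, unlike Validate.isTrue(...), which
    // inverts the condition and throws IllegalArgumentException.
    static void checkNotAlreadyCommitted(long committedOffset, long position) {
        if (committedOffset >= position) {
            throw new IllegalStateException(
                "The spout is about to emit a message that has already been committed."
                + " This should never occur, and indicates a bug in the spout");
        }
    }
}
```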


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147015173
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
 //  Ack ===
 @Override
 public void ack(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of acked tuples if commits are done based on acks
-return;
-}
-
+// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
 final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-if (!emitted.contains(msgId)) {
-if (msgId.isEmitted()) {
-LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-+ "came from a topic-partition that this consumer group instance is no longer tracking "
-+ "due to rebalance/partition reassignment. No action taken.", msgId);
+if (isAtLeastOnceProcessing()) {
--- End diff --

I disagree. Any method that has only one if condition and nothing else should be
```java 
if (condition)
    do_action;
```
imho it is counterintuitive to have code like
```java
if (!condition)
    return; // i.e. do nothing
do_action;
```
which is basically what the early return is doing.

There are also lengthier reasons related to the semantics of OOP, but I just think that in general, wherever possible, one should have code like if (condition) do_action.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147015407
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
 //  Ack ===
 @Override
 public void ack(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of acked tuples if commits are done based on acks
-return;
-}
-
+// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
 final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-if (!emitted.contains(msgId)) {
-if (msgId.isEmitted()) {
-LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-+ "came from a topic-partition that this consumer group instance is no longer tracking "
-+ "due to rebalance/partition reassignment. No action taken.", msgId);
+if (isAtLeastOnceProcessing()) {
+if (!emitted.contains(msgId)) {
+if (msgId.isEmitted()) {
+LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
++ "came from a topic-partition that this consumer group instance is no longer tracking "
++ "due to rebalance/partition reassignment. No action taken.", msgId);
+} else {
+LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+}
 } else {
-LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
++ " This should never occur barring errors in the RetryService implementation or the spout code.");
+offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
+emitted.remove(msgId);
 }
-} else {
-Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-+ " This should never occur barring errors in the RetryService implementation or the spout code.");
-offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
-emitted.remove(msgId);
+tupleListener.onAck(msgId);
 }
-tupleListener.onAck(msgId);
 }
 
 //  Fail ===
 @Override
 public void fail(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of failed tuples if commits are done based on acks
-return;
-}
+// Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
+if (isAtLeastOnceProcessing()) {
--- End diff --

Same comment as above.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147015438
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
 //  Ack ===
 @Override
 public void ack(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of acked tuples if commits are done based on acks
-return;
-}
-
+// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
--- End diff --

Done.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147016997
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
 //  Ack ===
 @Override
 public void ack(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of acked tuples if commits are done based on acks
-return;
-}
-
+// Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
 final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-if (!emitted.contains(msgId)) {
-if (msgId.isEmitted()) {
-LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-+ "came from a topic-partition that this consumer group instance is no longer tracking "
-+ "due to rebalance/partition reassignment. No action taken.", msgId);
+if (isAtLeastOnceProcessing()) {
+if (!emitted.contains(msgId)) {
+if (msgId.isEmitted()) {
+LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
++ "came from a topic-partition that this consumer group instance is no longer tracking "
++ "due to rebalance/partition reassignment. No action taken.", msgId);
+} else {
+LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+}
 } else {
-LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
++ " This should never occur barring errors in the RetryService implementation or the spout code.");
+offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
+emitted.remove(msgId);
 }
-} else {
-Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-+ " This should never occur barring errors in the RetryService implementation or the spout code.");
-offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
-emitted.remove(msgId);
+tupleListener.onAck(msgId);
 }
-tupleListener.onAck(msgId);
 }
 
 //  Fail ===
 @Override
 public void fail(Object messageId) {
-if (!isAtLeastOnce()) {
-// Only need to keep track of failed tuples if commits are done based on acks
-return;
-}
+// Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
--- End diff --

Done.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2380#discussion_r147022902
  
--- Diff: docs/storm-kafka-client.md ---
@@ -298,25 +298,44 @@ Currently the Kafka spout has the following 
default values, which have been
 * max.uncommitted.offsets = 1000
 
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee 
at-least-once processing of messages. The spout also supports at-most-once and 
any-times modes. At-most-once guarantees that any tuple emitted to the topology 
will never be reemitted. Any-times makes no guarantees, but may reduce the 
overhead of committing offsets to Kafka in cases where you truly don't care how 
many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when 
the tuple with the `ConsumerRecord` for an offset is marked
+as processed, i.e. when the offset is committed to Kafka. For 
AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
+the commit happens. When the guarantee is NONE Kafka controls when the 
commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the 
corresponding tuple has been processed (at-least-once)
 and acked. If a tuple fails or times out it will be re-emitted. A 
tuple can be processed more than once if, for instance,
 the ack gets lost.
+
+* AT_MOST_ONCE - every offset will be committed to Kafka right after being 
polled but before being emitted
+ to the downstream components of the topology. It guarantees that the 
offset is processed at-most-once because it
 won't retry tuples that fail or time out after the commit to Kafka has 
been done.
+
+* NONE - the polled offsets are committed to Kafka periodically as 
controlled by the Kafka properties
+ "enable.auto.commit" and "auto.commit.interval.ms". Because the spout 
does not control when the commit happens
+ it cannot give any message processing guarantees, i.e. a message may 
be processed 0, 1 or more times.
+ This option requires "enable.auto.commit=true". If 
"enable.auto.commit=false" an exception will be thrown.
+
+To set the processing guarantee use the 
`KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
 
-To set the processing guarantee, use the 
KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-The spout will disable tuple tracking for emitted tuples by default when 
you use at-most-once or any-times. In some cases you may want to enable 
tracking anyway, because tuple tracking is necessary for some features of 
Storm, e.g. showing complete latency in Storm UI, or enabling backpressure 
through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
+# Tuple Tracking
+
+By default the spout only tracks emitted tuples when the processing 
guarantee is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm 
features such as showing complete latency in the UI,
--- End diff --

Done
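
For reference, a minimal end-to-end sketch matching the documentation above; the broker address and topic name are placeholders, not taken from the PR:

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// At-least-once: offsets are committed only after the corresponding tuples are acked.
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
    .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaConf);
```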


---


[GitHub] storm pull request #2387: STORM-2787: storm-kafka-client KafkaSpout method o...

2017-10-25 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2387

STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) 
should set initialized flag independently of processing guarantees



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 
Apache_master_STORM-2787_KSInitFlag

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2387


commit 95d10fc60669fdf6b28f28fd9a5536a905a9a26e
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-23T00:44:54Z

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to 
setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout 
attempts to emit an already committed message
 - Update documentation to reflect these changes

commit 12473ed944a0f1499a2ba61987a223bd861e9768
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-25T06:52:54Z

STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) 
should set initialized flag independently of processing guarantees




---


[GitHub] storm pull request #2393: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-29 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2393

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to 
setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout 
attempts to emit an already committed message
 - Update documentation to reflect these changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 
1.x-branch_STORM-2781_KSProcGtees

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2393


commit ac16fe1ee9d974af64a30769819561c0abae23af
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-23T00:44:54Z

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to 
setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout 
attempts to emit an already committed message
 - Update documentation to reflect these changes




---


[GitHub] storm issue #2394: 1.x branch storm 2787 ks init flag

2017-10-29 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2394
  
@HeartSaVioR for 1.x-branch


---


[GitHub] storm pull request #2464: STORM-2847 1.x

2017-12-20 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2464#discussion_r158205889
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+
+@Rule
+public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+@Captor
+private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
+
+private final TopologyContext topologyContext = 
mock(TopologyContext.class);
+private final Map<String, Object> conf = new HashMap<>();
+private final SpoutOutputCollector collector = 
mock(SpoutOutputCollector.class);
+private final long commitOffsetPeriodMs = 2_000;
+private KafkaConsumer<String, String> consumerSpy;
+private KafkaConsumer<String, String> postReactivationConsumerSpy;
+private KafkaSpout<String, String> spout;
+private final int maxPollRecords = 10;
+
+@Before
+public void setUp() {
+KafkaSpoutConfig<String, String> spoutConfig =
+SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+SingleTopicKafkaSpoutConfiguration.TOPIC))
+.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords)
+.build();
+KafkaConsumerFactory<String, String> consumerFactory = new 
KafkaConsumerFactoryDefault<>();
+this.consumerSpy = 
spy(consumerFactory.createConsumer(spoutConfig));
+this.postReactivationConsumerSpy = 
spy(consumerFactory.createConsumer(spoutConfig));
+KafkaConsumerFactory<String, String> consumerFactoryMock = 
mock(KafkaConsumerFactory.class);
--- End diff --

Why do you need both consumerFactory and consumerFactoryMock?
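
For what it's worth, here is a guess at the wiring this setup enables, since the excerpt is cut off before the factory is stubbed; the two-value stubbing and the KafkaSpout constructor accepting a KafkaConsumerFactory are assumptions on my part, not taken from the PR:

```java
// Hypothetical: the first activation gets consumerSpy; after a
// deactivate/activate cycle the spout asks the factory again and
// receives postReactivationConsumerSpy.
when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
    .thenReturn(consumerSpy, postReactivationConsumerSpy);
this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
```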


---


[GitHub] storm issue #2464: STORM-2847 1.x

2017-12-20 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2464
  
Can you please squash the commits? You can merge afterwards.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253569
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html)
 and

+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
 
+and Kafka 0.10.1.0 
[ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig 
Usage |
--- End diff --

I suggest removing "Name", since "ConsumerConfig" is self-explanatory.
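
Separately, to make the usage column concrete, a hedged sketch (broker, topic, and the 1MB value are placeholders, not from the PR) of carrying a storm-kafka `fetchSizeBytes` setting over via the generic `setProp` call the table points to:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// storm-kafka equivalent: kafkaConfig.fetchSizeBytes = 1048576;
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576)
    .build();
```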


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253597
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html)
 and

+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
 
+and Kafka 0.10.1.0 
[ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig 
Usage |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | 
[`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.htm
 
l#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | **Setting:** 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Setting:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for 
network requests. The buffer size which consumer has for pulling data from 
producer  **Default:** `1MB`| **Setting:** 
[`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading 
data. If the value is -1, the OS default will be used | 
[`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** 
`true` | **Setting:** 
[`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG)
 **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** 
`latest`. Exception: `earliest` if 
[`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253552
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
--- End diff --

Nit: rename "Translation" to "Mapping". This is not really a translation.


---


[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...

2018-05-05 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
@srishtyagrawal Thank you for addressing the code review comments; it is much 
better now. Besides my two comments above, I still wonder if it would be better 
to point the links describing the Kafka properties to the top of the [New 
Consumer 
Configs](http://kafka.apache.org/10/documentation.html#newconsumerconfigs) 
table. The Javadocs have no description of the properties; they are basically 
the property names written in capital letters with underscores, which won't 
be very helpful to the user. It's better to point to the table, so users 
will know to search for the property name there.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253611
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html)
 and

+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
 
+and Kafka 0.10.1.0 
[ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig 
Usage |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | 
[`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.htm
 
l#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | **Setting:** 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Setting:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for 
network requests. The buffer size which consumer has for pulling data from 
producer  **Default:** `1MB`| **Setting:** 
[`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading 
data. If the value is -1, the OS default will be used | 
[`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** 
`true` | **Setting:** 
[`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG)
 **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** 
`latest`. Exception: `earliest` if 
[`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht

[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues

2018-05-08 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2667#discussion_r186887493
  
--- Diff: pom.xml ---
@@ -1275,6 +1270,25 @@
 
true
 
 
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-enforcer-plugin</artifactId>
+    <executions>
+        <execution>
+            <id>enforce-maven</id>
--- End diff --

Since there is only one execution, is 'id' really necessary?


---


[GitHub] storm issue #2667: STORM-3063: Fix minor pom issues

2018-05-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2667
  
+1


---


[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues

2018-05-09 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2667#discussion_r187100160
  
--- Diff: pom.xml ---
@@ -1275,6 +1270,25 @@
 
true
 
 
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-enforcer-plugin</artifactId>
+    <executions>
+        <execution>
+            <id>enforce-maven</id>
--- End diff --

Agree. Would 'enforce-maven-version' be more descriptive?
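
For context, a sketch of what such an execution commonly looks like; the requireMavenVersion rule and the version number are assumptions, since the excerpt cuts off before the configuration:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-enforcer-plugin</artifactId>
    <executions>
        <execution>
            <id>enforce-maven-version</id>
            <goals>
                <goal>enforce</goal>
            </goals>
            <configuration>
                <rules>
                    <!-- hypothetical rule; the PR's actual rules are not shown in this excerpt -->
                    <requireMavenVersion>
                        <version>3.0</version>
                    </requireMavenVersion>
                </rules>
            </configuration>
        </execution>
    </executions>
</plugin>
```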


---


[GitHub] storm issue #2637: STORM-3060: Map of Spout configurations from storm-kafka ...

2018-05-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
+1
@srishtyagrawal thank you for your nice and helpful contribution. It will 
benefit a lot of users.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185160807
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` 
  **Usage:** 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Kafka config:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| **Import package:** `import 
org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for 
network requests. The buffer size which consumer has for pulling data from 
producer  **Default:** `1MB`| **Kafka config:** 
[`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading 
data. If the value is -1, the OS default will be used | **Import package:** 
`import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | 
Discontinued in `storm-kafka-client` ||
--- End diff --

Instead of "Discontinued" I would put **N/A**.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168182
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` 
  **Usage:** 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Kafka config:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| **Import package:** `import 
org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
--- End diff --

the (**max?**) number of bytes to attempt ... 


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185163739
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
--- End diff --

Do you really mean migration, or rather some sort of parallel between the 
names and meanings of the properties in storm-kafka vs storm-kafka-client? 
"Migrating" may imply that there is a migration path, whereas what we really 
have is not a migration, but a way to specify the same behavior in the old 
and new spout.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168807
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` 
  **Usage:** 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Kafka config:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| **Import package:** `import 
org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for 
network requests. The buffer size which consumer has for pulling data from 
producer  **Default:** `1MB`| **Kafka config:** 
[`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading 
data. If the value is -1, the OS default will be used | **Import package:** 
`import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | 
Discontinued in `storm-kafka-client` ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** 
`true` | **Kafka config:** 
[`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185161496
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
--- End diff --

Are the "import package:" entries throughout necessary ? The ConsumerConfig 
strings come all from the same Kafka package, and the KafkaSpoutConfig 
configurations already need to have the package imported when the set* method 
is declared in the code.

It seems most "Usage:" web links are broken. Wouldn't it be better to 
simply paste the method signature and put the link for that same signature?
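
As an illustration of the signatures in question, a minimal sketch; the topic, broker, and the use of Kafka's standard StringDeserializer are assumptions, not taken from the PR:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Both deserializer properties go through the same setProp(String, Object)
// method that the "Usage:" column links to.
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .build();
```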


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168952
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` 
  **Usage:** 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Kafka config:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| **Import package:** `import 
org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
--- End diff --

I suggest omitting "Kafka config:". The link is self-explanatory.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168669
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
+| - |  | 
--- |
+| **Setting:** `startOffsetTime` **Default:** 
`EarliestTime`  
**Setting:** `forceFromStart`  **Default:** `false`  
`startOffsetTime` & `forceFromStart` together determine the starting offset. 
`forceFromStart` determines whether the Zookeeper offset is ignored. 
`startOffsetTime` sets the timestamp that determines the beginning offset, in 
case there is no offset in Zookeeper, or the Zookeeper offset is ignored | 
**Setting:** 
[`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html)
 **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper 
table](#helper-table-for-setting-firstpolloffsetstrategy) for picking 
`FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` 
settings | **Import package:** 
`org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`
  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a 
`ByteBuffer` from a Kafka topic is transformed into Storm tuple 
**Default:** `RawMultiScheme` | 
[`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)|
 **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` 
  **Usage:** 
[`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)
 
[`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number 
of bytes to attempt to fetch in one request to a Kafka server  **Default:** 
`1MB` | **Kafka config:** 
[`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG)
 **Default:** `1MB`| **Import package:** `import 
org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for 
network requests. The buffer size which consumer has for pulling data from 
producer  **Default:** `1MB`| **Kafka config:** 
[`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)
  The size of the TCP receive buffer (SO_RCVBUF) to use when reading 
data. If the value is -1, the OS default will be used | **Import package:** 
`import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** 
[`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
)`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | 
Discontinued in `storm-kafka-client` ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** 
`true` | **Kafka config:** 
[`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185167619
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = 
KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, 
where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were 
taken from Storm 0.9.6

+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java)
 and

+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6

+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | 
KafkaSpoutConfig usage help |
--- End diff --

The Storm version is already specified above and could be omitted from the 
table. If possible, I would suggest presenting the table as:
```
| SpoutConfig   | KafkaSpoutConfig   |  
---
| prop | desc | default | prop  | desc | default |
```


---


[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...

2018-04-30 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
@erikdw I am reviewing this now. Sorry, I was away for the last few days.


---


[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

2017-10-22 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2380

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to 
setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout 
attempts to emit an already committed message
 - Update documentation to reflect these changes

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 
Apache_master_STORM-2781_KSProcGtees

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2380


commit 6a5c3c4b043a8ddd1224fb14f55a512b810d05b7
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-23T00:44:54Z

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to 
setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout 
attempts to emit an already committed message
 - Update documentation to reflect these changes




---


[GitHub] storm pull request #2381: STORM-2784: storm-kafka-client KafkaTupleListener ...

2017-10-22 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2381

STORM-2784: storm-kafka-client KafkaTupleListener method onPartitions…

…Reassigned() should be called after initialization is complete

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache 
Apache_master_STORM-2784_KTLOPR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2381.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2381


commit ae827dac08fa67997c5eb8e6c23828f190de2851
Author: Hugo Louro <hmclo...@gmail.com>
Date:   2017-10-23T04:35:29Z

STORM-2784: storm-kafka-client KafkaTupleListener method 
onPartitionsReassigned() should be called after initialization is complete




---


[GitHub] storm issue #2387: STORM-2787: storm-kafka-client KafkaSpout method onPartit...

2017-10-27 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2387
  
@srdo @HeartSaVioR I have incorporated the code review changes from the dependent patch. It should be good to merge. Thanks.


---


[GitHub] storm issue #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...

2017-10-27 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2380
  
I have squashed the commits and addressed the early return issue.


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120269
  
--- Diff: external/storm-solr/README.md ---
@@ -97,6 +97,29 @@ field separates each value with the token % instead of 
the default | . To use th
 .setMultiValueFieldToken("%").build();
 ```
 
+## Working with Kerberized Solr
+If your topology is going to interact with kerberized Solr, your bolts/states need to be authenticated by the Solr server. We can enable
+authentication by distributing keytabs for the solr user on all worker hosts. We can configure the Solr bolt to use keytabs by setting the
+SolrConfig.enableKerberos config property.
+
+On worker hosts the bolt/trident-state code will use the keytab file, with the principal provided in the JAAS config, to authenticate with
+Solr. You need to specify a Kerberos principal for the client and a corresponding keytab in the JAAS client configuration file.
+Also make sure the provided principal is configured with the required permissions to access the Solr collections.
+
+Here’s an example JAAS config:
+
+`SolrJClient {
+  com.sun.security.auth.module.Krb5LoginModule required
+  useKeyTab=true
+  keyTab="/keytabs/foo.keytab"
--- End diff --

/keytabs/solr.keytab? Perhaps we could put an entry here that matches what Ambari typically creates.
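
For illustration, an entry shaped like what an Ambari-managed cluster typically generates might look like this (the keytab path, host, and realm below are placeholders, not values from this PR):

```
SolrJClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="/etc/security/keytabs/solr.service.keytab"
  principal="solr/solr-host.example.com@EXAMPLE.COM";
};
```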


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159121022
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java
 ---
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.schema.CopyField;
+import org.apache.storm.solr.schema.Field;
+import org.apache.storm.solr.schema.FieldType;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class that builds the {@link Schema} object from the schema returned by 
the SchemaRequest
+ */
+public class RestJsonSchemaBuilderV2 implements SchemaBuilder {
+private static final Logger logger = 
LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class);
+private Schema schema = new Schema();
+private SolrConfig solrConfig;
+private String collection;
+
+public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String 
collection) {
+this.solrConfig = solrConfig;
+this.collection = collection;
+}
+
+@Override
+public void buildSchema() throws IOException {
+SolrClient solrClient = null;
+try {
+if (solrConfig.enableKerberos())
+HttpClientUtil.setConfigurer(new 
Krb5HttpClientConfigurer());
+
+solrClient = new CloudSolrClient(solrConfig.getZkHostString());
--- End diff --

The initial code was building the schema from the JSON representation. The other class has been deprecated, which means that JSON is no longer supported. Is there a reason to support both? If so, there should probably be a factory that, depending on a configuration choice (e.g. Kerberos), builds one or the other.

The goal of using JSON was to avoid all of this programmatic setup.
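
For example, a rough sketch of the factory shape being suggested (the class name, method, and the RestJsonSchemaBuilder constructor argument are assumptions for illustration only):

```java
import org.apache.storm.solr.config.SolrConfig;

// Hypothetical factory, assumed to live in org.apache.storm.solr.schema.builder:
// picks the schema builder implementation based on configuration.
public final class SchemaBuilderFactory {
    private SchemaBuilderFactory() { }

    public static SchemaBuilder newSchemaBuilder(SolrConfig solrConfig, String schemaUrl,
                                                 String collection) {
        // Kerberized deployments go through the SolrJ-based V2 builder; otherwise
        // keep the original JSON/REST builder and its simpler, declarative setup.
        return solrConfig.enableKerberos()
                ? new RestJsonSchemaBuilderV2(solrConfig, collection)
                : new RestJsonSchemaBuilder(schemaUrl); // constructor arg assumed
    }
}
```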


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120398
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -153,4 +160,15 @@ private void failQueuedTuples(List<Tuple> failedTuples) {
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) { }
 
+@Override
+public void cleanup() {
+if (solrClient != null) {
+try {
+solrClient.close();
+} catch (IOException e) {
+LOG.debug("Error while closing solrClient", e);
--- End diff --

Should this be at error level?


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120315
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -88,9 +92,11 @@ private int capacity() {
 @Override
 protected void process(Tuple tuple) {
 try {
+LOG.debug("Processing Tuple: {}", tuple);
--- End diff --

Storm provides these debug log messages when the Config has setDebug(true). Therefore we don't usually add this type of logging.
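
For reference, the built-in mechanism alluded to here is the topology-level debug flag; a minimal sketch:

```java
import org.apache.storm.Config;

// With setDebug(true), Storm itself logs every emitted/acked tuple,
// which is why hand-written per-tuple LOG.debug calls are usually redundant.
Config conf = new Config();
conf.setDebug(true);
```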


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120320
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -128,6 +134,7 @@ private void fail(Tuple tuple, Exception e) {
 List<Tuple> failedTuples = getQueuedTuples();
 failQueuedTuples(failedTuples);
 }
+LOG.debug("Failed Tuple: {}", tuple, e);
--- End diff --

Same as above


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120430
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java
 ---
@@ -70,13 +73,22 @@ public String toString() {
 /**
  * Initiates class containing all the information relating fields with 
their types.
  * This information is parsed from the schema
- * @param schema SolrSchema containing the information about fields 
and types
+ * @param schemaBuilder schemaBuilder to build the information about 
fields and types
  * */
-public SolrFieldTypeFinder(Schema schema) {
-if (schema == null) {
-throw new IllegalArgumentException("Schema object is null");
+public SolrFieldTypeFinder(SchemaBuilder schemaBuilder) {
+this.schemaBuilder = schemaBuilder;
+}
+
+public void initialize() {
+if (schemaBuilder == null) {
+throw new IllegalArgumentException("schemaBuilder object is 
null");
--- End diff --

Should this IllegalArgumentException validation be done in the constructor, to prevent the exception from occurring at runtime? If it is intended to be done here, perhaps it should be an IllegalStateException.
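
That is, something along these lines (a sketch of the constructor-side check being suggested, reusing the field and message from this diff):

```java
public SolrFieldTypeFinder(SchemaBuilder schemaBuilder) {
    if (schemaBuilder == null) {
        // Fail fast at construction time rather than later, at runtime.
        throw new IllegalArgumentException("schemaBuilder object is null");
    }
    this.schemaBuilder = schemaBuilder;
}
```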


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120287
  
--- Diff: external/storm-solr/README.md ---
@@ -171,7 +194,7 @@ Querying  Solr for these patterns, you will see the 
values that have been indexe
 
 curl -X GET -H "Content-type:application/json" -H 
"Accept:application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true
 
-curl -X GET -H "Content-type: application/json" -H "Accept: 
application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true
+curl -X GET -H "Content-type:application/json" -H 
"Accept:application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*json_test_val*=json=true
--- End diff --

Was this a bug?


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120311
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -88,9 +92,11 @@ private int capacity() {
 @Override
 protected void process(Tuple tuple) {
 try {
+LOG.debug("Processing Tuple: {}", tuple);
 SolrRequest request = solrMapper.toSolrRequest(tuple);
 solrClient.request(request, solrMapper.getCollection());
 ack(tuple);
+LOG.debug("Acked Tuple: {}", tuple);
--- End diff --

Same as above


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120406
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java 
---
@@ -54,4 +65,7 @@ public int getTickTupleInterval() {
 return tickTupleInterval;
 }
 
+public boolean enableKerberos() {
--- End diff --

NIT: isKerberosEnabled() or isEnableKerberos()?


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119896
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
+return null;
+}
+
+Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
+Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+Map<String, Long> result = new HashMap<>();
--- End diff --

it would be useful to have a comment saying what is in this result map


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119706
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

If we start adding a lot of these test methods, we would be better off creating a class in the test package called KafkaSpoutTest that extends KafkaSpout, and using that one in the tests. All of these methods should go in that class. We don't want this class to get bloated.
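
A hypothetical sketch of that suggestion (the class name and exposed accessor are illustrative; placed in the same package so it can reach package-private members):

```java
// Test-only subclass that gathers the @VisibleForTesting accessors,
// keeping KafkaSpout itself lean.
class KafkaSpoutTest<K, V> extends KafkaSpout<K, V> {
    KafkaSpoutTest(KafkaSpoutConfig<K, V> config, KafkaConsumerFactory<K, V> consumerFactory) {
        super(config, consumerFactory);
    }

    KafkaOffsetMetric exposedKafkaOffsetMetric() {
        return getKafkaOffsetMetric();
    }
}
```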


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119938
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
--- End diff --

What's the reasoning behind passing a Supplier rather than the actual object?
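
One plausible rationale, stated here as an assumption rather than the author's confirmed answer: the spout re-creates its KafkaConsumer when it is deactivated and reactivated, so deferring the lookup keeps the metric reading the live objects. Sketch:

```java
// If the spout re-creates kafkaConsumer in activate(), a Supplier resolved at
// each metrics tick always sees the current instance, whereas a reference
// captured once at open() time could go stale after reactivation.
kafkaOffsetMetric = new KafkaOffsetMetric(
        () -> offsetManagers,   // looked up when getValueAndReset() runs
        () -> kafkaConsumer);
```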


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119878
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
--- End diff --

Should this be INFO level? Is this going to print this message periodically?


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-22 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158573100
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *  
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaSpoutAbstractTest {
+@Rule
+public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+@Captor
+ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+final TopologyContext topologyContext = mock(TopologyContext.class);
+final Map<String, Object> conf = new HashMap<>();
+final SpoutOutputCollector collector = 
mock(SpoutOutputCollector.class);
+final long commitOffsetPeriodMs = 2_000;
+final int maxRetries = 3;
+KafkaConsumer<String, String> consumerSpy;
+KafkaConsumerFactory<String, String> consumerFactory;
+KafkaSpout<String, String> spout;
+final int maxPollRecords = 10;
+
+@Before
+public void setUp() {
+MockitoAnnotations.initMocks(this);
+
+final KafkaSpoutConfig<String, String> spoutConfig = 
createSpoutConfig();
+
+consumerSpy = spy(new KafkaConsumerFactoryDefault<String, 
String>().createConsumer(spoutConfig));
+
+consumerFactory = new KafkaConsumerFactory<String, String>() {
+@Override
+public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+return consumerSpy;
+}
+
+};
+
+spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+}
+
+
+abstract KafkaSpoutConfig<String, String> createSpoutConfig();
+
+void prepareSpout(int messageCount) throws Exception {
+
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+}
+
+/**
+ * Helper method that, in sequence, does:
+ * <ol>
+ * <li>spout.nextTuple()</li>
+ * <li>verify messageId</li>
+ * <li>spout.ack(msgId)</li>
+ * <li>reset(collector) to be able to reuse the mock</li>
+ * </ol>
+ *
+ * @param offset offset of message to be verified
+ * @return {@link ArgumentCaptor} of the messageId verified
+ */
+ArgumentCaptor 
nextTuple_verifyEmitted_ack_resetCollectorMock(int offs

[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-23 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo done. Please check, and I will squash the commits right away. I would like to try to merge this today. I will update the master PR with everything already squashed. Thanks.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-22 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158572508
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends 
KafkaSpoutAbstractTest {
+@Override
+KafkaSpoutConfig<String, String> createSpoutConfig() {
+return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+.setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords)
+.build();
+}
+
+@Test
+public void 
test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() 
throws Exception {
+try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
--- End diff --

Done


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-22 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo It should be OK now. If you have further requests, let's file a refactoring JIRA and include in it refactoring some of the unit tests for better code reuse.


---

