[1/2] storm git commit: Add KafkaTupleListener to storm-kafka-client

2017-08-30 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 10ab96a5c -> c4a09d311


Add KafkaTupleListener to storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43625c26
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43625c26
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43625c26

Branch: refs/heads/1.x-branch
Commit: 43625c2637b65f4452cb21f9983274daf6d7f357
Parents: 4f19fee
Author: Bijan Fahimi 
Authored: Wed Aug 30 10:16:31 2017 +0200
Committer: Bijan Fahimi 
Committed: Wed Aug 30 10:16:31 2017 +0200

--
 .../kafka/spout/EmptyKafkaTupleListener.java| 53 +
 .../apache/storm/kafka/spout/KafkaSpout.java| 10 +++
 .../storm/kafka/spout/KafkaSpoutConfig.java | 38 +++--
 .../storm/kafka/spout/KafkaTupleListener.java   | 83 
 4 files changed, 177 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
new file mode 100644
index 0000000..621fecd
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public final class EmptyKafkaTupleListener implements KafkaTupleListener {
+
+@Override
+public void open(Map conf, TopologyContext context) { }
+
+@Override
+public void onEmit(List tuple, KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onAck(KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onPartitionsReassigned(Collection<TopicPartition> topicPartitions) { }
+
+@Override
+public void onRetry(KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
+
+@Override
+public String toString() {
+return "EmptyKafkaTupleListener";
+}
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 6631930..9f806b5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -73,6 +73,7 @@ public class KafkaSpout extends BaseRichSpout {
 // Bookkeeping
 private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
 private transient KafkaSpoutRetryService retryService;  // Class that has the logic to handle tuple failure
+private transient KafkaTupleListener tupleListener; // Handles tuple events (emit, ack etc.)
 private transient Timer commitTimer;// timer == null for auto commit mode
 private transient boolean initialized;  // Flag indicating that the spout is still undergoing initialization process.
 // Initialization is only complete after the first call to  

[1/2] storm git commit: Add KafkaTupleListener to storm-kafka-client

2017-08-29 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/master d06bb3856 -> 08d9d5a87


Add KafkaTupleListener to storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6fef40b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6fef40b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6fef40b4

Branch: refs/heads/master
Commit: 6fef40b4fbdad74b61fd54ad48085699a74c569c
Parents: 0bf7e70
Author: Bijan Fahimi 
Authored: Thu Aug 17 21:24:00 2017 +0200
Committer: Bijan Fahimi 
Committed: Thu Aug 24 17:37:57 2017 +0200

--
 .../kafka/spout/EmptyKafkaTupleListener.java| 53 +
 .../apache/storm/kafka/spout/KafkaSpout.java| 15 +++-
 .../storm/kafka/spout/KafkaSpoutConfig.java | 26 +-
 .../storm/kafka/spout/KafkaTupleListener.java   | 83 
 4 files changed, 174 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
new file mode 100644
index 0000000..621fecd
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public final class EmptyKafkaTupleListener implements KafkaTupleListener {
+
+@Override
+public void open(Map conf, TopologyContext context) { }
+
+@Override
+public void onEmit(List tuple, KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onAck(KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onPartitionsReassigned(Collection<TopicPartition> topicPartitions) { }
+
+@Override
+public void onRetry(KafkaSpoutMessageId msgId) { }
+
+@Override
+public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
+
+@Override
+public String toString() {
+return "EmptyKafkaTupleListener";
+}
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e4b53ab..64e24a6 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -73,7 +73,9 @@ public class KafkaSpout extends BaseRichSpout {
 // Strategy to determine the fetch offset of the first realized by the spout upon activation
 private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
 // Class that has the logic to handle tuple failure
-private transient KafkaSpoutRetryService retryService;  
+private transient KafkaSpoutRetryService retryService;
+// Handles tuple events (emit, ack etc.)
+private transient KafkaTupleListener tupleListener;
 // timer == null for auto commit mode
 private transient Timer commitTimer;
 // Flag indicating that the spout is still undergoing initialization process.
@@ -95,7 +97,7 @@ public class KafkaSpout extends BaseRichSpout {
 
 
 public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) {
-