[1/2] storm git commit: Add KafkaTupleListener to storm-kafka-client
Repository: storm Updated Branches: refs/heads/1.x-branch 10ab96a5c -> c4a09d311 Add KafkaTupleListener to storm-kafka-client Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43625c26 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43625c26 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43625c26 Branch: refs/heads/1.x-branch Commit: 43625c2637b65f4452cb21f9983274daf6d7f357 Parents: 4f19fee Author: Bijan Fahimi Authored: Wed Aug 30 10:16:31 2017 +0200 Committer: Bijan Fahimi Committed: Wed Aug 30 10:16:31 2017 +0200 -- .../kafka/spout/EmptyKafkaTupleListener.java| 53 + .../apache/storm/kafka/spout/KafkaSpout.java| 10 +++ .../storm/kafka/spout/KafkaSpoutConfig.java | 38 +++-- .../storm/kafka/spout/KafkaTupleListener.java | 83 4 files changed, 177 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java new file mode 100644 index 000..621fecd --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.storm.kafka.spout; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public final class EmptyKafkaTupleListener implements KafkaTupleListener { + +@Override +public void open(Map conf, TopologyContext context) { } + +@Override +public void onEmit(List tuple, KafkaSpoutMessageId msgId) { } + +@Override +public void onAck(KafkaSpoutMessageId msgId) { } + +@Override +public void onPartitionsReassigned(Collection topicPartitions) { } + +@Override +public void onRetry(KafkaSpoutMessageId msgId) { } + +@Override +public void onMaxRetryReached(KafkaSpoutMessageId msgId) { } + +@Override +public String toString() { +return "EmptyKafkaTupleListener"; +} +} http://git-wip-us.apache.org/repos/asf/storm/blob/43625c26/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 6631930..9f806b5 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -73,6 +73,7 @@ public class KafkaSpout extends BaseRichSpout { // Bookkeeping private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first 
realized by the spout upon activation private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure +private transient KafkaTupleListener tupleListener; // Handles tuple events (emit, ack etc.) private transient Timer commitTimer;// timer == null for auto commit mode private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to
[1/2] storm git commit: Add KafkaTupleListener to storm-kafka-client
Repository: storm Updated Branches: refs/heads/master d06bb3856 -> 08d9d5a87 Add KafkaTupleListener to storm-kafka-client Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6fef40b4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6fef40b4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6fef40b4 Branch: refs/heads/master Commit: 6fef40b4fbdad74b61fd54ad48085699a74c569c Parents: 0bf7e70 Author: Bijan Fahimi Authored: Thu Aug 17 21:24:00 2017 +0200 Committer: Bijan Fahimi Committed: Thu Aug 24 17:37:57 2017 +0200 -- .../kafka/spout/EmptyKafkaTupleListener.java| 53 + .../apache/storm/kafka/spout/KafkaSpout.java| 15 +++- .../storm/kafka/spout/KafkaSpoutConfig.java | 26 +- .../storm/kafka/spout/KafkaTupleListener.java | 83 4 files changed, 174 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java new file mode 100644 index 000..621fecd --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.storm.kafka.spout; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public final class EmptyKafkaTupleListener implements KafkaTupleListener { + +@Override +public void open(Map conf, TopologyContext context) { } + +@Override +public void onEmit(List tuple, KafkaSpoutMessageId msgId) { } + +@Override +public void onAck(KafkaSpoutMessageId msgId) { } + +@Override +public void onPartitionsReassigned(Collection topicPartitions) { } + +@Override +public void onRetry(KafkaSpoutMessageId msgId) { } + +@Override +public void onMaxRetryReached(KafkaSpoutMessageId msgId) { } + +@Override +public String toString() { +return "EmptyKafkaTupleListener"; +} +} http://git-wip-us.apache.org/repos/asf/storm/blob/6fef40b4/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index e4b53ab..64e24a6 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -73,7 +73,9 @@ public class KafkaSpout extends BaseRichSpout { // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient FirstPollOffsetStrategy 
firstPollOffsetStrategy; // Class that has the logic to handle tuple failure -private transient KafkaSpoutRetryService retryService; +private transient KafkaSpoutRetryService retryService; +// Handles tuple events (emit, ack etc.) +private transient KafkaTupleListener tupleListener; // timer == null for auto commit mode private transient Timer commitTimer; // Flag indicating that the spout is still undergoing initialization process. @@ -95,7 +97,7 @@ public class KafkaSpout extends BaseRichSpout { public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { -