[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96925159 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java --- @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.selector; + +import org.apache.storm.tuple.Tuple; + +import java.io.Serializable; + +public interface KafkaTopicSelector extends Serializable { +String getTopic(Tuple tuple); --- End diff -- File another JIRA and we can look into it. I think it is beyond the scope of this one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96925279 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java --- @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.mapper; + +import org.apache.storm.trident.tuple.TridentTuple; + +public class FieldNameBasedTupleToKafkaMapper implements TridentTupleToKafkaMapper { + +public final String keyFieldName; +public final String msgFieldName; + +public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) { +this.keyFieldName = keyFieldName; +this.msgFieldName = msgFieldName; +} + +@Override +public K getKeyFromTuple(TridentTuple tuple) { +return (K) tuple.getValueByField(keyFieldName); --- End diff -- It is possible but that is the point of the generics. To try and reduce the likelihood of it happening. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96924990 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class TridentKafkaState implements State { +private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class); + +private KafkaProducer producer; +private OutputCollector collector; + +private TridentTupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; + +public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is Noop."); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is Noop."); +} + +public void prepare(Properties options) { +if (mapper == null) throw new NullPointerException("mapper can not be null"); --- End diff -- `Objects` does not exist in Java 6, and I would prefer to keep the code as compatible as possible to avoid extra rework when pulling these changes back. If you insist, I will do it.
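The trade-off in the comment above can be sketched as follows. This is a minimal illustration, not code from the pull request: `NullCheckSketch` and both helper names are hypothetical. It only shows that the explicit check used in the diff and `java.util.Objects.requireNonNull` (available from Java 7 onward) fail in the same way for a null argument.

```java
import java.util.Objects;

// Illustrative only: NullCheckSketch and both helper names are hypothetical.
final class NullCheckSketch {

    // Java 6 compatible form, in the style of the check shown in the diff.
    static <T> T requireNonNullJava6(T value, String message) {
        if (value == null) {
            throw new NullPointerException(message);
        }
        return value;
    }

    // Java 7+ form: java.util.Objects was added in Java 7.
    static <T> T requireNonNullJava7(T value, String message) {
        return Objects.requireNonNull(value, message);
    }

    public static void main(String[] args) {
        try {
            requireNonNullJava6(null, "mapper can not be null");
        } catch (NullPointerException e) {
            System.out.println("Java 6 style: " + e.getMessage());
        }
        try {
            requireNonNullJava7(null, "mapper can not be null");
        } catch (NullPointerException e) {
            System.out.println("Java 7 style: " + e.getMessage());
        }
    }
}
```

Both forms throw a `NullPointerException` carrying the same message, so the choice is about which Java versions the code must compile on, not about behavior.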
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96923940 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class TridentKafkaState implements State { +private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class); + +private KafkaProducer producer; +private OutputCollector collector; + +private TridentTupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; + +public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { --- End diff -- This is code that was "moved" like with the KafkaBolt for storm-kafka to storm-kafka-client. If we really want to make it immutable we can, but I think that is beyond the scope of this JIRA --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96923481 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java --- @@ -44,29 +43,30 @@ // Bookkeeping private final KafkaSpoutConfig kafkaSpoutConfig; // Declare some KafkaSpoutConfig references for convenience -private KafkaSpoutStreams kafkaSpoutStreams;// Object that wraps all the logic to declare output fields and emit tuples -private KafkaSpoutTuplesBuilder tuplesBuilder;// Object that contains the logic to build tuples for each ConsumerRecord +private final Fields fields; public KafkaTridentSpoutManager(KafkaSpoutConfig kafkaSpoutConfig) { this.kafkaSpoutConfig = kafkaSpoutConfig; -kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); -tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder(); +RecordTranslator translator = kafkaSpoutConfig.getTranslator(); +Fields fields = null; --- End diff -- I'm a bit confused about what benefit that would give. The Fields come from the SpoutConfig, by way of the RecordTranslator. Why would you want to override the Fields but not the RecordTranslator that must conform to those fields? I just don't see the value in separating two things that are so tightly coupled.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96911529 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java --- @@ -15,14 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.storm.kafka.spout; -package org.apache.storm.kafka.spout.internal.partition; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; -import java.util.List; +public class RoundRobinManualPartitioner implements ManualPartitioner { -public interface KafkaPartitionReader { -List readPartitions(KafkaConsumer consumer); + @Override + public List partition(List allPartitions, TopologyContext context) { + int thisTaskIndex = context.getThisTaskIndex(); + int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); + Set myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1); + for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) { --- End diff -- No. `i < allPartitions.size()` guarantees that we will never call get on allPartitions with an index that is out of bounds. The Set is just setting the initial size to avoid more memory allocation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
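The bounds argument in the reply above can be checked with a small stand-alone sketch. It is an illustration under assumptions: `RoundRobinSketch` and its `partition` signature are hypothetical, the task index and task count are passed in directly instead of being read from a `TopologyContext`, and an `ArrayList` replaces the `HashSet` from the diff so that the output order is predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a stand-alone version of the round-robin loop in the diff.
final class RoundRobinSketch {

    static <T> List<T> partition(List<T> allPartitions, int thisTaskIndex, int totalTaskCount) {
        // The capacity is only a sizing hint; it never limits how many items are added.
        List<T> mine = new ArrayList<>(allPartitions.size() / totalTaskCount + 1);
        // i < allPartitions.size() is checked before every get(i),
        // so the index can never be out of bounds.
        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
            mine.add(allPartitions.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3, 4);
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partition(all, task, 3));
        }
        // A task whose index is past the end of the list simply gets nothing.
        System.out.println("task 6 of 7 -> " + partition(all, 6, 7));
    }
}
```

With five partitions and three tasks, the tasks receive `[0, 3]`, `[1, 4]` and `[2]`; a task with no matching index gets an empty list instead of an exception.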
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96909498 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java --- @@ -15,22 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.List; - -public class KafkaSpoutTuplesBuilderWildcardTopics implements KafkaSpoutTuplesBuilder { -private KafkaSpoutTupleBuilder tupleBuilder; +import org.apache.storm.tuple.Values; -public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder tupleBuilder) { -this.tupleBuilder = tupleBuilder; +/** + * A list of Values in a tuple that can be routed + * to a given stream. + */ +public class KafkaTuple extends Values { +private static final long serialVersionUID = 4803794470450587992L; +private String stream = null; + +public KafkaTuple() { +super(); +} + +public KafkaTuple(Object... vals) { +super(vals); +} + +public KafkaTuple routedTo(String stream) { --- End diff -- This is because the constructor is variadic, following the Values parent class. It is ambiguous to have ``` public KafkaTuple(Object... vals) ``` and any other constructor. Java complains.
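The fluent `routedTo` approach described above can be sketched without Storm on the classpath. Everything here is a hypothetical stand-in: `KafkaTupleSketch.Values` imitates `org.apache.storm.tuple.Values` and `RoutedTuple` imitates `KafkaTuple`. The point is only that the stream name is set through a separate method, so it can never be mistaken for a tuple whose first value happens to be a `String`.

```java
import java.util.ArrayList;
import java.util.Collections;

// Illustrative only: stand-ins for Storm's Values and the KafkaTuple in the diff.
final class KafkaTupleSketch {

    // Hypothetical stand-in for org.apache.storm.tuple.Values.
    static class Values extends ArrayList<Object> {
        private static final long serialVersionUID = 1L;

        Values(Object... vals) {
            Collections.addAll(this, vals);
        }
    }

    // Hypothetical stand-in for KafkaTuple: one variadic constructor, plus a
    // fluent setter for the stream instead of a second constructor.
    static class RoutedTuple extends Values {
        private static final long serialVersionUID = 1L;
        private String stream = null;

        RoutedTuple(Object... vals) {
            super(vals);
        }

        RoutedTuple routedTo(String stream) {
            this.stream = stream;
            return this;
        }

        String getStream() {
            return stream;
        }
    }

    public static void main(String[] args) {
        RoutedTuple tuple = new RoutedTuple("some-topic", 42L).routedTo("audit");
        System.out.println(tuple + " -> " + tuple.getStream());
    }
}
```

Every argument passed to the constructor is a tuple value, and the stream is `null` until `routedTo` is called.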
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96908491 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Subscribe to all topics that follow a given list of values + */ +public class NamedSubscription extends Subscription { +private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); +private static final long serialVersionUID = 3438543305215813839L; +protected final Collection topics; + +public NamedSubscription(Collection topics) { +super(); +this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics)); +} + +public NamedSubscription(String ... 
topics) { +this(Arrays.asList(topics)); +} + +@Override +public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext unused) { +consumer.subscribe(topics, listener); +LOG.info("Kafka consumer subscribed topics {}", topics); +} + +@Override +public String getTopicsString() { --- End diff -- It is in the parent javadocs. That is around the override --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96908178 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java --- @@ -1,16 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.kafka.spout; -import org.apache.kafka.common.TopicPartition; - import java.util.Comparator; +import org.apache.kafka.common.TopicPartition; + public class TopicPartitionComparator implements Comparator { -@Override -public int compare(TopicPartition o1, TopicPartition o2) { -if (!o1.topic().equals(o2.topic())) { -return o1.topic().compareTo(o2.topic()); -} else { -return o1.partition() - o2.partition(); -} -} + public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator(); --- End diff -- I'm not sure we need to guarantee it. The old code had the INSTANCE where it was used, I moved it here in hopes that others might use it. I could make the constructor private if we really want it to be a singleton, but I don't think it is a requirement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
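If the comparator were to be made a strict singleton, as the reply above mentions as an option, it could look roughly like this. This is a sketch under assumptions: `Tp` is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition`, and `Integer.compare` is swapped in for the subtraction used in the removed code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Illustrative only: Tp and TpComparator are hypothetical stand-ins.
final class TopicPartitionOrderSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    static final class TpComparator implements Comparator<Tp> {
        static final TpComparator INSTANCE = new TpComparator();

        // The private constructor is what would make INSTANCE the only instance.
        private TpComparator() { }

        @Override
        public int compare(Tp a, Tp b) {
            // Order by topic name first, then by partition number.
            int byTopic = a.topic.compareTo(b.topic);
            return byTopic != 0 ? byTopic : Integer.compare(a.partition, b.partition);
        }
    }

    static List<Tp> sorted(List<Tp> input) {
        List<Tp> copy = new ArrayList<>(input);
        Collections.sort(copy, TpComparator.INSTANCE);
        return copy;
    }

    public static void main(String[] args) {
        List<Tp> input = Arrays.asList(new Tp("b", 1), new Tp("a", 2), new Tp("a", 0));
        System.out.println(sorted(input));
    }
}
```

Because the comparator holds no state, sharing one instance is safe whether or not the constructor is private, which is consistent with the view that the singleton guarantee is optional.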
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96907232 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -325,15 +310,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { } else { boolean isScheduled = retryService.isScheduled(msgId); if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried -final List tuple = tuplesBuilder.buildTuple(record); -kafkaSpoutStreams.emit(collector, tuple, msgId); +final List tuple = kafkaSpoutConfig.getTranslator().apply(record); +if (tuple instanceof KafkaTuple) { --- End diff -- Yes, and it is documented in RecordTranslator. I will add more documentation on it, though. The specific reason for this is that the spout cannot keep track of a single message being emitted to multiple streams. It would get confused and ack the message before it was truly done. Returning a single tuple makes it impossible for that to happen. What is more, the built-in record translators should cover 99% of the use cases, so the sparse documentation should only matter in corner cases.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96897291 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -325,15 +310,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { } else { boolean isScheduled = retryService.isScheduled(msgId); if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried -final List tuple = tuplesBuilder.buildTuple(record); -kafkaSpoutStreams.emit(collector, tuple, msgId); +final List tuple = kafkaSpoutConfig.getTranslator().apply(record); +if (tuple instanceof KafkaTuple) { --- End diff -- The javadocs for RecordTranslator state. ``` * @return the objects in the tuple. Return a {@link KafkaTuple} * if you want to route the tuple to a non-default stream ``` All of the provided implementations support this. Both Simple and ByTopic, by way of the SimpleRecordTranslator. I will add a section in the docs to talk about this, but it only really matters if you are writing your own record translator from scratch, instead of using the built in ones that should cover the vast majority of the use cases. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
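The contract quoted from the javadoc can be illustrated with a stand-alone sketch of the dispatch. It rests on assumptions: `RoutedTuple` is a hypothetical stand-in for `KafkaTuple`, `streamFor` is not a method from the pull request, `default` is assumed to be the default stream id, and falling back to it when no stream was set is a guess about the intended behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: models choosing the output stream for one translated record.
final class StreamRoutingSketch {

    // Assumed default stream id.
    static final String DEFAULT_STREAM = "default";

    // Hypothetical stand-in for KafkaTuple.
    static final class RoutedTuple extends ArrayList<Object> {
        private static final long serialVersionUID = 1L;
        private String stream = null;

        RoutedTuple(Object... vals) {
            Collections.addAll(this, vals);
        }

        RoutedTuple routedTo(String stream) {
            this.stream = stream;
            return this;
        }

        String getStream() {
            return stream;
        }
    }

    // One record yields exactly one tuple, and that tuple goes to exactly one stream.
    static String streamFor(List<Object> tuple) {
        if (tuple instanceof RoutedTuple) {
            String stream = ((RoutedTuple) tuple).getStream();
            if (stream != null) {
                return stream;
            }
        }
        return DEFAULT_STREAM;
    }

    public static void main(String[] args) {
        List<Object> plain = new ArrayList<>(Arrays.asList("key", "value"));
        System.out.println(streamFor(plain));
        System.out.println(streamFor(new RoutedTuple("key", "value").routedTo("audit")));
    }
}
```

Since a translator returns a single list per record, there is no way to express "emit this record to two streams", which is the property the earlier reply relies on for correct acking.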
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96895148 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; + +/** + * A subscription to kafka. + */ +public abstract class Subscription implements Serializable { +private static final long serialVersionUID = -216136367240198716L; + +/** + * Subscribe the KafkaConsumer to the proper topics + * @param consumer the Consumer to get. + * @param listener the rebalance listener to include in the subscription + */ +public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { + subscribe(consumer, listener); +} + +/** + * Subscribe the KafkaConsumer to the proper topics + * @param consumer the Consumer to get. 
+ * @param listener the rebalance listener to include in the subscription + * @deprecated please use the version with the TopologyContext in it + */ +public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener) { --- End diff -- I didn't feel good about deprecating it in the first place and will remove it. It just means that I will also pull in the changes I made for STORM-2236 back to 1.1 too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96894587 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; + +public class ByTopicRecordTranslator implements RecordTranslator { +private static final long serialVersionUID = -121699733778988688L; +private final RecordTranslator defaultTranslator; +private final Map> topicToTranslator = new HashMap<>(); +private final Map streamToFields = new HashMap<>(); + +public ByTopicRecordTranslator(Func, List> func, Fields fields, String stream) { +this(new SimpleRecordTranslator<>(func, fields, stream)); +} + +public ByTopicRecordTranslator(Func, List> func, Fields fields) { +this(new SimpleRecordTranslator<>(func, fields)); +} + +public ByTopicRecordTranslator(RecordTranslator defaultTranslator) { +this.defaultTranslator = defaultTranslator; +cacheNCheckFields(defaultTranslator); +} + +public ByTopicRecordTranslator forTopic(String topic, Func, List> func, Fields fields) { +return forTopic(topic, new SimpleRecordTranslator<>(func, fields)); +} + +public ByTopicRecordTranslator forTopic(String topic, Func, List> func, Fields fields, String stream) { +return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream)); +} + +public ByTopicRecordTranslator forTopic(String topic, RecordTranslator translator) { +if (topicToTranslator.containsKey(topic)) { +throw new IllegalStateException("Topic " + topic + " is already registered"); +} +topicToTranslator.put(topic, translator); +cacheNCheckFields(translator); +return this; +} + +private void cacheNCheckFields(RecordTranslator translator) { +for (String stream : translator.streams()) { +Fields fromTrans = translator.getFieldsFor(stream); +Fields cached = streamToFields.get(stream); +if (cached != null && !fromTrans.equals(cached)) { +throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which 
is not the same as those being added in " + fromTrans); --- End diff -- In this case it is the argument being passed in that is bad and we are rejecting it. They could come back and switch it to a new stream which would be fine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96893867 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; + +public class ByTopicRecordTranslator implements RecordTranslator { +private static final long serialVersionUID = -121699733778988688L; +private final RecordTranslator defaultTranslator; +private final Map> topicToTranslator = new HashMap<>(); +private final Map streamToFields = new HashMap<>(); + +public ByTopicRecordTranslator(Func, List> func, Fields fields, String stream) { +this(new SimpleRecordTranslator<>(func, fields, stream)); +} + +public ByTopicRecordTranslator(Func, List> func, Fields fields) { +this(new SimpleRecordTranslator<>(func, fields)); +} + +public ByTopicRecordTranslator(RecordTranslator defaultTranslator) { +this.defaultTranslator = defaultTranslator; +cacheNCheckFields(defaultTranslator); +} + +public ByTopicRecordTranslator forTopic(String topic, Func, List> func, Fields fields) { +return forTopic(topic, new SimpleRecordTranslator<>(func, fields)); +} + +public ByTopicRecordTranslator forTopic(String topic, Func, List> func, Fields fields, String stream) { +return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream)); +} + +public ByTopicRecordTranslator forTopic(String topic, RecordTranslator translator) { +if (topicToTranslator.containsKey(topic)) { +throw new IllegalStateException("Topic " + topic + " is already registered"); +} +topicToTranslator.put(topic, translator); +cacheNCheckFields(translator); +return this; +} + +private void cacheNCheckFields(RecordTranslator translator) { +for (String stream : translator.streams()) { +Fields fromTrans = translator.getFieldsFor(stream); +Fields cached = streamToFields.get(stream); +if (cached != null && !fromTrans.equals(cached)) { --- End diff -- ``` ByTopicRecordTranslator trans = new ByTopicRecordTranslator((rec) -> 
Arrays.asList(rec.offset()), new Fields("offset"), "default"); trans.forTopic("specialTopic", (rec) -> Arrays.asList(rec.offset(), rec.value()), new Fields("offset", "message"), "default"); ``` At this point we have tried to declare that the "default" stream has both Fields ["offset"] and Fields ["offset", "message"]. Storm does not support this, so we should not allow anyone to configure the spout this way. streamToFields has not yet been updated for the translator being added; we update it only after verifying that its Fields match everything registered so far.
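The check-then-cache order described in this comment can be sketched without any Storm dependency. This is only an illustration under stated assumptions: `StreamFieldsCheck` and `register` are hypothetical names, plain `List<String>` stands in for Storm's `Fields`, and the exception type is chosen for the sketch rather than taken from the pull request.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the check in cacheNCheckFields; not Storm code. */
public class StreamFieldsCheck {
    // Maps each stream name to the fields already declared for it.
    private final Map<String, List<String>> streamToFields = new HashMap<>();

    /** Records the fields declared for a stream, rejecting a conflicting redeclaration. */
    public void register(String stream, List<String> fields) {
        List<String> cached = streamToFields.get(stream);
        if (cached != null && !cached.equals(fields)) {
            throw new IllegalArgumentException("Stream " + stream + " already has fields "
                    + cached + " and cannot be redeclared with " + fields);
        }
        // Cache only after the check has passed, so a rejected declaration leaves no trace.
        streamToFields.put(stream, fields);
    }

    public static void main(String[] args) {
        StreamFieldsCheck check = new StreamFieldsCheck();
        check.register("default", Arrays.asList("offset"));
        try {
            // Same stream, different fields: this is the case the spout must refuse.
            check.register("default", Arrays.asList("offset", "message"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Registering the same fields twice for one stream passes the check, which is what lets several topics share a stream as long as they agree on its fields.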
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96891568 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java --- @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.selector; + +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses field with a given index to select the topic name from a tuple . + */ +public class FieldIndexTopicSelector implements KafkaTopicSelector { +private static final long serialVersionUID = -3830575380208166367L; --- End diff -- Because I missed them.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96891946 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java --- @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.selector; + +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Uses field name to select topic name from tuple . + */ +public class FieldNameTopicSelector implements KafkaTopicSelector { + +private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class); + +private final String fieldName; +private final String defaultTopicName; + + +public FieldNameTopicSelector(String fieldName, String defaultTopicName) { +this.fieldName = fieldName; +this.defaultTopicName = defaultTopicName; +} + +@Override +public String getTopic(Tuple tuple) { --- End diff -- I agree, but both for backwards compatibility and to limit the scope of this JIRA I would rather see it done in a follow-on JIRA.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96889563 --- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -51,7 +51,9 @@ * This bolt uses 0.8.2 Kafka Producer API. * * It works for sending tuples to older Kafka version (0.8.1). + * @deprecated Please use the KafkaBolt in storm-kafka-client */ +@Deprecated --- End diff -- I would rather do that in a follow-on JIRA. Most of the code is the same between the two, so porting the rework from here to there is a lot simpler if I can do a cherry-pick.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96889298 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; + +/** + * A subscription to kafka. + */ +public abstract class Subscription implements Serializable { +private static final long serialVersionUID = -216136367240198716L; + +/** + * Subscribe the KafkaConsumer to the proper topics + * @param consumer the Consumer to get. + * @param listener the rebalance listener to include in the subscription + */ +public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener, TopologyContext context) { + subscribe(consumer, listener); +} + +/** + * Subscribe the KafkaConsumer to the proper topics + * @param consumer the Consumer to get. 
+ * @param listener the rebalance listener to include in the subscription + * @deprecated please use the version with the TopologyContext in it + */ +public void subscribe(KafkaConsumer consumer, ConsumerRebalanceListener listener) { + throw new IllegalStateException("At least one subscribe method must be overwritten"); +} + +/** + * @return a string representing the subscribed topics. --- End diff -- I didn't know either, but the only place I can see it used is in logging, so that is what I did.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96887342 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -61,129 +66,244 @@ * If no offset has been committed, it behaves as LATEST. * * */ -public enum FirstPollOffsetStrategy { +public static enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - -// Kafka consumer configuration -private final Map kafkaProps; -private final Deserializer keyDeserializer; -private final Deserializer valueDeserializer; -private final long pollTimeoutMs; - -// Kafka spout configuration -private final long offsetCommitPeriodMs; -private final int maxRetries; -private final int maxUncommittedOffsets; -private final long partitionRefreshPeriodMs; -private final boolean manualPartitionAssignment; -private final FirstPollOffsetStrategy firstPollOffsetStrategy; -private final KafkaSpoutStreams kafkaSpoutStreams; -private final KafkaSpoutTuplesBuilder tuplesBuilder; -private final KafkaSpoutRetryService retryService; - -private KafkaSpoutConfig(Builder builder) { -this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); -this.keyDeserializer = builder.keyDeserializer; -this.valueDeserializer = builder.valueDeserializer; -this.pollTimeoutMs = builder.pollTimeoutMs; -this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; -this.maxRetries = builder.maxRetries; -this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; -this.kafkaSpoutStreams = builder.kafkaSpoutStreams; -this.maxUncommittedOffsets = builder.maxUncommittedOffsets; -this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; -this.manualPartitionAssignment = builder.manualPartitionAssignment; -this.tuplesBuilder = builder.tuplesBuilder; -this.retryService = builder.retryService; + +public static Builder builder(String bootstrapServers, String ... 
topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); } - -private Map setDefaultsAndGetKafkaProps(Map kafkaProps) { + +public static Builder builder(String bootstrapServers, Collection topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +} + +public static Builder builder(String bootstrapServers, Pattern topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +} + +private static Map setDefaultsAndGetKafkaProps(Map kafkaProps) { // set defaults for properties not specified -if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { -kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); +if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { +kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } return kafkaProps; } - + public static class Builder { private final Map kafkaProps; -private SerializableDeserializer keyDeserializer; -private SerializableDeserializer valueDeserializer; +private Subscription subscription; +private final SerializableDeserializer keyDes; +private final Class> keyDesClazz; +private final SerializableDeserializer valueDes; +private final Class> valueDesClazz; +private RecordTranslator translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; -private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; +private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; -private boolean manualPartitionAssignment = false; -private final KafkaSpoutTuplesBuilder tuplesBuilder; -private 
final KafkaSpoutRetryService retryService; - -/*
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96886556 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -61,129 +66,244 @@ * If no offset has been committed, it behaves as LATEST. * * */ -public enum FirstPollOffsetStrategy { +public static enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - -// Kafka consumer configuration -private final Map kafkaProps; -private final Deserializer keyDeserializer; -private final Deserializer valueDeserializer; -private final long pollTimeoutMs; - -// Kafka spout configuration -private final long offsetCommitPeriodMs; -private final int maxRetries; -private final int maxUncommittedOffsets; -private final long partitionRefreshPeriodMs; -private final boolean manualPartitionAssignment; -private final FirstPollOffsetStrategy firstPollOffsetStrategy; -private final KafkaSpoutStreams kafkaSpoutStreams; -private final KafkaSpoutTuplesBuilder tuplesBuilder; -private final KafkaSpoutRetryService retryService; - -private KafkaSpoutConfig(Builder builder) { -this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); -this.keyDeserializer = builder.keyDeserializer; -this.valueDeserializer = builder.valueDeserializer; -this.pollTimeoutMs = builder.pollTimeoutMs; -this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; -this.maxRetries = builder.maxRetries; -this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; -this.kafkaSpoutStreams = builder.kafkaSpoutStreams; -this.maxUncommittedOffsets = builder.maxUncommittedOffsets; -this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; -this.manualPartitionAssignment = builder.manualPartitionAssignment; -this.tuplesBuilder = builder.tuplesBuilder; -this.retryService = builder.retryService; + +public static Builder builder(String bootstrapServers, String ... 
topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); } - -private Map setDefaultsAndGetKafkaProps(Map kafkaProps) { + +public static Builder builder(String bootstrapServers, Collection topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +} + +public static Builder builder(String bootstrapServers, Pattern topics) { +return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +} + +private static Map setDefaultsAndGetKafkaProps(Map kafkaProps) { // set defaults for properties not specified -if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { -kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); +if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { +kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } return kafkaProps; } - + public static class Builder { private final Map kafkaProps; -private SerializableDeserializer keyDeserializer; -private SerializableDeserializer valueDeserializer; +private Subscription subscription; +private final SerializableDeserializer keyDes; +private final Class> keyDesClazz; +private final SerializableDeserializer valueDes; +private final Class> valueDesClazz; +private RecordTranslator translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; -private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; +private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; -private boolean manualPartitionAssignment = false; -private final KafkaSpoutTuplesBuilder tuplesBuilder; -private 
final KafkaSpoutRetryService retryService; - -/*
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96884451 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -225,11 +213,7 @@ public void nextTuple() { } if (poll()) { -try { -setWaitingToEmit(pollKafkaBroker()); -} catch (RetriableException e) { -LOG.error("Failed to poll from kafka.", e); -} +setWaitingToEmit(pollKafkaBroker()); --- End diff -- That's right, it was a merge error. Great catch though.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96879498 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. 
+ */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { +return withTopicSelector(new DefaultTopicSelector(topic)); +} + +public KafkaBolt withTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +public KafkaBolt withProducerProperties(Properties producerProperties) { +this.boltSpecfiedProperties = producerProperties; +return this; +} + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +//for backward compatibility. +if(mapper == null) { +this.mapper = new FieldNameBasedTupleToKafkaMapper(); +} + +//for backward compatibility. +if(topicSelector == null) { +if(stormConf.containsKey(TOPIC)) { +this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); +} else { +throw new IllegalArgumentException("topic should be specified in bolt's configuration"); +} +} + +producer = mkProducer(boltSpecfiedProperties); +this.collector = collector; +} + +/** + * Intended to be overridden for tests. Make the producer from props --- End diff -- No sorry wrong place for that comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHu
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96879307 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. 
+ */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { +return withTopicSelector(new DefaultTopicSelector(topic)); +} + +public KafkaBolt withTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +public KafkaBolt withProducerProperties(Properties producerProperties) { +this.boltSpecfiedProperties = producerProperties; +return this; +} + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +//for backward compatibility. +if(mapper == null) { +this.mapper = new FieldNameBasedTupleToKafkaMapper(); +} + +//for backward compatibility. +if(topicSelector == null) { +if(stormConf.containsKey(TOPIC)) { +this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); +} else { +throw new IllegalArgumentException("topic should be specified in bolt's configuration"); +} +} + +producer = mkProducer(boltSpecfiedProperties); +this.collector = collector; +} + +/** + * Intended to be overridden for tests. Make the producer from props --- End diff -- backwards compatibility --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If you
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96879248 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. 
+ */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { +return withTopicSelector(new DefaultTopicSelector(topic)); +} + +public KafkaBolt withTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +public KafkaBolt withProducerProperties(Properties producerProperties) { +this.boltSpecfiedProperties = producerProperties; +return this; +} + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +//for backward compatibility. +if(mapper == null) { +this.mapper = new FieldNameBasedTupleToKafkaMapper(); +} + +//for backward compatibility. +if(topicSelector == null) { +if(stormConf.containsKey(TOPIC)) { +this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); +} else { +throw new IllegalArgumentException("topic should be specified in bolt's configuration"); +} +} + +producer = mkProducer(boltSpecfiedProperties); +this.collector = collector; +} + +/** + * Intended to be overridden for tests. Make the producer from props + */ +protected KafkaProducer mkProducer(Properties props) { --- End diff -- backwards compatibility --- If your project is set up for it, yo
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96877333 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. 
+ */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { --- End diff -- I don't want to rename public facing APIs right now, because this is a copy of what is in external/storm-kafka. The other one is deprecated, but I want to maintain compatibility if possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96877744 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. 
+ */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { +return withTopicSelector(new DefaultTopicSelector(topic)); +} + +public KafkaBolt withTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +public KafkaBolt withProducerProperties(Properties producerProperties) { --- End diff -- This is "moving" existing code so I want to maintain compatibility if possible. But we are doing a copy + deprecation because the two libraries are compiled with different versions of Kafka so combining the two libraries in a single topology is difficult in some cases.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96875424 --- Diff: docs/storm-kafka-client.md --- @@ -1,90 +1,232 @@ -#Storm Kafka Spout with New Kafka Consumer API +#Storm Apache Kafka integration using the kafka-client jar +This includes the new Apache Kafka copnsumer API. -Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). +##Compatibility -The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. -In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`. +Apache Kafka versions 0.10 onwards -The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`. +##Writing to Kafka as part of your topology +You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you +are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and +org.apache.storm.kafka.trident.TridentKafkaUpdater. -The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`. 
+You need to provide implementations for the following 2 interfaces -Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical. +###TupleToKafkaMapper and TridentTupleToKafkaMapper +These interfaces have 2 methods defined: +```java +K getKeyFromTuple(Tuple/TridentTuple tuple); +V getMessageFromTuple(Tuple/TridentTuple tuple); +``` + +As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field +as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java +implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you +use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility +reasons. Alternatively you could also specify a different key and message field by using the non default constructor. +In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. +These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper. + +###KafkaTopicSelector and trident KafkaTopicSelector +This interface has only one method +```java +public interface KafkaTopicSelector { +String getTopics(Tuple/TridentTuple tuple); +} +``` +The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published +You can return a null and the message will be ignored. If you have one static topic name then you can use +DefaultTopicSelector.java and set the name of the topic in the constructor. +`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to select the topic should to publish a tuple to. +A user just needs to specify the field name or field index for the topic name in the tuple itself. 
+When the topic is name not found , the `Field*TopicSelector` will write messages into default topic . +Please make sure the default topic has been created . + +### Specifying Kafka producer properties +You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs +Section "Important configuration properties for the producer" for more details. +These are also defined in `org.apache.kafka.clients.producer.ProducerConfig` + +###Using wildcard kafka topic match +You can do a wildcard topic match by adding the following config +``` + Config config = new Config(); + config.put("kafka.topic.wildcard.match",true); -# Us
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 The test failures are unrelated.
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 @srdo @harshach sorry to do this to you, but I just fixed the conflicts with STORM-2236. Sadly the fastest way I could do it was to revert the original code and implement similar functionality, which is the latest commit. Could you please take a look at it? I created a few new Subscription implementations that can do the manual partition management. It only needed a small change to the Spout to support a timeout. I will try to look at adding some documentation and also the impact to the trident spout.
[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1868 @harshach and @ppoulosk I just pushed fixes for your review comments. @harshach you were right: I could, and did, remove StormStringDeserializer. I think this made the code much better.
[GitHub] storm issue #1878: STORM-2295:KafkaSpoutStreamsNamedTopics changing the sequ...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1878 +1 the change looks good to me. Great catch @pasalkarsachin1
[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1868#discussion_r96432779 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/StormStringDeserializer.java --- @@ -15,22 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.List; - -public class KafkaSpoutTuplesBuilderWildcardTopics implements KafkaSpoutTuplesBuilder { -private KafkaSpoutTupleBuilder tupleBuilder; +import org.apache.kafka.common.serialization.StringDeserializer; -public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder tupleBuilder) { -this.tupleBuilder = tupleBuilder; -} +public class StormStringDeserializer extends StringDeserializer implements SerializableDeserializer { --- End diff -- Yes, we need this even more now. The Kafka Deserializer (including StringDeserializer) is not Java serializable, so if we don't do this, on a real Storm cluster we will get exceptions when we try to write out the spout. I can look into trying to support some kind of generics like ``` public Builder setKey(Class> keyDeserializer) { ``` But I really don't know if that works. I'll try it out and let you know.
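The serialization point in the comment above can be illustrated with a small, self-contained sketch. It does not use the Kafka or Storm classes: `PlainDecoder`, `SerializableDecoder`, and `SpoutConfigSketch` are hypothetical stand-ins for a Kafka deserializer, a Storm-side serializable subclass, and a spout configuration that gets written out with Java serialization. The only claim it makes is the general Java behavior: an object graph serializes only if every non-transient field value is `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Kafka deserializer: it does NOT implement Serializable.
class PlainDecoder {
    String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for the StormStringDeserializer idea:
// same behavior as the parent, plus the Serializable marker.
class SerializableDecoder extends PlainDecoder implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Hypothetical stand-in for a spout config that holds a decoder and is shipped to workers.
class SpoutConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final PlainDecoder decoder;

    SpoutConfigSketch(PlainDecoder decoder) {
        this.decoder = decoder;
    }
}

class SerializationSketch {
    // Returns true if Java serialization of the object graph succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The plain decoder makes the whole config fail to serialize.
        System.out.println(canSerialize(new SpoutConfigSketch(new PlainDecoder())));        // prints false
        // The serializable subclass lets the same config be written out.
        System.out.println(canSerialize(new SpoutConfigSketch(new SerializableDecoder()))); // prints true
    }
}
```

The check is made against the runtime class of the field value, not its declared type, which is why subclassing with a `Serializable` marker is enough in this sketch.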
[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1868#discussion_r95682402 --- Diff: examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java --- @@ -18,87 +18,58 @@ package org.apache.storm.kafka.trident; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.kafka.spout.Func; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.kafka.spout.KafkaSpoutStreams; -import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics; -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics; -import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager; import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; - public class TridentKafkaClientWordCountNamedTopics { private static final String TOPIC_1 = 
"test-trident"; private static final String TOPIC_2 = "test-trident-1"; private static final String KAFKA_LOCAL_BROKER = "localhost:9092"; private KafkaTridentSpoutOpaque newKafkaTridentSpoutOpaque() { -return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>( -newKafkaSpoutConfig( -newKafkaSpoutStreams(; +return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig()); } -private KafkaSpoutConfig newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { -return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(), -kafkaSpoutStreams, newTuplesBuilder(), newRetryService()) + private static Func, List> JUST_VALUE_FUNC = new Func, List>() { + @Override + public List apply(ConsumerRecord record) { + return new Values(record.value()); + } + }; + +protected KafkaSpoutConfig newKafkaSpoutConfig() { +return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2) --- End diff -- This is an exact translation of the original code. Even down to not using KAFKA_LOCAL_BROKER. If people want me to change it I am happy to, but I thought it best to not overreach on the scope of the pull request. At least until the code worked. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 The test failures are unrelated and are around the integration tests that always seem to fail lately.
[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1868 The test failures are unrelated and are around the integration tests that always seem to fail lately.
[GitHub] storm issue #1862: STORM-2278: Allow max number of disruptor queue flusher t...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1862 @HeartSaVioR I rebased and I think I addressed your review comments. Please have a look and see if it is what you wanted.
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 All outstanding review comments should be done now. This and the 1.x port at #1868 should be ready for a final pass and hopefully being merged in.
[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1868 This is the 1.x version of #1808
[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/1868 STORM-2225: change spout config to be simpler. (1.x) You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-2225-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1868.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1868 commit 95883ac6f0202367b7a7f47eaa50ddeef824dc27 Author: Robert (Bobby) Evans Date: 2016-11-30T03:39:26Z STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client STORM-2225: change spout config to be simpler. STORM-2228: removed ability to request a single topic go to multiple streams Conflicts: examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java commit 2e041c3af64143157fd2e7a0af419685857e8083 Author: Robert (Bobby) Evans Date: 2016-12-08T19:26:49Z fixed some issues with rebase Conflicts: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java commit dbf040082357e585bd3aba94ed31d25e8c5f3ea9 Author: Robert (Bobby) Evans Date: 2016-12-08T22:10:12Z addressed review comments Conflicts: external/storm-kafka-client/README.md commit f95fc1670c6c03e27082d59555e26560636d15a9 Author: Robert (Bobby) Evans Date: 2016-12-08T22:12:43Z oops commit b8e32fceefaa1e71b4e4dec6e67d0a126761e949 Author: Robert (Bobby) Evans Date: 2017-01-06T22:37:31Z STORM-2225: make the core API java7 compatible commit 38f4ede2899145b4f3b527a746e7cd0999e9bb46 Author: Robert (Bobby) Evans Date: 2017-01-06T22:42:31Z STORM-2225: addressed doc review comments commit 853c524313cf1375499d3a9ccb0ec5a3509a7ae8 Author: Robert (Bobby) Evans Date: 
2017-01-09T16:11:41Z STORM-2225: java7 modifications commit 89eb16faef264057efebb6eb19f4a8089dd820a9 Author: Robert (Bobby) Evans Date: 2017-01-09T16:58:27Z STORM-2225: Updated docs
[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1862#discussion_r95174977 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -63,6 +63,18 @@ private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); +private static int getNumFlusherPoolThreads() { +int numThreads = 100; +try { +String threads = System.getProperty("num_flusher_pool_threads", "100"); --- End diff -- The issue with using a Config for this is that readStormConfig inside the worker would read the system config, not the topology config, and would not let us override it on a per-topology basis. I will add a config for the system default, and in its documentation indicate the system property that can be set to override it for a topology.
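The diff quoted above is cut off after the `System.getProperty` call, so the following is only a sketch of how such a lookup is commonly completed, not the code from the pull request. The property name `num_flusher_pool_threads` and the default of 100 come from the diff; the class name `FlusherThreadsSketch`, the helper `parseThreads`, and the choice to fall back to the default on a missing, malformed, or non-positive value are assumptions.

```java
// Hypothetical sketch: resolve a thread count from a JVM system property,
// falling back to a default when the value is absent or unusable.
class FlusherThreadsSketch {
    static final String PROPERTY = "num_flusher_pool_threads"; // name taken from the quoted diff
    static final int DEFAULT_THREADS = 100;                    // default taken from the quoted diff

    // Parses the raw property value; the fallback rules here are an assumption.
    static int parseThreads(String raw, int fallback) {
        if (raw == null) {
            return fallback;
        }
        try {
            int parsed = Integer.parseInt(raw.trim());
            return parsed > 0 ? parsed : fallback;
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Reads the property from the current JVM.
    static int numFlusherPoolThreads() {
        return parseThreads(System.getProperty(PROPERTY), DEFAULT_THREADS);
    }

    public static void main(String[] args) {
        System.out.println(numFlusherPoolThreads());
    }
}
```

Because the value is read from the worker JVM rather than from the cluster config, a per-topology override would presumably be passed as a `-Dnum_flusher_pool_threads=...` option on the worker command line, for example through `topology.worker.childopts`; that usage is an assumption and is not stated in the thread.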
[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1862#discussion_r95169637 --- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java --- @@ -63,6 +63,18 @@ private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); +private static int getNumFlusherPoolThreads() { +int numThreads = 100; +try { +String threads = System.getProperty("num_flusher_pool_threads", "100"); --- End diff -- I agree that it does not add a lot of value in its current form. If you want me to document it I am happy to.
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 I think I have addressed all of the review comments so far. I will try to get my 1.x version of the patch up shortly.
[GitHub] storm issue #1853: STORM-2264 OpaqueTridentKafkaSpout failing after STORM-22...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1853 I am also OK with reverting STORM-2216 if it is causing a lot of issues.
[GitHub] storm issue #1853: STORM-2264 OpaqueTridentKafkaSpout failing after STORM-22...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1853 +1 seems fine to me
[GitHub] storm pull request #1785: [STORM-2201] Add dynamic scheduler configuration l...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1785#discussion_r94972777 --- Diff: docs/IConfigLoader.md --- @@ -0,0 +1,58 @@ +--- +title: IConfigLoader +layout: documentation +documentation: true +--- + + +### Introduction +IConfigLoader is an interface designed to allow way to dynamically load scheduler resource constraints into scheduler implementations. Currently, the MultiTenant scheduler uses this interface to dynamically load the number of isolated nodes a given user has been guaranteed, and the ResoureAwareScheduler uses the interface to dynamically load per user resource guarantees. --- End diff -- Perhaps "designed to allow dynamic loading of scheduler resource constraints"
[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1767 @sathyafmt sorry I have taken so long to respond. December was a really crazy month for me. From STORM-2194 I see that the SocketTimeoutException goes through the code being changed. The RMI code does not go through that path at all.
```
2016-12-01 04:24:41.721 STDERR [INFO] Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 56700; nested exception is:
2016-12-01 04:24:41.722 STDERR [INFO] java.net.BindException: Address already in use
```
If it did then we would have exited, because BindException and ExportException are neither InterruptedIOException nor InterruptedException. So neither this patch nor the one I proposed would have any impact on the RMI case at all. Something else is catching the ExportException and printing the error message above to STDERR.
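The type relationships that the argument above relies on can be checked with standard JDK classes alone. The sketch below is not Storm code: `InterruptCheckSketch` and `isInterruption` are hypothetical names, and walking the cause chain is an assumption about how such a check might be written. It only shows that `SocketTimeoutException` is an `InterruptedIOException`, while `BindException` and `ExportException` are not, even when one wraps the other.

```java
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.SocketTimeoutException;
import java.rmi.server.ExportException;

class InterruptCheckSketch {
    // Hypothetical helper: true when the throwable, or any of its causes,
    // is an InterruptedException or an InterruptedIOException.
    static boolean isInterruption(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof InterruptedException || cur instanceof InterruptedIOException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // SocketTimeoutException extends InterruptedIOException.
        System.out.println(isInterruption(new SocketTimeoutException("timed out")));        // prints true
        // BindException extends SocketException, which is a plain IOException.
        System.out.println(isInterruption(new BindException("Address already in use")));    // prints false
        // ExportException is a RemoteException; wrapping a BindException changes nothing.
        System.out.println(isInterruption(new ExportException(
                "Port already in use: 56700", new BindException("Address already in use")))); // prints false
    }
}
```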
[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/1862 STORM-2278: Allow max number of disruptor queue flusher threads to be configurable You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-2278 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1862.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1862 commit 753b1cbca44ff26784eee8f1a0a5cbce5a1b97d6 Author: Robert (Bobby) Evans Date: 2017-01-05T20:08:02Z STORM-2278: Allow max number of disruptor queue flusher threads to be configurable
[GitHub] storm issue #1827: STORM-2243: adds ip address to supervisor id
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1827 +1
[GitHub] storm issue #1842: Merge remote-tracking branch 'refs/remotes/apache/master'
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1842 @leongu-tc this is an empty commit/pull request. Could you please close it?
[GitHub] storm pull request #1839: STORM-1292
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1839#discussion_r94782513 --- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class MessagingTest { +private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class); + +@Test +public void testLocalTransport() throws Exception { +Config stormConf = new Config(); +//stormConf.putAll(Utils.readDefaultConfig()); +stormConf.put(Config.TOPOLOGY_WORKERS, 2); +stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); --- End diff -- This was in the original code: https://github.com/apache/storm/blob/d5acec9e3b9473a0e8cf39c7e12393626a3ca426/storm-core/test/clj/org/apache/storm/messaging_test.clj#L32-L33 But yes, it should be optional.
[GitHub] storm pull request #1839: STORM-1292
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1839#discussion_r94783130 --- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class MessagingTest { +private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class); + +@Test +public void testLocalTransport() throws Exception { +Config stormConf = new Config(); +//stormConf.putAll(Utils.readDefaultConfig()); +stormConf.put(Config.TOPOLOGY_WORKERS, 2); +stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); +boolean[] transportOptions = {true, false}; +for(boolean transportOn:transportOptions) { +stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn); +//List seeds = new ArrayList<>(); +//seeds.add("localhost"); +//stormConf.put(Config.NIMBUS_HOST, "localhost"); +//stormConf.put(Config.NIMBUS_SEEDS, seeds); +//stormConf.put("storm.cluster.mode", "local"); +//stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost"); + +ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2) --- End diff -- This needs to be in a try-with-resources block, so that the cluster's `close()` is called properly.
```
try (ILocalCluster cluster = new LocalCluster.Builder().build()) {
    // Rest of the test that used cluster
}
```
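For readers unfamiliar with the pattern requested in this comment, here is a minimal, self-contained sketch of try-with-resources inside a loop. It does not use Storm at all: `FakeCluster` is a hypothetical stand-in for the local cluster that only records lifecycle events, and the topology names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {

    /** Hypothetical stand-in for a local cluster; it only records lifecycle events. */
    public static final class FakeCluster implements AutoCloseable {
        private final List<String> events;

        FakeCluster(List<String> events) {
            this.events = events;
            events.add("start");
        }

        void submit(String topologyName) {
            events.add("submit:" + topologyName);
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    /** Creates one cluster per transport option and lets the try block close it. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        boolean[] transportOptions = {true, false};
        for (boolean transportOn : transportOptions) {
            try (FakeCluster cluster = new FakeCluster(events)) {
                cluster.submit("test-" + transportOn);
            } // close() is invoked here, even if the body throws
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each loop iteration gets its own cluster, and the previous one is always closed before the next one starts, which is the property the review comment is after.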
[GitHub] storm pull request #1839: STORM-1292
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1839#discussion_r94783485 --- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class MessagingTest { +private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class); + +@Test +public void testLocalTransport() throws Exception { +Config stormConf = new Config(); +//stormConf.putAll(Utils.readDefaultConfig()); +stormConf.put(Config.TOPOLOGY_WORKERS, 2); +stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); +boolean[] transportOptions = {true, false}; +for(boolean transportOn:transportOptions) { +stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn); +//List seeds = new ArrayList<>(); +//seeds.add("localhost"); +//stormConf.put(Config.NIMBUS_HOST, "localhost"); +//stormConf.put(Config.NIMBUS_SEEDS, seeds); +//stormConf.put("storm.cluster.mode", "local"); +//stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost"); + +ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2) +.withDaemonConf(stormConf).build(); +Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2); +Map inputs = new HashMap<>(); +inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()); +Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6); +Map spoutMap = new HashMap<>(); +spoutMap.put("1", spoutDetails); +Map boltMap = new HashMap<>(); +boltMap.put("2", boltDetails); +StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap); +//TopologyBuilder builder = new TopologyBuilder(); +//builder.setSpout("1", new TestWordSpout(false), 2); +//builder.setBolt("2", new 
TestGlobalCount(), 6).shuffleGrouping("1"); +//StormTopology stormTopology = builder.createTopology(); +FixedTuple[] fixedTuple = {new FixedTuple((List) Collections.singletonList((Object) "a")), new FixedTuple((List) Collections.singletonList((Object) "b")), --- End diff -- Or better yet, if the commented out code works, use it instead. It is a lot smaller and fits the Java API better.
[GitHub] storm pull request #1839: STORM-1292
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1839#discussion_r94784021 --- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class MessagingTest { +private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class); + +@Test +public void testLocalTransport() throws Exception { +Config stormConf = new Config(); +//stormConf.putAll(Utils.readDefaultConfig()); +stormConf.put(Config.TOPOLOGY_WORKERS, 2); +stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); +boolean[] transportOptions = {true, false}; +for(boolean transportOn:transportOptions) { +stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn); +//List seeds = new ArrayList<>(); +//seeds.add("localhost"); +//stormConf.put(Config.NIMBUS_HOST, "localhost"); +//stormConf.put(Config.NIMBUS_SEEDS, seeds); +//stormConf.put("storm.cluster.mode", "local"); +//stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost"); + +ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2) +.withDaemonConf(stormConf).build(); +Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2); +Map inputs = new HashMap<>(); +inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()); +Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6); +Map spoutMap = new HashMap<>(); +spoutMap.put("1", spoutDetails); +Map boltMap = new HashMap<>(); +boltMap.put("2", boltDetails); +StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap); +//TopologyBuilder builder = new TopologyBuilder(); +//builder.setSpout("1", new TestWordSpout(false), 2); +//builder.setBolt("2", new 
TestGlobalCount(), 6).shuffleGrouping("1"); +//StormTopology stormTopology = builder.createTopology(); +FixedTuple[] fixedTuple = {new FixedTuple((List) Collections.singletonList((Object) "a")), new FixedTuple((List) Collections.singletonList((Object) "b")), --- End diff -- Actually, this code is not used any more. `fixedTuples` (with an s), right below this, is the one that is put into the data set. Please delete this code.
[GitHub] storm pull request #1839: STORM-1292
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1839#discussion_r94782323 --- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class MessagingTest { +private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class); + +@Test +public void testLocalTransport() throws Exception { +Config stormConf = new Config(); +//stormConf.putAll(Utils.readDefaultConfig()); +stormConf.put(Config.TOPOLOGY_WORKERS, 2); +stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); +boolean[] transportOptions = {true, false}; --- End diff -- This is OK, but for anything more complex it might be good to use https://github.com/junit-team/junit4/wiki/Parameterized-tests instead.
[GitHub] storm issue #1852: [STORM-2271] ClosedByInterruptException should be handled...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1852 +1
[GitHub] storm issue #1857: STORM-2275: Nimbus crashed during state transition of top...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1857 +1 - Thanks for fixing this
[GitHub] storm issue #1859: STORM-2276 Remove twitter4j usages due to license issue (...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1859 +1
[GitHub] storm issue #1860: STORM-2276 Remove twitter4j usages due to license issue (...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1860 +1
[GitHub] storm issue #1858: STORM-2276 Remove twitter4j usages due to license issue (...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1858 +1
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94649177 --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java --- @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.DefaultScheduler; +import org.apache.storm.scheduler.INimbus; +import org.apache.storm.scheduler.SchedulerAssignmentImpl; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.List; +import java.util.ArrayList; + +public class TestBlacklistScheduler { + +private static final Logger LOG = LoggerFactory.getLogger(TestBlacklistScheduler.class); + +private static int currentTime = 1468216504; + +@Test +public void TestBadSupervisor() { +INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest(); + +Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + +Config config = new Config(); +config.putAll(Utils.readDefaultConfig()); +config.put(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200); +config.put(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2); +config.put(Config.BLACKLIST_SCHEDULER_RESUME_TIME, 300); + +Map topoMap = new HashMap(); + +TopologyDetails topo1 = TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, true); +//TopologyDetails topo2 = TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8,true); +//TopologyDetails topo3 = TestUtilsForBlacklistScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16,true); +topoMap.put(topo1.getId(), topo1); +//topoMap.put(topo2.getId(), topo2); +//topoMap.put(topo3.getId(), topo3); --- End diff -- Please 
remove the commented out code.
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94665415 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +protected int toleranceTime; +protected int toleranceCount; +protected int resumeTime; +protected IReporter reporter; +protected IBlacklistStrategy blacklistStrategy; + +protected int nimbusMonitorFreqSecs; + +protected Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +protected CircularBuffer>> badSupervisorsTolerance; +protected Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); +underlyingScheduler.prepare(conf); +_conf = conf; +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { +toleranceTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME); --- End diff -- `nimbusMonitorFreqSecs` needs this help too.
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94661600 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +protected int toleranceTime; +protected int toleranceCount; +protected int resumeTime; +protected IReporter reporter; +protected IBlacklistStrategy blacklistStrategy; + +protected int nimbusMonitorFreqSecs; + +protected Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +protected CircularBuffer>> badSupervisorsTolerance; +protected Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); +underlyingScheduler.prepare(conf); +_conf = conf; +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { +toleranceTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME); --- End diff -- the isInteger annotation does not guarantee that it is an Integer you will get, but it guarantees that you will get a Number that can be turned into an Integer without losing data. 
```
toleranceTime = Utils.getInt(_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
```
This holds true for `toleranceCount` and `resumeTime` too.
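To illustrate why the direct `(Integer)` cast is fragile, here is a small sketch that runs without Storm. The `toInt` helper below is a hypothetical stand-in written for this example, not the actual `Utils.getInt` implementation; it only assumes the behaviour described in the comment, namely that any `Number` that fits in an `int` without losing data is accepted.

```java
public class ConfIntSketch {

    /**
     * Hypothetical stand-in for a lenient integer reader: accepts integral
     * Number types whose value fits in an int, and rejects everything else.
     */
    public static int toInt(Object value) {
        if (value instanceof Integer || value instanceof Short || value instanceof Byte) {
            return ((Number) value).intValue();
        }
        if (value instanceof Long) {
            long l = (Long) value;
            if (l >= Integer.MIN_VALUE && l <= Integer.MAX_VALUE) {
                return (int) l;
            }
        }
        throw new IllegalArgumentException("Cannot convert " + value + " to int");
    }

    /** Returns true when a plain (Integer) cast of the value throws. */
    public static boolean castFails(Object value) {
        try {
            Integer ignored = (Integer) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object fromParser = 200L; // a config parser may hand back a Long
        System.out.println("cast fails: " + castFails(fromParser));
        System.out.println("toInt: " + toInt(fromParser));
    }
}
```

A value such as `200L` passes an "is an integer" validation but still breaks the cast, which is the failure mode the comment warns about.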
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94675511 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java --- @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist; + +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class CircularBuffer extends AbstractCollection implements Serializable { + +// This is the largest capacity allowed by this implementation +private static final int MAX_CAPACITY = 1 << 30; + +private int size = 0; +private int producerIndex = 0; +private int consumerIndex = 0; + +private int capacity; + +private Serializable[] underlying; + +// Construct a buffer which has at least the specified capacity. If +// the value specified is a power of two then the buffer will be +// exactly the specified size. Otherwise the buffer will be the +// first power of two which is greater than the specified value. 
+public CircularBuffer(int capacity) { + +if (capacity > MAX_CAPACITY) { +throw new IllegalArgumentException("Capacity greater than " + +"allowed"); +} + +this.capacity = capacity; +underlying = new Serializable[this.capacity]; +} + +// Constructor used by clone() +private CircularBuffer(CircularBuffer oldBuffer) { +size = oldBuffer.size; +producerIndex = oldBuffer.producerIndex; +consumerIndex = oldBuffer.consumerIndex; +capacity = oldBuffer.capacity; +//bitmask = oldBuffer.bitmask; +underlying = new Serializable[oldBuffer.underlying.length]; +System.arraycopy(oldBuffer.underlying, 0, underlying, 0, underlying.length); +} + +private boolean isFull() { +return size == capacity; +} + +public boolean add(Serializable obj) { --- End diff -- Shouldn't this be ```public boolean add(T obj)```? Also, if this is overriding an existing method we should use the `@Override` annotation. This applies to all of the methods here.
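As a rough illustration of the two points in this comment (a type-parameterised `add` and `@Override` on every inherited method), here is a small generic bounded buffer. It is not the PR's array-based `CircularBuffer`: it swaps in an `ArrayDeque` for brevity, the class name `BoundedBufferSketch` is made up, and dropping the oldest element when full is an assumption of this sketch rather than something the diff confirms.

```java
import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.Iterator;

public class BoundedBufferSketch<T> extends AbstractCollection<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    public BoundedBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Takes T rather than a fixed type, so the compiler checks element types. */
    @Override
    public boolean add(T obj) {
        if (items.size() == capacity) {
            items.removeFirst(); // assumption: the oldest entry is dropped when full
        }
        items.addLast(obj);
        return true;
    }

    @Override
    public Iterator<T> iterator() {
        return items.iterator();
    }

    @Override
    public int size() {
        return items.size();
    }
}
```

With `@Override` in place, a signature such as `add(Serializable obj)` on a `Collection<T>` would be flagged by the compiler instead of silently becoming an overload.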
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94665531 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +protected int toleranceTime; +protected int toleranceCount; +protected int resumeTime; +protected IReporter reporter; +protected IBlacklistStrategy blacklistStrategy; + +protected int nimbusMonitorFreqSecs; + +protected Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +protected CircularBuffer>> badSupervisorsTolerance; +protected Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); +underlyingScheduler.prepare(conf); +_conf = conf; +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { +toleranceTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) { +toleranceCount = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) { 
+resumeTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME); +} +String reporterClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : ""; --- End diff -- Can we set the default to "org.apache.storm.scheduler.blacklist.reporters.LogReporter" instead of ""? That way it just works for the unit tests. ---
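The default-class-name suggestion above can be illustrated with a small, self-contained sketch. It is not the code from the pull request: `PluginLoader`, its `load` method, and the `"reporter.class"` key used in the usage note are hypothetical names, and a plain `Map` stands in for the Storm configuration. The idea is only that a missing key falls back to a real class name rather than `""`, so reflection never sees an empty string.

```java
import java.util.Map;

// Hypothetical helper (not part of Storm): resolves a plugin class name from a
// config map, falling back to a non-empty default, then instantiates it.
final class PluginLoader {
    private PluginLoader() {}

    static <T> T load(Map<String, ?> conf, String key, String defaultClassName, Class<T> type) {
        Object configured = conf.get(key);
        // Use the configured name if present, otherwise the supplied default.
        String className = configured != null ? (String) configured : defaultClassName;
        try {
            // ReflectiveOperationException covers ClassNotFoundException,
            // InstantiationException and IllegalAccessException in one catch.
            return type.cast(Class.forName(className).getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot instantiate " + className, e);
        }
    }
}
```

With an empty map, `PluginLoader.load(conf, "reporter.class", "java.util.ArrayList", java.util.List.class)` returns an `ArrayList`; a unit test that sets no configuration would get a working default in the same way.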
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94678615 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java --- @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist.strategies; + +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class DefaultBlacklistStrategy implements IBlacklistStrategy { + +private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class); + +private IReporter _reporter; + +private int _toleranceTime; +private int _toleranceCount; +private int _resumeTime; +private int _nimbusMonitorFreqSecs; + +private TreeMap blacklist; + +@Override +public void prepare(IReporter reporter, int toleranceTime, int toleranceCount, int resumeTime, int nimbusMonitorFreqSecs) { --- End diff -- Could we have conf passed in here too, or not make this a plugin? I know we don't need/use it now, but if we want this to be user pluggable we should think about what other things people might want to do. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94677722 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java --- @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist; + +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class CircularBuffer extends AbstractCollection implements Serializable { --- End diff -- I am a little confused why we need all of this. It feels like we could do all of this with an ArrayBlockingQueue. ``` public final class CircularBuffer extends ArrayBlockingQueue { @Override public boolean add(T obj) { while (!offer(obj)) { poll(); } } public List toList() { return new ArrayList<>(this); } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
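The `ArrayBlockingQueue` idea in the comment above can be made compilable with a few additions. The snippet as quoted has lost its generic parameters, has no constructor (`ArrayBlockingQueue` requires a capacity), and `add` does not return a value. The following is a minimal sketch under those assumptions; the class name `EvictingBuffer` is hypothetical and chosen so it is not mistaken for the class in the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch: a bounded buffer that drops its oldest element
// instead of rejecting a new one when it is full.
final class EvictingBuffer<T> extends ArrayBlockingQueue<T> {
    private static final long serialVersionUID = 1L;

    EvictingBuffer(int capacity) {
        super(capacity); // ArrayBlockingQueue has no no-arg constructor
    }

    @Override
    public boolean add(T obj) {
        // offer() returns false when the queue is full; evict the head and retry.
        while (!offer(obj)) {
            poll();
        }
        return true;
    }

    // Snapshot of the current contents, oldest first.
    public List<T> toList() {
        return new ArrayList<>(this);
    }
}
```

Adding 1 through 5 to a buffer of capacity 3 leaves 3, 4, 5 in `toList()`. Note that the evict-and-retry loop is not atomic as a whole, so concurrent writers could evict more than one element; that is an observation about this sketch, not about the reviewed code.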
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94675703 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java --- @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist; + +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class CircularBuffer extends AbstractCollection implements Serializable { + +// This is the largest capacity allowed by this implementation +private static final int MAX_CAPACITY = 1 << 30; + +private int size = 0; +private int producerIndex = 0; +private int consumerIndex = 0; + +private int capacity; + +private Serializable[] underlying; + +// Construct a buffer which has at least the specified capacity. If +// the value specified is a power of two then the buffer will be +// exactly the specified size. Otherwise the buffer will be the +// first power of two which is greater than the specified value. 
+public CircularBuffer(int capacity) { + +if (capacity > MAX_CAPACITY) { +throw new IllegalArgumentException("Capacity greater than " + +"allowed"); +} + +this.capacity = capacity; +underlying = new Serializable[this.capacity]; +} + +// Constructor used by clone() +private CircularBuffer(CircularBuffer oldBuffer) { --- End diff -- Shouldn't oldBuffer be `CircularBuffer<T>` or something like that? ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94668925 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +protected int toleranceTime; +protected int toleranceCount; +protected int resumeTime; +protected IReporter reporter; +protected IBlacklistStrategy blacklistStrategy; + +protected int nimbusMonitorFreqSecs; + +protected Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +protected CircularBuffer>> badSupervisorsTolerance; +protected Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); --- End diff -- Could we drop this log message, it does not seem to really be needed any more. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94668597 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; --- End diff -- I'm not really sure why this is package-private, nor why it is not final. I don't see a reason for it to be mutable at this point. ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94678822 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java --- @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist.strategies; + +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.blacklist.CircularBuffer; --- End diff -- I don't think this is used. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94668464 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +private int toleranceTime; +private int toleranceCount; +private int resumeTime; +private IReporter reporter; +private IBlacklistStrategy blacklistStrategy; + +private int nimbusMonitorFreqSecs; + +private Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +private CircularBuffer>> badSupervisorsTolerance; +private Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); +underlyingScheduler.prepare(conf); +_conf = conf; +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { +toleranceTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) { +toleranceCount = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) { +resumeTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME); +} +String reporterClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : ""; +try { +reporter = (IReporter) Class.forName(reporterClassName).newInstance(); +} catch (ClassNotFoundException e) { +LOG.error("Can't find blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} catch (InstantiationException e) { +LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} catch (IllegalAccessException e) { +LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} + +String strategyClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_STRATEGY) ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_STRATEGY) : ""; +try { +blacklistStrategy = (IBlacklistStrategy) Class.forName(strategyClassName).newInstance(); +} catch (ClassNotFoundException e) { +
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94677987 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java --- @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist; + +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class CircularBuffer extends AbstractCollection implements Serializable { --- End diff -- Some javadocs explaining what it does and why would also be helpful, because even though this is currently implemented as a CircularBuffer it has slightly different behavior from a typical Collection. ---
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94678786 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java --- @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist.strategies; + +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.blacklist.CircularBuffer; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; + +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +public interface IBlacklistStrategy { + +public void prepare(IReporter reporter, int toleranceTime, int toleranceCount, int resumeTime, int nimbusMonitorFreqSecs); + +public Set getBlacklist(List>> toleranceBuffer, Cluster cluster, Topologies topologies); + +public void resumeFromBlacklist(); + +} --- End diff -- Could we have some javadocs explaining how this is used and the life cycle of the strategy. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94665652 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import org.apache.storm.Config; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); +IScheduler underlyingScheduler; +@SuppressWarnings("rawtypes") +private Map _conf; + +protected int toleranceTime; +protected int toleranceCount; +protected int resumeTime; +protected IReporter reporter; +protected IBlacklistStrategy blacklistStrategy; + +protected int nimbusMonitorFreqSecs; + +protected Map> cachedSupervisors; + +//key is supervisor key ,value is supervisor ports +protected CircularBuffer>> badSupervisorsTolerance; +protected Set blacklistHost; + +public BlacklistScheduler(IScheduler underlyingScheduler) { +this.underlyingScheduler = underlyingScheduler; +} + +@Override +public void prepare(Map conf) { +LOG.info("prepare black list scheduler"); +LOG.info(conf.toString()); +underlyingScheduler.prepare(conf); +_conf = conf; +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) { +toleranceTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) { +toleranceCount = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT); +} +if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) { 
+resumeTime = (Integer) _conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME); +} +String reporterClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : ""; +try { +reporter = (IReporter) Class.forName(reporterClassName).newInstance(); +} catch (ClassNotFoundException e) { +LOG.error("Can't find blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} catch (InstantiationException e) { +LOG.error("Throw InstantiationException blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} catch (IllegalAccessException e) { +LOG.error("Throw illegalAccessException blacklist reporter for name {}", reporterClassName); +throw new RuntimeException(e); +} + +String strategyClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_STRATEGY) ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_STRATEGY) : ""; --- End diff -- Can we make the default "org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy" instead of "" for the same reasons as
[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1674#discussion_r94675699 --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java --- @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler.blacklist; + +import java.io.Serializable; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class CircularBuffer extends AbstractCollection implements Serializable { + +// This is the largest capacity allowed by this implementation +private static final int MAX_CAPACITY = 1 << 30; + +private int size = 0; +private int producerIndex = 0; +private int consumerIndex = 0; + +private int capacity; + +private Serializable[] underlying; --- End diff -- Shouldn't this be an array of type `T`?
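To illustrate the question about the element type, here is a minimal sketch of a ring buffer backed by a `T[]`. It is not the `CircularBuffer` from the pull request: `RingBuffer`, `add`, and `toList` are hypothetical names, the overwrite-oldest behaviour is an assumption, and the class is assumed to be generic in `T extends Serializable`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the CircularBuffer class under review.
final class RingBuffer<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final T[] underlying;   // typed array instead of Serializable[]
    private int size = 0;
    private int producerIndex = 0;  // next slot to write
    private int consumerIndex = 0;  // slot holding the oldest element

    @SuppressWarnings("unchecked")
    RingBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // Java cannot create a generic array directly, so allocate the
        // erased type (Serializable) and cast; the array never escapes.
        this.underlying = (T[]) new Serializable[capacity];
    }

    // Adds an element, overwriting the oldest one when the buffer is full.
    void add(T value) {
        underlying[producerIndex] = value;
        producerIndex = (producerIndex + 1) % underlying.length;
        if (size == underlying.length) {
            consumerIndex = (consumerIndex + 1) % underlying.length;
        } else {
            size++;
        }
    }

    // Returns the contents ordered from oldest to newest.
    List<T> toList() {
        List<T> out = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            out.add(underlying[(consumerIndex + i) % underlying.length]);
        }
        return out;
    }
}
```

With a typed field, callers get `T` back without casting, and the single unchecked cast is confined to the constructor.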
[GitHub] storm pull request #1854: STORM-2272: don't leak simulated time
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/1854 STORM-2272: don't leak simulated time You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-2272 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1854.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1854 commit 4e0da8f8d4cc0e9b593fdefe6003e24366ac60d0 Author: Robert (Bobby) Evans Date: 2017-01-04T20:40:09Z STORM-2272: don't leak simulated time
[GitHub] storm issue #1793: STORM-2214: add in cacheing of the Login
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1793 I also wanted to say we have been running with this in prod for a while now.
[GitHub] storm issue #1793: STORM-2214: add in cacheing of the Login
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1793 @harshach the issue is that when there are lots of supervisors, the load placed on the KDC is a lot higher than it is for a similar number of Hadoop nodes. Each new Login will contact the KDC and fetch a new TGT followed by a new service ticket. If we can reuse the Subject, then the TGT and the service tickets can be reused too, reducing the load on the KDC. The Login handles the life cycle of the Subject (it creates it and destroys it), so we wanted to cache at that level to avoid issues there.
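The caching idea can be sketched roughly as follows. This is not the code from the pull request: `SketchLogin` and `LoginCache` are hypothetical names, the KDC round trip is simulated with a counter, and keying the cache by JAAS section name is an assumption. It also uses `computeIfAbsent`, so it needs Java 8 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.Subject;

// Hypothetical stand-in for a Kerberos Login; constructing it is the
// expensive step that would contact the KDC.
final class SketchLogin {
    static final AtomicInteger KDC_CONTACTS = new AtomicInteger();
    private final Subject subject = new Subject();

    SketchLogin(String jaasSection) {
        KDC_CONTACTS.incrementAndGet(); // simulated TGT + service ticket fetch
    }

    Subject getSubject() {
        return subject;
    }
}

// Caches one login per JAAS section so the Subject and its tickets are reused.
final class LoginCache {
    private static final ConcurrentMap<String, SketchLogin> CACHE = new ConcurrentHashMap<>();

    static SketchLogin get(String jaasSection) {
        // computeIfAbsent runs the constructor at most once per key.
        return CACHE.computeIfAbsent(jaasSection, SketchLogin::new);
    }
}
```

Caching the login object rather than the bare `Subject` keeps creation and destruction of the `Subject` in one place, which is the motivation given in the comment.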
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 @srdo as part of backporting this to 1.x I am going to need to make a change to not use Function directly, because it is only in Java 8. So to maintain compatibility between 1.x and 2.x I am going to need to make a few changes in this patch too.
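One common way to avoid a direct dependency on `java.util.function.Function` is a small project-owned interface. The sketch below only illustrates that approach; `Func` and `FuncDemo` are hypothetical names here, not necessarily what the patch ends up using.

```java
import java.io.Serializable;

// Hypothetical name; a single-method interface that compiles on Java 7 and 8.
interface Func<V, R> extends Serializable {
    R apply(V record);
}

final class FuncDemo {
    // Java 7 style: an anonymous class, with no lambdas and no
    // java.util.function types required.
    static final Func<String, Integer> LENGTH = new Func<String, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer apply(String record) {
            return record.length();
        }
    };
}
```

Because the interface has a single abstract method, code built for Java 8 can still pass a lambda such as `s -> s.length()` where a `Func<String, Integer>` is expected, so the same signature works on both branches.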
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 @srdo I think I addressed all of your review comments.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r91618249 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java --- @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaBoltTest { +private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); + +@SuppressWarnings({ "unchecked", "serial" }) +@Test +public void testSimple() { +final KafkaProducer producer = mock(KafkaProducer.class); +when(producer.send(any(), any())).thenAnswer(new Answer() { +@Override +public Object answer(InvocationOnMock invocation) throws Throwable { +Callback c = (Callback)invocation.getArguments()[1]; +c.onCompletion(null, null); +return null; +} +}); +KafkaBolt bolt = new KafkaBolt() { --- End diff -- In this case, because I am subclassing KafkaBolt, the compiler (at least in Eclipse) actually complains that I am not allowed to do it.
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r91613603 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java --- @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * + * It expects the producer configuration and topic in storm config under + * + * 'kafka.broker.properties' and 'topic' + * + * respectively. + * + * This bolt uses 0.8.2 Kafka Producer API. + * + * It works for sending tuples to older Kafka version (0.8.1). + */ +public class KafkaBolt extends BaseRichBolt { +private static final long serialVersionUID = -5205886631877033478L; + +private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + +public static final String TOPIC = "topic"; + +private KafkaProducer producer; +private OutputCollector collector; +private TupleToKafkaMapper mapper; +private KafkaTopicSelector topicSelector; +private Properties boltSpecfiedProperties = new Properties(); +/** + * With default setting for fireAndForget and async, the callback is called when the sending succeeds. 
+ * By setting fireAndForget true, the send will not wait at all for kafka to ack. + * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. + * By setting async false, synchronous sending is used. + */ +private boolean fireAndForget = false; +private boolean async = true; + +public KafkaBolt() {} + +public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { +this.mapper = mapper; +return this; +} + +public KafkaBolt withTopicSelector(String topic) { +return withTopicSelector(new DefaultTopicSelector(topic)); +} + +public KafkaBolt withTopicSelector(KafkaTopicSelector selector) { +this.topicSelector = selector; +return this; +} + +public KafkaBolt withProducerProperties(Properties producerProperties) { +this.boltSpecfiedProperties = producerProperties; +return this; +} + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +//for backward compatibility. +if(mapper == null) { +this.mapper = new FieldNameBasedTupleToKafkaMapper(); +} + +//for backward compatibility. +if(topicSelector == null) { +if(stormConf.containsKey(TOPIC)) { +this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); +} else { +
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r91592134 --- Diff: docs/storm-kafka-client.md --- @@ -1,90 +1,222 @@ -#Storm Kafka Spout with New Kafka Consumer API +#Storm Kafka integration using the kafka-client jar -Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). +##Compatability -The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. -In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`. +Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). -The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`. +##Writing to Kafka as part of your topology +You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you +are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and +org.apache.storm.kafka.trident.TridentKafkaUpdater. -The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`. 
+You need to provide implementations for the following 2 interfaces -Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical. +###TupleToKafkaMapper and TridentTupleToKafkaMapper +These interfaces have 2 methods defined: +```java +K getKeyFromTuple(Tuple/TridentTuple tuple); +V getMessageFromTuple(Tuple/TridentTuple tuple); +``` + +As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field +as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java +implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you +use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility +reasons. Alternatively you could also specify a different key and message field by using the non default constructor. +In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. +These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper. + +###KafkaTopicSelector and trident KafkaTopicSelector +This interface has only one method +```java +public interface KafkaTopicSelector { +String getTopics(Tuple/TridentTuple tuple); +} +``` +The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published +You can return a null and the message will be ignored. If you have one static topic name then you can use +DefaultTopicSelector.java and set the name of the topic in the constructor. +`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support decided which topic should to push message from tuple. +User could specify the field name or field index in tuple ,selector will use this value as topic name which to publish message. 
+When the topic name not found , `KafkaBolt` will write messages into default topic . +Please make sure the default topic have created . + +### Specifying Kafka producer properties +You can provide all the produce properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs +Section "Important configuration properties for the producer" for more details. + +###Using wildcard kafka topic match +You can do a wildcard topic match by adding the following config +``` + Config config = new Config(); + config.put("kafka.topic.wildcard.match",true); -# Usage Examples +``` -### Create a Kafka Spo
[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r91592178 --- Diff: docs/storm-kafka-client.md --- @@ -1,90 +1,222 @@ -#Storm Kafka Spout with New Kafka Consumer API +#Storm Kafka integration using the kafka-client jar -Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). +##Compatability -The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. -In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`. +Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). -The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`. +##Writing to Kafka as part of your topology +You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you +are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and +org.apache.storm.kafka.trident.TridentKafkaUpdater. -The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`. 
+You need to provide implementations for the following 2 interfaces -Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical. +###TupleToKafkaMapper and TridentTupleToKafkaMapper +These interfaces have 2 methods defined: +```java +K getKeyFromTuple(Tuple/TridentTuple tuple); +V getMessageFromTuple(Tuple/TridentTuple tuple); +``` + +As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field +as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java +implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you +use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility +reasons. Alternatively you could also specify a different key and message field by using the non default constructor. +In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. +These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper. + +###KafkaTopicSelector and trident KafkaTopicSelector +This interface has only one method +```java +public interface KafkaTopicSelector { +String getTopics(Tuple/TridentTuple tuple); +} +``` +The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published +You can return a null and the message will be ignored. If you have one static topic name then you can use +DefaultTopicSelector.java and set the name of the topic in the constructor. +`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support decided which topic should to push message from tuple. +User could specify the field name or field index in tuple ,selector will use this value as topic name which to publish message. 
+When the topic name not found , `KafkaBolt` will write messages into default topic . +Please make sure the default topic have created . + +### Specifying Kafka producer properties +You can provide all the produce properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs +Section "Important configuration properties for the producer" for more details. + +###Using wildcard kafka topic match +You can do a wildcard topic match by adding the following config +``` + Config config = new Config(); + config.put("kafka.topic.wildcard.match",true); -# Usage Examples +``` -### Create a Kafka Spo
[GitHub] storm issue #1818: STORM-2104 1.x
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1818 +1 pending travis (and the waiting period)
[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1696 This looks good to me. Now that I have gone through the kafka spout code for my other pull request I am confident in giving this a +1.
[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1696#discussion_r91569248 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java --- @@ -0,0 +1,25 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * @param The type this deserializer deserializes to. + */ +public interface SerializableDeserializer extends Deserializer, Serializable { --- End diff -- Good point. I think I am going to need to fix that on my patch.
[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1808 @srdo for me backwards compatibility for 1.x is more a question of violating our versioning than anything else.
[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1800 @ptgoetz I have updated the packaging to have a separate directory for the DRPC server dependencies. I have run manual tests and everything works. The big difference is that {{apache-storm-2.0.0-SNAPSHOT.tar.gz}} and {{apache-storm-2.0.0-SNAPSHOT.zip}} are now under {{./final-package/target/}} instead of {{./target}}. I will try to see if there are any docs that I need to update around this, but I wanted to give everyone a chance to look and give feedback on it.
[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1800 One more option, still ugly, but with a lot less impact. I can do something similar to Hadoop. They use several different invocations of the maven assembly plugin into directories (predates moduleSets) and then have a separate script that produces the final release.
[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1800 So shading is not really an option for jersey. I was able to split the DRPC server off into its own package with tests, but packaging it up with the assembly plugin is proving to be difficult. If I upgrade to 3.0.0 of the assembly plugin I can use moduleSets and make it work, but I have to change {{storm-dist/binary}} to a multi-module build and move the code that actually packages the final release to a sub package under it. Oh and we leak two empty/useless jar files into the release package that should be ignored. moduleSets do not package pom modules. They require an artifact that is a file. I really dislike all of these options. I see a few cleaner options, but they will require a lot more work.

1. Move to gradle
2. Change how we do shading so that the assembly subModule code works the way they intended it. This would involve essentially having a separate package/build for what we want shaded (i.e. storm-shaded-deps.jar)

If someone has a cleaner solution I am happy to do it, but I think option 2 is the best so far, although I don't like it all that much.
[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1764 @HeartSaVioR I plan on doing that.
[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1800 Shading Jersey is becoming rather difficult (lots of dependencies, including AOP and dependency injection). Splitting the DRPC server off into its own location seems much simpler and less error prone, so I will spend some time on that instead.
[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1764 @ptgoetz @jerrypeng I made a few changes to things. I fixed the race condition and I addressed the review comments, but I also put in some optimizations to storm submitter. We were literally calling getClusterInfo 3+ times for each topology submission, and because the ultimate goal of STORM-2190 is to make it more scalable this helps a lot. There is still some lock contention, but it is much better than it was before. If things look good here I will backport the changes to my other pull request.
[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1767 We should be able to fix this with code like:
```
(if (or (exception-cause? InterruptedException error)
        (and (exception-cause? java.io.InterruptedIOException error)
             (not (exception-cause? java.net.SocketTimeoutException
```
[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1767 @chawco Okay so I understand the issue better now. SocketTimeoutException is a subclass of InterruptedIOException. https://docs.oracle.com/javase/7/docs/api/java/net/SocketTimeoutException.html I could argue that it is a mistake on the part of Java and that it is wrong, but that is already set in stone so we have to deal with it. I see two options: 1) we can treat a SocketTimeoutException differently from other InterruptedIOExceptions, or 2) we can just treat all InterruptedIOExceptions as fatal. We started ignoring InterruptedIOExceptions because we would occasionally run into them in the supervisor or nimbus local cluster tests and that would fail everything. Having proper behavior is more important than having super stable unit tests, but if we can have both (option 1) I think that would be best.
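Option 1 could look roughly like the following in Java. This is a sketch under assumptions, not Storm code: `InterruptCheck`, `hasCause`, and `isIgnorableInterrupt` are hypothetical names, and walking the cause chain is assumed to be what the check needs to do.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

// Hypothetical helper; the class and method names are illustrative only.
final class InterruptCheck {
    // Walks the cause chain looking for an instance of the given type.
    static boolean hasCause(Class<? extends Throwable> type, Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Option 1: a real interrupt may be ignored, but a socket timeout may not,
    // even though SocketTimeoutException extends InterruptedIOException.
    static boolean isIgnorableInterrupt(Throwable error) {
        return hasCause(InterruptedException.class, error)
            || (hasCause(InterruptedIOException.class, error)
                && !hasCause(SocketTimeoutException.class, error));
    }
}
```

The explicit exclusion is needed because an `instanceof`-style test for `InterruptedIOException` also matches every `SocketTimeoutException`.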
[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1800 @ptgoetz if this looks good as is I will look into shading Jersey.
[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/1764 Just so we don't miss the comment from @jerrypeng
> Couldn't a wrong ordering of events happen since we are locking when calculating a scheduling then unlocking and then locking and uploading the new scheduling and unlocking
> for example:
> T0: submit
> T1: rebalance
> T2: rebalance - calculate new scheduling
> T3: submit - calculate new scheduling
> T4: rebalance - upload new scheduling to zk
> T5: submit - upload new scheduling to zk
>
> even though we should end up with the scheduling calculated by the rebalance but we end up with scheduling calculated from the original submit.

Yes, that is correct. We should do something here, and he suggested that perhaps as part of a refactor of Nimbus we should look at support for long running scheduling. In the short term I think I might make scheduling and writing to ZK atomic, but long term I think I will file a JIRA to look at better scheduling.
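The hazard and the short-term fix can be illustrated with a small, single-threaded Java model. This is a sketch under loud assumptions, not Nimbus code: `SchedulingSketch`, `desiredWorkers`, `calculate`, `upload` and `scheduleAtomically` are all hypothetical names, an integer stands in for topology state, and a string field stands in for the assignment stored in ZooKeeper. The interleaving shown is a simplified variation of the one quoted above, in which a scheduling round that calculated from older state uploads last.

```java
// Hypothetical model of the lost-update hazard when "calculate" and "upload"
// are two separate critical sections, and of the atomic alternative.
class SchedulingSketch {
    private final Object schedLock = new Object();
    private int desiredWorkers;   // stand-in for topology state
    private String stored;        // stand-in for the assignment kept in ZooKeeper

    void setDesiredWorkers(int n) {
        synchronized (schedLock) { desiredWorkers = n; }
    }

    // Split version, step 1: calculate under the lock, then release it.
    String calculate() {
        synchronized (schedLock) { return "workers=" + desiredWorkers; }
    }

    // Split version, step 2: re-acquire the lock and upload a possibly stale result.
    void upload(String assignment) {
        synchronized (schedLock) { stored = assignment; }
    }

    // Atomic version: calculate and upload inside one critical section.
    void scheduleAtomically() {
        synchronized (schedLock) { stored = "workers=" + desiredWorkers; }
    }

    String stored() {
        synchronized (schedLock) { return stored; }
    }

    // Replays a bad interleaving deterministically, without threads.
    static String racyInterleaving() {
        SchedulingSketch s = new SchedulingSketch();
        s.setDesiredWorkers(2);               // submit
        String fromSubmit = s.calculate();    // submit's round calculates
        s.setDesiredWorkers(4);               // rebalance
        String fromRebalance = s.calculate(); // rebalance's round calculates
        s.upload(fromRebalance);              // rebalance uploads
        s.upload(fromSubmit);                 // submit uploads last: stale overwrite
        return s.stored();
    }

    // With atomic rounds, whichever round runs last sees the latest state.
    static String atomicRounds() {
        SchedulingSketch s = new SchedulingSketch();
        s.setDesiredWorkers(2);   // submit
        s.scheduleAtomically();
        s.setDesiredWorkers(4);   // rebalance
        s.scheduleAtomically();
        return s.stored();
    }

    public static void main(String[] args) {
        System.out.println("split:  " + racyInterleaving());
        System.out.println("atomic: " + atomicRounds());
    }
}
```

The trade-off is the one the thread is about: holding one lock across calculation and upload removes the stale overwrite, but a long-running scheduling round then blocks other work for its whole duration, which is presumably why longer-term support for long running scheduling is raised as a separate JIRA.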
[GitHub] storm pull request #1764: STORM-2190: reduce contention between submission a...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1764#discussion_r90542812 --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj --- @@ -1008,23 +1008,24 @@ (reset! (:id->worker-resources nimbus) {})) ;; tasks figure out what tasks to talk to by looking at topology at runtime ;; only log/set when there's been a change to the assignment -(doseq [[topology-id assignment] new-assignments -:let [existing-assignment (get existing-assignments topology-id) - topology-details (.getById topologies topology-id)]] - (if (= existing-assignment assignment) -(log-debug "Assignment for " topology-id " hasn't changed") -(do - (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment)) - (.setAssignment storm-cluster-state topology-id (thriftify-assignment assignment)) - ))) -(->> new-assignments - (map (fn [[topology-id assignment]] -(let [existing-assignment (get existing-assignments topology-id)] - [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] - ))) - (into {}) - (.assignSlots inimbus topologies))) -(log-message "not a leader, skipping assignments"))) +(locking (:sched-lock nimbus) --- End diff -- I agree and now that nimbus is in java we can look at doing some refactoring along those lines. If you feel that we need to do it now and that this is a blocker I can spend some time looking into how to do that better.
[GitHub] storm pull request #1764: STORM-2190: reduce contention between submission a...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1764#discussion_r90538639 --- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj --- @@ -1008,23 +1008,24 @@ (reset! (:id->worker-resources nimbus) {})) ;; tasks figure out what tasks to talk to by looking at topology at runtime ;; only log/set when there's been a change to the assignment -(doseq [[topology-id assignment] new-assignments -:let [existing-assignment (get existing-assignments topology-id) - topology-details (.getById topologies topology-id)]] - (if (= existing-assignment assignment) -(log-debug "Assignment for " topology-id " hasn't changed") -(do - (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment)) - (.setAssignment storm-cluster-state topology-id (thriftify-assignment assignment)) - ))) -(->> new-assignments - (map (fn [[topology-id assignment]] -(let [existing-assignment (get existing-assignments topology-id)] - [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] - ))) - (into {}) - (.assignSlots inimbus topologies))) -(log-message "not a leader, skipping assignments"))) +(locking (:sched-lock nimbus) --- End diff -- @jerrypeng You are correct that this could happen. I don't really think it will be that likely to happen in practice but I'll think about it and see if we can fix it.