[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96925159
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java ---
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(Tuple tuple);
--- End diff --

File another JIRA and we can look into it.  I think it is beyond the scope 
of this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96925279
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java ---
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
--- End diff --

It is possible, but that is the point of the generics: to reduce the 
likelihood of it happening.
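
For illustration, a minimal sketch (hypothetical field names, and `tuple` is 
assumed to be some TridentTuple in scope; this is not code from the PR) of how 
the generic parameters catch mistakes at compile time:

```
TridentTupleToKafkaMapper<String, String> mapper =
        new FieldNameBasedTupleToKafkaMapper<>("key", "message");

String key = mapper.getKeyFromTuple(tuple);   // OK: declared key type is String
// Long bad = mapper.getKeyFromTuple(tuple);  // rejected at compile time
```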




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96924990
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer producer;
+    private OutputCollector collector;
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    public void prepare(Properties options) {
+        if (mapper == null) throw new NullPointerException("mapper can not be null");
--- End diff --

`Objects` does not exist in Java 6, and I would prefer to keep the code as 
compatible as possible to avoid extra rework when pulling these changes back.  
If you insist, I will do it.
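
For reference, the two styles side by side (`java.util.Objects` is Java 7+, 
which is the compatibility concern here):

```
// Java 6 compatible check, as written in this patch:
if (mapper == null) {
    throw new NullPointerException("mapper can not be null");
}

// Java 7+ equivalent the review suggested:
java.util.Objects.requireNonNull(mapper, "mapper can not be null");
```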




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96923940
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer producer;
+    private OutputCollector collector;
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
--- End diff --

This is code that was "moved", like the KafkaBolt, from storm-kafka to 
storm-kafka-client.  If we really want to make it immutable we can, but I 
think that is beyond the scope of this JIRA.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96923481
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java ---
@@ -44,29 +43,30 @@
     // Bookkeeping
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
     // Declare some KafkaSpoutConfig references for convenience
-    private KafkaSpoutStreams kafkaSpoutStreams;          // Object that wraps all the logic to declare output fields and emit tuples
-    private KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;  // Object that contains the logic to build tuples for each ConsumerRecord
+    private final Fields fields;
 
     public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
-        kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+        Fields fields = null;
--- End diff --

I'm a bit confused about what benefit that would give.  The Fields come 
from the SpoutConfig, by way of the RecordTranslator.  Why would you want to 
override the Fields but not the RecordTranslator that must conform to those 
fields?  I just don't see value in separating two things that are so tightly 
coupled together.
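
As a sketch of that coupling (made-up field names, using the 
SimpleRecordTranslator from this PR): the Fields and the function that builds 
matching tuples travel together in one object, so they cannot drift apart.

```
RecordTranslator<String, String> translator = new SimpleRecordTranslator<>(
        (record) -> new Values(record.topic(), record.key(), record.value()),
        new Fields("topic", "key", "value"));
```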




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96911529
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java ---
@@ -15,14 +15,26 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+package org.apache.storm.kafka.spout;
 
-package org.apache.storm.kafka.spout.internal.partition;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
 
-import java.util.List;
+public class RoundRobinManualPartitioner implements ManualPartitioner {
 
-public interface KafkaPartitionReader {
-    List<TopicPartition> readPartitions(KafkaConsumer consumer);
+    @Override
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+        int thisTaskIndex = context.getThisTaskIndex();
+        int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1);
+        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
--- End diff --

No.  `i < allPartitions.size()` guarantees that we will never call get on 
allPartitions with an index that is out of bounds.  The HashSet constructor 
argument just sets the initial capacity, to avoid extra memory allocations.
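
A worked example with made-up numbers may help: with 10 partitions and 4 
tasks, task 1 visits indices 1, 5, 9 and stops, so `get` is always in range; 
`10/4+1 = 3` only pre-sizes the HashSet.

```
int thisTaskIndex = 1, totalTaskCount = 4, partitionCount = 10;
// Same loop shape as partition(): the bound keeps the index in range.
for (int i = thisTaskIndex; i < partitionCount; i += totalTaskCount) {
    System.out.println("task 1 owns partition index " + i); // prints 1, 5, 9
}
```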




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96909498
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java ---
@@ -15,22 +15,33 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics<K, V> implements KafkaSpoutTuplesBuilder<K, V> {
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+import org.apache.storm.tuple.Values;
 
-    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
-        this.tupleBuilder = tupleBuilder;
+/**
+ * A list of Values in a tuple that can be routed 
+ * to a given stream.
+ */
+public class KafkaTuple extends Values {
+    private static final long serialVersionUID = 4803794470450587992L;
+    private String stream = null;
+
+    public KafkaTuple() {
+        super();
+    }
+
+    public KafkaTuple(Object... vals) {
+        super(vals);
+    }
+
+    public KafkaTuple routedTo(String stream) {
--- End diff --

This is because the constructor is variadic, following the Values parent 
class. It is ambiguous to have

```
public KafkaTuple(Object... vals)
```

and any other constructor.  Java complains.
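
A small sketch of the trap (the String-first constructor is hypothetical and 
is exactly what is being avoided):

```
class Demo {
    Demo(Object... vals) {}
    // Hypothetical second constructor -- the one being avoided:
    // Demo(String stream, Object... vals) {}
    // With both present, per the comment above Java complains, and even
    // where a call like new Demo("a", "b") resolves, "a" would silently
    // become the stream instead of a value. The fluent routedTo(stream)
    // sidesteps the problem entirely.
}
```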




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96908491
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that follow a given list of values
+ */
+public class NamedSubscription extends Subscription {
+    private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    protected final Collection<String> topics;
+
+    public NamedSubscription(Collection<String> topics) {
+        super();
+        this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
+    }
+
+    public NamedSubscription(String ... topics) {
+        this(Arrays.asList(topics));
+    }
+
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(topics, listener);
+        LOG.info("Kafka consumer subscribed topics {}", topics);
+    }
+
+    @Override
+    public String getTopicsString() {
--- End diff --

It is documented in the parent javadocs, which is what the overriding method inherits.
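
For context, a condensed sketch of the javadoc inheritance being relied on 
(the abstract modifier and the return value here are illustrative):

```
abstract class Subscription {
    /**
     * @return a string representing the subscribed topics.
     */
    public abstract String getTopicsString();
}

class NamedSubscription extends Subscription {
    @Override  // javadoc is inherited from the parent declaration
    public String getTopicsString() {
        return "topic1,topic2"; // illustrative value only
    }
}
```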




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96908178
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements Comparator<TopicPartition> {
-    @Override
-    public int compare(TopicPartition o1, TopicPartition o2) {
-        if (!o1.topic().equals(o2.topic())) {
-            return o1.topic().compareTo(o2.topic());
-        } else {
-            return o1.partition() - o2.partition();
-        }
-    }
+    public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();
--- End diff --

I'm not sure we need to guarantee it.  The old code had the INSTANCE where 
it was used; I moved it here in hopes that others might use it.  I could make 
the constructor private if we really want it to be a singleton, but I don't 
think that is a requirement.
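
For reference, the difference under discussion in sketch form (the private 
constructor is the hypothetical stricter variant):

```
// Shared for convenience; direct construction is still allowed.
public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();

// If a true singleton were ever required, a private constructor would enforce it:
// private TopicPartitionComparator() {}
```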




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96907232
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -325,15 +310,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         } else {
             boolean isScheduled = retryService.isScheduled(msgId);
             if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-                final List<Object> tuple = tuplesBuilder.buildTuple(record);
-                kafkaSpoutStreams.emit(collector, tuple, msgId);
+                final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
+                if (tuple instanceof KafkaTuple) {
--- End diff --

Yes, and it is documented in RecordTranslator.  I will add more 
documentation on it, though.

The specific reason for this is that the spout is not able to keep track of a 
single message being emitted to multiple streams.  It would get confused and 
ack the message before it was truly done.  This design makes that impossible.  
What is more, the built-in record translators should cover 99% of the use 
cases, so the fact that it is not super well documented should only matter in 
a corner case.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96897291
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -325,15 +310,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         } else {
             boolean isScheduled = retryService.isScheduled(msgId);
             if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-                final List<Object> tuple = tuplesBuilder.buildTuple(record);
-                kafkaSpoutStreams.emit(collector, tuple, msgId);
+                final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
+                if (tuple instanceof KafkaTuple) {
--- End diff --

The javadocs for RecordTranslator state:
```
 * @return the objects in the tuple.  Return a {@link KafkaTuple}
 * if you want to route the tuple to a non-default stream
```

All of the provided implementations support this: both Simple and ByTopic, 
by way of the SimpleRecordTranslator.

I will add a section in the docs to talk about this, but it only really 
matters if you are writing your own record translator from scratch instead of 
using the built-in ones, which should cover the vast majority of the use cases.
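
To make the corner case concrete, a sketch of a hand-written translator 
(hypothetical stream and field names; the built-in translators already do the 
equivalent):

```
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;

class SingleStreamTranslator implements RecordTranslator<String, String> {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        // One record maps to exactly one stream, so the spout can
        // track and ack the emitted tuple unambiguously.
        return new KafkaTuple(record.key(), record.value()).routedTo("errors");
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("key", "value");
    }

    @Override
    public List<String> streams() {
        return Arrays.asList("errors");
    }
}
```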




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96895148
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+    private static final long serialVersionUID = -216136367240198716L;
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics
+     * @param consumer the Consumer to get.
+     * @param listener the rebalance listener to include in the subscription
+     */
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        subscribe(consumer, listener);
+    }
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics
+     * @param consumer the Consumer to get.
+     * @param listener the rebalance listener to include in the subscription
+     * @deprecated please use the version with the TopologyContext in it
+     */
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener) {
--- End diff --

I didn't feel good about deprecating it in the first place and will remove 
it.  It just means that I will also pull the changes I made for STORM-2236 
back to 1.1 too.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96894587
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -121699733778988688L;
+    private final RecordTranslator<K, V> defaultTranslator;
+    private final Map<String, RecordTranslator<K, V>> topicToTranslator = new HashMap<>();
+    private final Map<String, Fields> streamToFields = new HashMap<>();
+
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this(new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(new SimpleRecordTranslator<>(func, fields));
+    }
+
+    public ByTopicRecordTranslator(RecordTranslator<K, V> defaultTranslator) {
+        this.defaultTranslator = defaultTranslator;
+        cacheNCheckFields(defaultTranslator);
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, RecordTranslator<K, V> translator) {
+        if (topicToTranslator.containsKey(topic)) {
+            throw new IllegalStateException("Topic " + topic + " is already registered");
+        }
+        topicToTranslator.put(topic, translator);
+        cacheNCheckFields(translator);
+        return this;
+    }
+
+    private void cacheNCheckFields(RecordTranslator<K, V> translator) {
+        for (String stream : translator.streams()) {
+            Fields fromTrans = translator.getFieldsFor(stream);
+            Fields cached = streamToFields.get(stream);
+            if (cached != null && !fromTrans.equals(cached)) {
+                throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which is not the same as those being added in " + fromTrans);
--- End diff --

In this case it is the argument being passed in that is bad, and we are 
rejecting it.  The caller could come back and switch it to a new stream, 
which would be fine.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96893867
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -121699733778988688L;
+    private final RecordTranslator<K, V> defaultTranslator;
+    private final Map<String, RecordTranslator<K, V>> topicToTranslator = new HashMap<>();
+    private final Map<String, Fields> streamToFields = new HashMap<>();
+
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this(new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(new SimpleRecordTranslator<>(func, fields));
+    }
+
+    public ByTopicRecordTranslator(RecordTranslator<K, V> defaultTranslator) {
+        this.defaultTranslator = defaultTranslator;
+        cacheNCheckFields(defaultTranslator);
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, RecordTranslator<K, V> translator) {
+        if (topicToTranslator.containsKey(topic)) {
+            throw new IllegalStateException("Topic " + topic + " is already registered");
+        }
+        topicToTranslator.put(topic, translator);
+        cacheNCheckFields(translator);
+        return this;
+    }
+
+    private void cacheNCheckFields(RecordTranslator<K, V> translator) {
+        for (String stream : translator.streams()) {
+            Fields fromTrans = translator.getFieldsFor(stream);
+            Fields cached = streamToFields.get(stream);
+            if (cached != null && !fromTrans.equals(cached)) {
--- End diff --

```
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
        (rec) -> Arrays.asList(rec.offset()), new Fields("offset"), "default");
trans.forTopic("specialTopic",
        (rec) -> Arrays.asList(rec.offset(), rec.value()),
        new Fields("offset", "message"), "default");
```
At this point we have tried to declare that the "default" stream has Fields of 
both ["offset"] and ["offset", "message"].  That is not supported by Storm, so 
we should not allow anyone to configure the spout to do it.

streamToFields is not yet updated for the new translator being added.  We 
update it only after verifying that its Fields match everything registered so 
far.
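
For contrast, if that second registration targets its own stream (replacing 
the failing call above), cacheNCheckFields accepts it, because different 
streams may declare different Fields:

```
trans.forTopic("specialTopic",
        (rec) -> Arrays.asList(rec.offset(), rec.value()),
        new Fields("offset", "message"), "special");
```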




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96891568
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java ---
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field with a given index to select the topic name from a tuple .
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -3830575380208166367L;
--- End diff --

Because I missed them.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96891946
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic name from tuple .
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+    private final String fieldName;
+    private final String defaultTopicName;
+
+    public FieldNameTopicSelector(String fieldName, String defaultTopicName) {
+        this.fieldName = fieldName;
+        this.defaultTopicName = defaultTopicName;
+    }
+
+    @Override
+    public String getTopic(Tuple tuple) {
--- End diff --

I agree, but both for backwards compatibility and to limit the scope of this 
JIRA, I would rather see it done in a follow-on JIRA.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96889563
  
--- Diff: external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -51,7 +51,9 @@
  * This bolt uses 0.8.2 Kafka Producer API.
  * 
  * It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
+@Deprecated
--- End diff --

I would rather do that in a follow-on JIRA.  Most of the code is the same 
between the two, so porting the rework from here to there is a lot simpler if 
I can do a cherry-pick.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96889298
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+    private static final long serialVersionUID = -216136367240198716L;
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics
+     * @param consumer the Consumer to get.
+     * @param listener the rebalance listener to include in the subscription
+     */
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        subscribe(consumer, listener);
+    }
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics
+     * @param consumer the Consumer to get.
+     * @param listener the rebalance listener to include in the subscription
+     * @deprecated please use the version with the TopologyContext in it
+     */
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener) {
+        throw new IllegalStateException("At least one subscribe method must be overwritten");
+    }
+
+/**
+ * @return a string representing the subscribed topics.
--- End diff --

I didn't know either, but the only place I can see it used is in logging, so 
that is what I did.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96887342
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-    public enum FirstPollOffsetStrategy {
+    public static enum FirstPollOffsetStrategy {
         EARLIEST,
         LATEST,
         UNCOMMITTED_EARLIEST,
         UNCOMMITTED_LATEST }
-
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-    private final long pollTimeoutMs;
-
-    // Kafka spout configuration
-    private final long offsetCommitPeriodMs;
-    private final int maxRetries;
-    private final int maxUncommittedOffsets;
-    private final long partitionRefreshPeriodMs;
-    private final boolean manualPartitionAssignment;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutStreams kafkaSpoutStreams;
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-    private final KafkaSpoutRetryService retryService;
-
-    private KafkaSpoutConfig(Builder<K, V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.keyDeserializer = builder.keyDeserializer;
-        this.valueDeserializer = builder.valueDeserializer;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
-        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-        this.manualPartitionAssignment = builder.manualPartitionAssignment;
-        this.tuplesBuilder = builder.tuplesBuilder;
-        this.retryService = builder.retryService;
+
+    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
     }
-
-    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+
+    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
         // set defaults for properties not specified
-        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
         return kafkaProps;
     }
-
+
     public static class Builder<K, V> {
         private final Map<String, Object> kafkaProps;
-        private SerializableDeserializer<K> keyDeserializer;
-        private SerializableDeserializer<V> valueDeserializer;
+        private Subscription subscription;
+        private final SerializableDeserializer<K> keyDes;
+        private final Class<? extends Deserializer<K>> keyDesClazz;
+        private final SerializableDeserializer<V> valueDes;
+        private final Class<? extends Deserializer<V>> valueDesClazz;
+        private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+        private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-        private boolean manualPartitionAssignment = false;
-        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-        private final KafkaSpoutRetryService retryService;
-
-        /*

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96886556
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-    public enum FirstPollOffsetStrategy {
+    public static enum FirstPollOffsetStrategy {
         EARLIEST,
         LATEST,
         UNCOMMITTED_EARLIEST,
         UNCOMMITTED_LATEST }
-
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-    private final long pollTimeoutMs;
-
-    // Kafka spout configuration
-    private final long offsetCommitPeriodMs;
-    private final int maxRetries;
-    private final int maxUncommittedOffsets;
-    private final long partitionRefreshPeriodMs;
-    private final boolean manualPartitionAssignment;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutStreams kafkaSpoutStreams;
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-    private final KafkaSpoutRetryService retryService;
-
-    private KafkaSpoutConfig(Builder<K, V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.keyDeserializer = builder.keyDeserializer;
-        this.valueDeserializer = builder.valueDeserializer;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
-        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-        this.manualPartitionAssignment = builder.manualPartitionAssignment;
-        this.tuplesBuilder = builder.tuplesBuilder;
-        this.retryService = builder.retryService;
+
+    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
     }
-
-    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+
+    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
         // set defaults for properties not specified
-        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
         return kafkaProps;
     }
-
+
     public static class Builder<K, V> {
         private final Map<String, Object> kafkaProps;
-        private SerializableDeserializer<K> keyDeserializer;
-        private SerializableDeserializer<V> valueDeserializer;
+        private Subscription subscription;
+        private final SerializableDeserializer<K> keyDes;
+        private final Class<? extends Deserializer<K>> keyDesClazz;
+        private final SerializableDeserializer<V> valueDes;
+        private final Class<? extends Deserializer<V>> valueDesClazz;
+        private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+        private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-        private boolean manualPartitionAssignment = false;
-        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-        private final KafkaSpoutRetryService retryService;
-
-        /*

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96884451
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -225,11 +213,7 @@ public void nextTuple() {
         }
 
         if (poll()) {
-            try {
-                setWaitingToEmit(pollKafkaBroker());
-            } catch (RetriableException e) {
-                LOG.error("Failed to poll from kafka.", e);
-            }
+            setWaitingToEmit(pollKafkaBroker());
--- End diff --

That's right, it was a merge error.  Great catch though.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879498
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * <p>
+ * It expects the producer configuration and topic in storm config under
+ * <p>
+ * 'kafka.broker.properties' and 'topic'
+ * <p>
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        //for backward compatibility.
+        if(mapper == null) {
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
+        }
+
+        //for backward compatibility.
+        if(topicSelector == null) {
+            if(stormConf.containsKey(TOPIC)) {
+                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+            } else {
+                throw new IllegalArgumentException("topic should be specified in bolt's configuration");
+            }
+        }
+
+        producer = mkProducer(boltSpecfiedProperties);
+        this.collector = collector;
+    }
+
+    /**
+     * Intended to be overridden for tests.  Make the producer from props
--- End diff --

No, sorry, wrong place for that comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879307
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        //for backward compatibility.
+        if (mapper == null) {
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
+        }
+
+        //for backward compatibility.
+        if (topicSelector == null) {
+            if (stormConf.containsKey(TOPIC)) {
+                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+            } else {
+                throw new IllegalArgumentException("topic should be specified in bolt's configuration");
+            }
+        }
+
+        producer = mkProducer(boltSpecfiedProperties);
+        this.collector = collector;
+    }
+
+    /**
+     * Intended to be overridden for tests.  Make the producer from props
--- End diff --

backwards compatibility


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        //for backward compatibility.
+        if (mapper == null) {
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
+        }
+
+        //for backward compatibility.
+        if (topicSelector == null) {
+            if (stormConf.containsKey(TOPIC)) {
+                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+            } else {
+                throw new IllegalArgumentException("topic should be specified in bolt's configuration");
+            }
+        }
+
+        producer = mkProducer(boltSpecfiedProperties);
+        this.collector = collector;
+    }
+
+    /**
+     * Intended to be overridden for tests.  Make the producer from props
+     */
+    protected KafkaProducer<K, V> mkProducer(Properties props) {
--- End diff --

backwards compatibility



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96877333
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
--- End diff --

I don't want to rename public-facing APIs right now, because this is a copy 
of what is in external/storm-kafka.  The other one is deprecated, but I want to 
maintain compatibility if possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96877744
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+
+    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
--- End diff --

This is "moving" existing code, so I want to maintain compatibility if 
possible.  But we are doing a copy + deprecation because the two libraries are 
compiled against different versions of Kafka, so combining the two libraries in 
a single topology is difficult in some cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96875424
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka consumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility](#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic or topic wildcard. `KafkaSpoutStream` represents the stream and output fields. For named topics use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use `KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology, or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user through implementing the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use `KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility
+reasons. Alternatively you could also specify a different key and message field by using the non-default constructor.
+In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+    String getTopic(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null and the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to select the topic to publish a tuple to.
+A user just needs to specify the field name or field index for the topic name in the tuple itself.
+When the topic name is not found, the `Field*TopicSelector` will write messages into the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more details.
+These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`
+
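+Below is a minimal sketch of wiring these pieces together (the broker address,
+topic name, and property values are illustrative):
+
+```java
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+KafkaBolt bolt = new KafkaBolt()
+        .withProducerProperties(props)
+        .withTopicSelector(new DefaultTopicSelector("test"))
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+```
+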
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Us

[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
The test failures are unrelated


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
@srdo @harshach sorry to do this to you, but I just fixed the conflicts 
with STORM-2236.  Sadly the fastest way I could do it was to revert the 
original code and implement similar functionality, which is the latest commit.  
Could you please take a look at it?  I created a few new Subscription 
implementations that can do the manual partition management.  It only needed a 
small change to the Spout to support a timeout.  I will try to look at adding 
some documentation and also at the impact on the trident spout. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

2017-01-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1868
  
@harshach and @ppoulosk I just pushed fixes for your review comments.  
@harshach you were right; I could and did remove StormStringDeserializer.  I 
think this made the code much better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1878: STORM-2295:KafkaSpoutStreamsNamedTopics changing the sequ...

2017-01-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1878
  
+1 the change looks good to me.  Great catch @pasalkarsachin1 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

2017-01-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1868#discussion_r96432779
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/StormStringDeserializer.java
 ---
@@ -15,22 +15,11 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics<K, V> implements KafkaSpoutTuplesBuilder<K, V> {
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
-    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
-        this.tupleBuilder = tupleBuilder;
-    }
+public class StormStringDeserializer extends StringDeserializer implements SerializableDeserializer<String> {
--- End diff --

Yes we need this even more now.  The Kafka Deserializer (including 
StringDeserializer) is not java serializable.  So if we don't do this, on a real 
storm cluster we will get exceptions when we try to serialize the spout.

I can look into trying to support some kind of generics like 
```
public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> keyDeserializer) {
```
But I really don't know if that works.  I'll try it out and let you know.
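
As a self-contained illustration of the Class-based idea floated above (the 
names here are hypothetical, not Storm API): ship the Class object, which is 
Serializable, and instantiate the Deserializer on the worker, so the 
Deserializer implementation itself never needs to be java.io.Serializable.

```java
import java.io.Serializable;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical holder, not Storm API: Class is Serializable even when the
// Deserializer implementation is not.
class DeserializerHolder<T> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Class<? extends Deserializer<T>> clazz;

    DeserializerHolder(Class<? extends Deserializer<T>> clazz) {
        this.clazz = clazz;
    }

    // Instantiated on the worker after the spout is deserialized, so the
    // Deserializer instance never has to cross the wire.
    Deserializer<T> make() throws ReflectiveOperationException {
        return clazz.newInstance();
    }
}
```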


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

2017-01-11 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1868#discussion_r95682402
  
--- Diff: 
examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
 ---
@@ -18,87 +18,58 @@
 
 package org.apache.storm.kafka.trident;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
-import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
 public class TridentKafkaClientWordCountNamedTopics {
 private static final String TOPIC_1 = "test-trident";
 private static final String TOPIC_2 = "test-trident-1";
 private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
 
 private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
-        return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
-                newKafkaSpoutConfig(
-                        newKafkaSpoutStreams())));
+        return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
 }
 
-    private KafkaSpoutConfig<String, String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
-                kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
+    private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+
+    protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
+        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2)
--- End diff --

This is an exact translation of the original code. Even down to not using 
KAFKA_LOCAL_BROKER.  If people want me to change it I am happy to, but I 
thought it best to not overreach on the scope of the pull request.  At least 
until the code worked.
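
For reference, on the Java 8 based master branch the same helper could be a 
lambda, since Func has a single method (a sketch; the anonymous class above is 
kept because this 1.x port targets Java 7):

```java
private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC =
        record -> new Values(record.value());
```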


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2017-01-11 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
The test failures are unrelated and are around the integration tests that 
always seem to fail lately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

2017-01-11 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1868
  
The test failures are unrelated and are around the integration tests that 
always seem to fail lately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1862: STORM-2278: Allow max number of disruptor queue flusher t...

2017-01-09 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1862
  
@HeartSaVioR I rebased and I think I addressed your review comments.  
Please have a look and see if it is what you wanted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2017-01-09 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
All outstanding review comments should be done now.  This and the 1.x port 
at #1868 should be ready for a final pass and hopefully being merged in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1868: STORM-2225: change spout config to be simpler. (1.x)

2017-01-09 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1868
  
This is the 1.x version of #1808 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1868: STORM-2225: change spout config to be simpler. (1....

2017-01-09 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/1868

STORM-2225: change spout config to be simpler. (1.x)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-2225-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1868


commit 95883ac6f0202367b7a7f47eaa50ddeef824dc27
Author: Robert (Bobby) Evans 
Date:   2016-11-30T03:39:26Z

STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client
STORM-2225: change spout config to be simpler.
STORM-2228: removed ability to request a single topic go to multiple streams

Conflicts:

examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java

external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java

commit 2e041c3af64143157fd2e7a0af419685857e8083
Author: Robert (Bobby) Evans 
Date:   2016-12-08T19:26:49Z

fixed some issues with rebase

Conflicts:

external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java

commit dbf040082357e585bd3aba94ed31d25e8c5f3ea9
Author: Robert (Bobby) Evans 
Date:   2016-12-08T22:10:12Z

addressed review comments

Conflicts:
external/storm-kafka-client/README.md

commit f95fc1670c6c03e27082d59555e26560636d15a9
Author: Robert (Bobby) Evans 
Date:   2016-12-08T22:12:43Z

oops

commit b8e32fceefaa1e71b4e4dec6e67d0a126761e949
Author: Robert (Bobby) Evans 
Date:   2017-01-06T22:37:31Z

STORM-2225: make the core API java7 compatible

commit 38f4ede2899145b4f3b527a746e7cd0999e9bb46
Author: Robert (Bobby) Evans 
Date:   2017-01-06T22:42:31Z

STORM-2225: addressed doc review comments

commit 853c524313cf1375499d3a9ccb0ec5a3509a7ae8
Author: Robert (Bobby) Evans 
Date:   2017-01-09T16:11:41Z

STORM-2225: java7 modifications

commit 89eb16faef264057efebb6eb19f4a8089dd820a9
Author: Robert (Bobby) Evans 
Date:   2017-01-09T16:58:27Z

STORM-2225: Updated docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...

2017-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1862#discussion_r95174977
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -63,6 +63,18 @@
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
 
+    private static int getNumFlusherPoolThreads() {
+        int numThreads = 100;
+        try {
+            String threads = System.getProperty("num_flusher_pool_threads", "100");
--- End diff --

The issue with using a Config for this is that readStormConfig inside the 
worker would read the system config, not the topology config, and would not let 
us override it on a per-topology basis.  I will add in a config for the system 
default, but in the documentation indicate the system property that can be set 
to override it for a topology.
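
For illustration, a topology could then pick its own value by passing the 
system property through the worker JVM options (a sketch; 
`num_flusher_pool_threads` is the property named in the diff, and 
`TOPOLOGY_WORKER_CHILDOPTS` is the standard Storm config for worker JVM flags):

```java
import org.apache.storm.Config;

Config conf = new Config();
// Overrides the flusher pool size for this topology's workers only.
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Dnum_flusher_pool_threads=50");
```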


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...

2017-01-09 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1862#discussion_r95169637
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -63,6 +63,18 @@
 private static final String PREFIX = "disruptor-";
 private static final FlusherPool FLUSHER = new FlusherPool();
 
+    private static int getNumFlusherPoolThreads() {
+        int numThreads = 100;
+        try {
+            String threads = System.getProperty("num_flusher_pool_threads", "100");
--- End diff --

I agree that it does not add a lot of value in its current form.  If you 
want me to document it I am happy to.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2017-01-06 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
I think I have addressed all of the review comments so far.  I will try to 
get my 1.x version of the patch up shortly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1853: STORM-2264 OpaqueTridentKafkaSpout failing after STORM-22...

2017-01-06 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1853
  
I am also OK with reverting STORM-2216 if it is causing a lot of issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1853: STORM-2264 OpaqueTridentKafkaSpout failing after STORM-22...

2017-01-06 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1853
  
+1 seems fine to me


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1785: [STORM-2201] Add dynamic scheduler configuration l...

2017-01-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1785#discussion_r94972777
  
--- Diff: docs/IConfigLoader.md ---
@@ -0,0 +1,58 @@
+---
+title: IConfigLoader
+layout: documentation
+documentation: true
+---
+
+
+### Introduction
+IConfigLoader is an interface designed to allow way to dynamically load 
scheduler resource constraints into scheduler implementations. Currently, the 
MultiTenant scheduler uses this interface to dynamically load the number of 
isolated nodes a given user has been guaranteed, and the ResourceAwareScheduler 
uses the interface to dynamically load per user resource guarantees.
--- End diff --

Perhaps "designed to allow dynamic loading of scheduler resource 
constraints"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die

2017-01-06 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1767
  
@sathyafmt sorry I have taken so long to respond; December was a really 
crazy month for me.  From STORM-2194 I see that the SocketTimeoutException goes 
through the code being changed.  The RMI code does not go through that path at 
all.

```
2016-12-01 04:24:41.721 STDERR [INFO] Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 56700; nested exception is:
2016-12-01 04:24:41.722 STDERR [INFO] java.net.BindException: Address already in use
```

If it did then we would have exited because BindException and 
ExportException are neither InterruptedIOException nor InterruptedException. 

So neither this patch nor the one I proposed would have any impact on the RMI 
case at all.  Something else is catching the ExportException and printing the 
error message above to STDERR.
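
As a sketch of the "report error and die" semantics being discussed here 
(runLoop and reportError are illustrative placeholders, not Storm's actual 
methods):

```java
try {
    runLoop();
} catch (Throwable t) {
    reportError(t);  // always report
    // ...and die, unless the exception signals an interruption.
    if (!(t instanceof InterruptedException)
            && !(t instanceof java.io.InterruptedIOException)) {
        throw new RuntimeException(t);
    }
}
```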


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1862: STORM-2278: Allow max number of disruptor queue fl...

2017-01-05 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/1862

STORM-2278: Allow max number of disruptor queue flusher threads to be 
configurable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-2278

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1862.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1862


commit 753b1cbca44ff26784eee8f1a0a5cbce5a1b97d6
Author: Robert (Bobby) Evans 
Date:   2017-01-05T20:08:02Z

STORM-2278: Allow max number of disruptor queue flusher threads to be 
configurable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1827: STORM-2243: adds ip address to supervisor id

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1827
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1842: Merge remote-tracking branch 'refs/remotes/apache/master'

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1842
  
@leongu-tc this is an empty commit/pull request.  Could you please close 
this pull request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1839: STORM-1292

2017-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1839#discussion_r94782513
  
--- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.*;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class MessagingTest {
+    private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class);
+
+    @Test
+    public void testLocalTransport() throws Exception {
+        Config stormConf = new Config();
+        //stormConf.putAll(Utils.readDefaultConfig());
+        stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
--- End diff --

This was in the original code 
https://github.com/apache/storm/blob/d5acec9e3b9473a0e8cf39c7e12393626a3ca426/storm-core/test/clj/org/apache/storm/messaging_test.clj#L32-L33

But yes it should be optional.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1839: STORM-1292

2017-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1839#discussion_r94783130
  
--- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.*;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class MessagingTest {
+    private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class);
+
+    @Test
+    public void testLocalTransport() throws Exception {
+        Config stormConf = new Config();
+        //stormConf.putAll(Utils.readDefaultConfig());
+        stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+        boolean[] transportOptions = {true, false};
+        for (boolean transportOn : transportOptions) {
+            stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn);
+            //List<String> seeds = new ArrayList<>();
+            //seeds.add("localhost");
+            //stormConf.put(Config.NIMBUS_HOST, "localhost");
+            //stormConf.put(Config.NIMBUS_SEEDS, seeds);
+            //stormConf.put("storm.cluster.mode", "local");
+            //stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost");
+
+            ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2)
--- End diff --

This needs to be in a try block, so the auto-close in cluster is called 
properly.

```
try (ILocalCluster cluster = new LocalCluster.Builder() /*...*/ .build()) {
    //Rest of the test that used cluster
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1839: STORM-1292

2017-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1839#discussion_r94783485
  
--- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.*;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class MessagingTest {
+    private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class);
+
+    @Test
+    public void testLocalTransport() throws Exception {
+        Config stormConf = new Config();
+        //stormConf.putAll(Utils.readDefaultConfig());
+        stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+        boolean[] transportOptions = {true, false};
+        for (boolean transportOn : transportOptions) {
+            stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn);
+            //List<String> seeds = new ArrayList<>();
+            //seeds.add("localhost");
+            //stormConf.put(Config.NIMBUS_HOST, "localhost");
+            //stormConf.put(Config.NIMBUS_SEEDS, seeds);
+            //stormConf.put("storm.cluster.mode", "local");
+            //stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost");
+
+            ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2)
+                    .withDaemonConf(stormConf).build();
+            Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2);
+            Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+            inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping());
+            Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6);
+            Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", spoutDetails);
+            Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", boltDetails);
+            StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap);
+            //TopologyBuilder builder = new TopologyBuilder();
+            //builder.setSpout("1", new TestWordSpout(false), 2);
+            //builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1");
+            //StormTopology stormTopology = builder.createTopology();
+            FixedTuple[] fixedTuple = {new FixedTuple((List) Collections.singletonList((Object) "a")), new FixedTuple((List) Collections.singletonList((Object) "b")),
--- End diff --

Or better yet, if the commented-out code works, use it instead.  It is a lot 
smaller and fits the Java API better.
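
For reference, the commented-out alternative reads like this when enabled 
(copied from the commented lines in the diff above):

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("1", new TestWordSpout(false), 2);
builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1");
StormTopology stormTopology = builder.createTopology();
```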


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1839: STORM-1292

2017-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1839#discussion_r94784021
  
--- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.*;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class MessagingTest {
+    private final static Logger LOG = LoggerFactory.getLogger(MessagingTest.class);
+
+    @Test
+    public void testLocalTransport() throws Exception {
+        Config stormConf = new Config();
+        //stormConf.putAll(Utils.readDefaultConfig());
+        stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+        boolean[] transportOptions = {true, false};
+        for (boolean transportOn : transportOptions) {
+            stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, transportOn);
+            //List<String> seeds = new ArrayList<>();
+            //seeds.add("localhost");
+            //stormConf.put(Config.NIMBUS_HOST, "localhost");
+            //stormConf.put(Config.NIMBUS_SEEDS, seeds);
+            //stormConf.put("storm.cluster.mode", "local");
+            //stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost");
+
+            ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2)
+                    .withDaemonConf(stormConf).build();
+            Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2);
+            Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
+            inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping());
+            Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6);
+            Map<String, Thrift.SpoutDetails> spoutMap = new HashMap<>();
+            spoutMap.put("1", spoutDetails);
+            Map<String, Thrift.BoltDetails> boltMap = new HashMap<>();
+            boltMap.put("2", boltDetails);
+            StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap);
+            //TopologyBuilder builder = new TopologyBuilder();
+            //builder.setSpout("1", new TestWordSpout(false), 2);
+            //builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1");
+            //StormTopology stormTopology = builder.createTopology();
+            FixedTuple[] fixedTuple = {new FixedTuple((List) Collections.singletonList((Object) "a")), new FixedTuple((List) Collections.singletonList((Object) "b")),
--- End diff --

Actually this code is not used any more; `fixedTuples` (with an s) right below 
this is the one that is put into the data set.  Please delete this code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1839: STORM-1292

2017-01-05 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1839#discussion_r94782323
  
--- Diff: storm-core/test/jvm/org/apache/storm/MessagingTest.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.*;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class MessagingTest {
+private final static Logger LOG = 
LoggerFactory.getLogger(MessagingTest.class);
+
+@Test
+public void testLocalTransport() throws Exception {
+Config stormConf = new Config();
+//stormConf.putAll(Utils.readDefaultConfig());
+stormConf.put(Config.TOPOLOGY_WORKERS, 2);
+stormConf.put(Config.STORM_MESSAGING_TRANSPORT , 
"org.apache.storm.messaging.netty.Context");
+boolean[] transportOptions = {true, false};
--- End diff --

This is OK, but for anything more complex it might be good to use 
https://github.com/junit-team/junit4/wiki/Parameterized-tests instead.
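
For reference, a parameterized version of this test could look roughly like the 
following sketch (plain JUnit 4 API; the existing test body is elided):

```
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class MessagingTest {

    // Each Object[] is one set of constructor arguments; the test runs once per set.
    @Parameterized.Parameters(name = "localModeZmq={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][]{{true}, {false}});
    }

    private final boolean localModeZmq;

    public MessagingTest(boolean localModeZmq) {
        this.localModeZmq = localModeZmq;
    }

    @Test
    public void testLocalTransport() throws Exception {
        Config stormConf = new Config();
        stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, localModeZmq);
        // ... rest of the existing test body, now without the for loop
    }
}
```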


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1852: [STORM-2271] ClosedByInterruptException should be handled...

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1852
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1857: STORM-2275: Nimbus crashed during state transition of top...

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1857
  
+1 - Thanks for fixing this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1859: STORM-2276 Remove twitter4j usages due to license issue (...

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1859
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1860: STORM-2276 Remove twitter4j usages due to license issue (...

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1860
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1858: STORM-2276 Remove twitter4j usages due to license issue (...

2017-01-05 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1858
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94649177
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
 ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.DefaultScheduler;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestBlacklistScheduler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(TestBlacklistScheduler.class);
+
+private static int currentTime = 1468216504;
+
+@Test
+public void TestBadSupervisor() {
+INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+
+Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+Config config = new Config();
+config.putAll(Utils.readDefaultConfig());
+config.put(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 200);
+config.put(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 2);
+config.put(Config.BLACKLIST_SCHEDULER_RESUME_TIME, 300);
+
+Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+
+TopologyDetails topo1 = 
TestUtilsForBlacklistScheduler.getTopology("topo-1", config, 5, 15, 1, 1, 
currentTime - 2, true);
+//TopologyDetails topo2 = 
TestUtilsForBlacklistScheduler.getTopology("topo-2", config, 5, 15, 1, 1, 
currentTime - 8,true);
+//TopologyDetails topo3 = 
TestUtilsForBlacklistScheduler.getTopology("topo-3", config, 5, 15, 1, 1, 
currentTime - 16,true);
+topoMap.put(topo1.getId(), topo1);
+//topoMap.put(topo2.getId(), topo2);
+//topoMap.put(topo3.getId(), topo3);
--- End diff --

Please remove the commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94665415
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+protected int toleranceTime;
+protected int toleranceCount;
+protected int resumeTime;
+protected IReporter reporter;
+protected IBlacklistStrategy blacklistStrategy;
+
+protected int nimbusMonitorFreqSecs;
+
+protected Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+protected CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+protected Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
+underlyingScheduler.prepare(conf);
+_conf = conf;
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+toleranceTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME);
--- End diff --

`nimbusMonitorFreqSecs` needs this same treatment too.
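
i.e. presumably something like this (assuming it is read from 
`Config.NIMBUS_MONITOR_FREQ_SECS`):

```
nimbusMonitorFreqSecs = Utils.getInt(_conf.get(Config.NIMBUS_MONITOR_FREQ_SECS));
```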


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94661600
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+protected int toleranceTime;
+protected int toleranceCount;
+protected int resumeTime;
+protected IReporter reporter;
+protected IBlacklistStrategy blacklistStrategy;
+
+protected int nimbusMonitorFreqSecs;
+
+protected Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+protected CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+protected Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
+underlyingScheduler.prepare(conf);
+_conf = conf;
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+toleranceTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME);
--- End diff --

The isInteger annotation does not guarantee that you will get an Integer; it 
guarantees that you will get a Number that can be turned into an Integer 
without losing data.

```
toleranceTime = Utils.getInt(_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME));
```

This holds true for `toleranceCount` and `resumeTime` too
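
For those two it would look roughly the same, keeping the existing 
`containsKey` guards:

```
if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) {
    toleranceCount = Utils.getInt(_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT));
}
if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) {
    resumeTime = Utils.getInt(_conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME));
}
```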


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94675511
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java ---
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public final class CircularBuffer<T extends Serializable> extends AbstractCollection<T> implements Serializable {
+
+// This is the largest capacity allowed by this implementation
+private static final int MAX_CAPACITY = 1 << 30;
+
+private int size = 0;
+private int producerIndex = 0;
+private int consumerIndex = 0;
+
+private int capacity;
+
+private Serializable[] underlying;
+
+// Construct a buffer which has at least the specified capacity.  If
+// the value specified is a power of two then the buffer will be
+// exactly the specified size.  Otherwise the buffer will be the
+// first power of two which is greater than the specified value.
+public CircularBuffer(int capacity) {
+
+if (capacity > MAX_CAPACITY) {
+throw new IllegalArgumentException("Capacity greater than " +
+"allowed");
+}
+
+this.capacity = capacity;
+underlying = new Serializable[this.capacity];
+}
+
+// Constructor used by clone()
+private CircularBuffer(CircularBuffer oldBuffer) {
+size = oldBuffer.size;
+producerIndex = oldBuffer.producerIndex;
+consumerIndex = oldBuffer.consumerIndex;
+capacity = oldBuffer.capacity;
+//bitmask = oldBuffer.bitmask;
+underlying = new Serializable[oldBuffer.underlying.length];
+System.arraycopy(oldBuffer.underlying, 0, underlying, 0, 
underlying.length);
+}
+
+private boolean isFull() {
+return size == capacity;
+}
+
+public boolean add(Serializable obj) {
--- End diff --

Shouldn't this be

```public boolean add(T obj)```

Also, if this is overriding an existing method we should use the `@Override` 
annotation.  This goes for all of the methods here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94665531
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+protected int toleranceTime;
+protected int toleranceCount;
+protected int resumeTime;
+protected IReporter reporter;
+protected IBlacklistStrategy blacklistStrategy;
+
+protected int nimbusMonitorFreqSecs;
+
+protected Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+protected CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+protected Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
+underlyingScheduler.prepare(conf);
+_conf = conf;
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+toleranceTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) 
{
+toleranceCount = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+resumeTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME);
+}
+String reporterClassName = 
_conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
_conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : "";
--- End diff --

Can we set the default to 
"org.apache.storm.scheduler.blacklist.reporters.LogReporter" instead of ""?  It 
makes it just work for the unit tests.
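
i.e. roughly:

```
// Fall back to the LogReporter instead of the empty string.
String reporterClassName = _conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER)
        ? (String) _conf.get(Config.BLACKLIST_SCHEDULER_REPORTER)
        : "org.apache.storm.scheduler.blacklist.reporters.LogReporter";
```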


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94678615
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
 ---
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class DefaultBlacklistStrategy implements IBlacklistStrategy {
+
+private static Logger LOG = 
LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
+
+private IReporter _reporter;
+
+private int _toleranceTime;
+private int _toleranceCount;
+private int _resumeTime;
+private int _nimbusMonitorFreqSecs;
+
+private TreeMap blacklist;
+
+@Override
+public void prepare(IReporter reporter, int toleranceTime, int 
toleranceCount, int resumeTime, int nimbusMonitorFreqSecs) {
--- End diff --

Could we have conf passed in here too, or not make this a plugin?  I know 
we don't need/use it now, but if we want this to be user pluggable we should 
think about what other things people might want to do.
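
For example, the interface could grow a conf parameter along these lines (just 
a sketch of the idea, not a final API):

```
// Raw Map to match the rest of the scheduler code; lets a custom strategy
// read its own settings without changing this interface again.
void prepare(Map conf, IReporter reporter, int toleranceTime, int toleranceCount,
             int resumeTime, int nimbusMonitorFreqSecs);
```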


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94677722
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java ---
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public final class CircularBuffer<T extends Serializable> extends AbstractCollection<T> implements Serializable {
--- End diff --

I am a little confused why we need all of this.  It feels like we could do 
all of this with an ArrayBlockingQueue.

```
public final class CircularBuffer<T> extends ArrayBlockingQueue<T> {
    public CircularBuffer(int capacity) {
        super(capacity);
    }

    @Override
    public boolean add(T obj) {
        // Evict the oldest entry until the new one fits.
        while (!offer(obj)) {
            poll();
        }
        return true;
    }

    public List<T> toList() {
        return new ArrayList<>(this);
    }
}
```
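
With that, the eviction behavior falls out of the queue semantics, e.g.:

```
CircularBuffer<String> buf = new CircularBuffer<>(2);
buf.add("a");
buf.add("b");
buf.add("c"); // "a" is dropped once the capacity of 2 is reached
// buf.toList() is now ["b", "c"]
```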


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94675703
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java ---
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public final class CircularBuffer<T extends Serializable> extends AbstractCollection<T> implements Serializable {
+
+// This is the largest capacity allowed by this implementation
+private static final int MAX_CAPACITY = 1 << 30;
+
+private int size = 0;
+private int producerIndex = 0;
+private int consumerIndex = 0;
+
+private int capacity;
+
+private Serializable[] underlying;
+
+// Construct a buffer which has at least the specified capacity.  If
+// the value specified is a power of two then the buffer will be
+// exactly the specified size.  Otherwise the buffer will be the
+// first power of two which is greater than the specified value.
+public CircularBuffer(int capacity) {
+
+if (capacity > MAX_CAPACITY) {
+throw new IllegalArgumentException("Capacity greater than " +
+"allowed");
+}
+
+this.capacity = capacity;
+underlying = new Serializable[this.capacity];
+}
+
+// Constructor used by clone()
+private CircularBuffer(CircularBuffer oldBuffer) {
--- End diff --

Shouldn't oldBuffer be `CircularBuffer<T>` or something like that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94668925
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+protected int toleranceTime;
+protected int toleranceCount;
+protected int resumeTime;
+protected IReporter reporter;
+protected IBlacklistStrategy blacklistStrategy;
+
+protected int nimbusMonitorFreqSecs;
+
+protected Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+protected CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+protected Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
--- End diff --

Could we drop this log message, it does not seem to really be needed any 
more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94668597
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
--- End diff --

I'm not really sure why this is package-private, nor why it is not final.  I 
don't see a reason to have it be mutable at this point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94678822
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
 ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.blacklist.CircularBuffer;
--- End diff --

I don't think this is used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94668464
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+private int toleranceTime;
+private int toleranceCount;
+private int resumeTime;
+private IReporter reporter;
+private IBlacklistStrategy blacklistStrategy;
+
+private int nimbusMonitorFreqSecs;
+
+private Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+private CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+private Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
+underlyingScheduler.prepare(conf);
+_conf = conf;
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+toleranceTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) 
{
+toleranceCount = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+resumeTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME);
+}
+String reporterClassName = 
_conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
_conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : "";
+try {
+reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
+} catch (ClassNotFoundException e) {
+LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
+throw new RuntimeException(e);
+} catch (InstantiationException e) {
+LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
+throw new RuntimeException(e);
+} catch (IllegalAccessException e) {
+LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
+throw new RuntimeException(e);
+}
+
+String strategyClassName = 
_conf.containsKey(Config.BLACKLIST_SCHEDULER_STRATEGY) ? (String) 
_conf.get(Config.BLACKLIST_SCHEDULER_STRATEGY) : "";
+try {
+blacklistStrategy = (IBlacklistStrategy) 
Class.forName(strategyClassName).newInstance();
+} catch (ClassNotFoundException e) {
+  

[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94677987
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java ---
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public final class CircularBuffer<T extends Serializable> extends AbstractCollection<T> implements Serializable {
--- End diff --

And some javadocs explaining what it does and why would be helpful, because 
even though this is currently implemented as a CircularBuffer it has slightly 
different behavior from a typical Collection.
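
Something along these lines would already help (a sketch of the kind of doc I 
mean, based on my reading of the class):

```
/**
 * A fixed-capacity buffer that, unlike a typical Collection, never grows:
 * once full, add() silently evicts the oldest element to make room for the
 * new one and always returns true.
 */
```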


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94678786
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
 ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist.strategies;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.blacklist.CircularBuffer;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+public interface IBlacklistStrategy {
+
+public void prepare(IReporter reporter, int toleranceTime, int 
toleranceCount, int resumeTime, int nimbusMonitorFreqSecs);
+
+public Set<String> getBlacklist(List<Map<String, Set<Integer>>> toleranceBuffer, Cluster cluster, Topologies topologies);
+
+public void resumeFromBlacklist();
+
+}
--- End diff --

Could we have some javadocs explaining how this is used and the life cycle 
of the strategy.
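
E.g. a sketch of the kind of thing that would help (this is my reading of the 
life cycle, so please correct it where it is wrong):

```
/**
 * Strategy that decides which supervisors to blacklist. An implementation is
 * created reflectively from Config.BLACKLIST_SCHEDULER_STRATEGY, prepare() is
 * called once when the BlacklistScheduler is prepared, getBlacklist() is
 * consulted on every scheduling round, and resumeFromBlacklist() releases
 * hosts whose resume time has elapsed.
 */
```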


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94665652
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java 
---
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.blacklist.reporters.IReporter;
+import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class BlacklistScheduler implements IScheduler {
+private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
+IScheduler underlyingScheduler;
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+protected int toleranceTime;
+protected int toleranceCount;
+protected int resumeTime;
+protected IReporter reporter;
+protected IBlacklistStrategy blacklistStrategy;
+
+protected int nimbusMonitorFreqSecs;
+
+protected Map<String, Set<Integer>> cachedSupervisors;
+
+//key is supervisor key, value is supervisor ports
+protected CircularBuffer<HashMap<String, Set<Integer>>> badSupervisorsTolerance;
+protected Set<String> blacklistHost;
+
+public BlacklistScheduler(IScheduler underlyingScheduler) {
+this.underlyingScheduler = underlyingScheduler;
+}
+
+@Override
+public void prepare(Map conf) {
+LOG.info("prepare black list scheduler");
+LOG.info(conf.toString());
+underlyingScheduler.prepare(conf);
+_conf = conf;
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME)) {
+toleranceTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT)) 
{
+toleranceCount = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+}
+if (_conf.containsKey(Config.BLACKLIST_SCHEDULER_RESUME_TIME)) {
+resumeTime = (Integer) 
_conf.get(Config.BLACKLIST_SCHEDULER_RESUME_TIME);
+}
+String reporterClassName = 
_conf.containsKey(Config.BLACKLIST_SCHEDULER_REPORTER) ? (String) 
_conf.get(Config.BLACKLIST_SCHEDULER_REPORTER) : "";
+try {
+reporter = (IReporter) 
Class.forName(reporterClassName).newInstance();
+} catch (ClassNotFoundException e) {
+LOG.error("Can't find blacklist reporter for name {}", 
reporterClassName);
+throw new RuntimeException(e);
+} catch (InstantiationException e) {
+LOG.error("Throw InstantiationException blacklist reporter for 
name {}", reporterClassName);
+throw new RuntimeException(e);
+} catch (IllegalAccessException e) {
+LOG.error("Throw illegalAccessException blacklist reporter for 
name {}", reporterClassName);
+throw new RuntimeException(e);
+}
+
+String strategyClassName = 
_conf.containsKey(Config.BLACKLIST_SCHEDULER_STRATEGY) ? (String) 
_conf.get(Config.BLACKLIST_SCHEDULER_STRATEGY) : "";
--- End diff --

Can we make the default 
"org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy" 
instead of "" for the same reasons as

[GitHub] storm pull request #1674: STORM-2083: Blacklist scheduler

2017-01-04 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1674#discussion_r94675699
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/scheduler/blacklist/CircularBuffer.java ---
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler.blacklist;
+
+import java.io.Serializable;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public final class CircularBuffer<T extends Serializable> extends AbstractCollection<T> implements Serializable {
+
+// This is the largest capacity allowed by this implementation
+private static final int MAX_CAPACITY = 1 << 30;
+
+private int size = 0;
+private int producerIndex = 0;
+private int consumerIndex = 0;
+
+private int capacity;
+
+private Serializable[] underlying;
--- End diff --

Shouldn't this be an array of type `T`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1854: STORM-2272: don't leak simulated time

2017-01-04 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/1854

STORM-2272: don't leak simulated time



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-2272

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1854


commit 4e0da8f8d4cc0e9b593fdefe6003e24366ac60d0
Author: Robert (Bobby) Evans 
Date:   2017-01-04T20:40:09Z

STORM-2272: don't leak simulated time




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1793: STORM-2214: add in cacheing of the Login

2017-01-04 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1793
  
I also wanted to say we have been running with this in prod for a while now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1793: STORM-2214: add in cacheing of the Login

2017-01-04 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1793
  
@harshach the issue is that when there are lots of supervisors, the load 
placed on the KDC is a lot higher than it is for a similar number of Hadoop 
nodes.  Each new Login will contact the KDC and fetch a new TGT followed by a 
new service ticket.  If we can reuse the Subject, then the TGT and the service 
tickets can be reused too, reducing the load on the KDC.  The Login handles the 
life cycle of the Subject (it creates it and destroys it), so we wanted to 
cache it at that level to avoid issues there.
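
Conceptually the cache is just a shared map from a JAAS config section to its 
live Login, so everything in the process reuses one Subject.  A very rough 
sketch of the idea (hypothetical names and constructor, not the actual patch):

```
// Hypothetical sketch: one Login (and hence one Subject/TGT) per JAAS section.
private static final Map<String, Login> LOGIN_CACHE = new HashMap<>();

static synchronized Login getOrCreateLogin(String jaasSection) throws Exception {
    Login login = LOGIN_CACHE.get(jaasSection);
    if (login == null) {
        login = new Login(jaasSection, callbackHandler); // assumed constructor
        LOGIN_CACHE.put(jaasSection, login);
    }
    return login;
}
```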


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2016-12-12 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
@srdo as part of backporting this to 1.x I am going to need to make a 
change to not use Function directly, because it is only in Java 8.  So to 
maintain compatibility between 1.x and 2.x I am going to need to make a few 
changes in this patch too.




[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
@srdo I think I addressed all of your review comments.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91618249
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+when(producer.send(any(), any())).thenAnswer(new Answer<Object>() {
+@Override
+public Object answer(InvocationOnMock invocation) throws Throwable {
+Callback c = (Callback)invocation.getArguments()[1];
+c.onCompletion(null, null);
+return null;
+}
+});
+KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
--- End diff --

In this case, because I am subclassing KafkaBolt, the compiler (at least in Eclipse) actually complains that I am not allowed to do it.
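
For context, the pattern the quoted test relies on looks roughly like the following: a test-local anonymous subclass overrides a (hypothetical) protected factory method so the bolt uses the mocked producer instead of opening a real Kafka connection. The hook name `mkProducer` is assumed for illustration.

```java
// Sketch only, continuing the quoted test: "producer" is the Mockito
// mock created above, and mkProducer is a hypothetical protected hook
// on KafkaBolt that the anonymous subclass overrides.
KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
    @Override
    protected KafkaProducer<String, String> mkProducer(Properties props) {
        return producer; // never talks to a real broker
    }
};
```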




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91613603
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer<K, V> producer;
+private OutputCollector collector;
+private TupleToKafkaMapper<K, V> mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+/**
+ * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
+ * By setting fireAndForget true, the send will not wait at all for kafka to ack.
+ * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
+ * By setting async false, synchronous sending is used. 
+ */
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt<K, V> withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+} else {
+  

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91592134
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces:
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. For backward compatibility reasons, the KafkaBolt implementation always looks for fields named "key" and "message"
+if you use the default constructor of FieldNameBasedTupleToKafkaMapper.
+Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for the key and message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method:
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null and the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be pushed to:
+you specify a field name or field index in the tuple, and the selector uses that value as the name of the topic to publish to.
+When the topic name is not found, `KafkaBolt` will write messages into the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spo

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91592178
  

[GitHub] storm issue #1818: STORM-2104 1.x

2016-12-08 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1818
  
+1 pending travis (and the waiting period)




[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

2016-12-08 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1696
  
This looks good to me.  Now that I have gone through the kafka spout code 
for my other pull request I am confident in giving this a +1.




[GitHub] storm pull request #1696: STORM-2104: More graceful handling of acked/failed...

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1696#discussion_r91569248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
--- End diff --

Good point.  I think I am going to need to fix that on my patch.




[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
@srdo for me backwards compatibility for 1.x is more a question of violating our versioning than anything else.




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
@ptgoetz I have updated the packaging to have a separate directory for the DRPC server dependencies. I have run manual tests and everything works. The big difference is that {{apache-storm-2.0.0-SNAPSHOT.tar.gz}} and {{apache-storm-2.0.0-SNAPSHOT.zip}} are now under {{./final-package/target/}} instead of {{./target}}. I will try to see if there are any docs that I need to update around this, but I wanted to give everyone a chance to look and give feedback on it.




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
One more option, still ugly, but with a lot less impact. I can do something similar to Hadoop: they use several separate invocations of the Maven assembly plugin that assemble into directories (this predates moduleSets) and then have a separate script that produces the final release.




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
So shading is not really an option for Jersey.
I was able to split the DRPC server off into its own package with tests, but packaging it up with the assembly plugin is proving to be difficult. If I upgrade to 3.0.0 of the assembly plugin I can use moduleSets and make it work, but I have to change {{storm-dist/binary}} to a multi-module build and move the code that actually packages the final release to a sub-package under it. On top of that, we leak two empty/useless jar files into the release package that should be ignored: moduleSets do not package pom modules; they require an artifact that is a file.

I really dislike all of these options. I see a few cleaner options, but they will require a lot more work.

1. Move to Gradle.
2. Change how we do shading so that the assembly moduleSet support works the way it was intended. This would involve essentially having a separate package/build for what we want shaded (i.e. storm-shaded-deps.jar).

If someone has a cleaner solution I am happy to do it, but I think option 2 is the best so far, although I don't like it all that much.




[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...

2016-12-03 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1764
  
@HeartSaVioR I plan on doing that.




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
Shading Jersey is becoming rather difficult (lots of dependencies, including AOP and dependency injection). Splitting the DRPC server off into its own location seems much simpler and less error prone, so I will spend some time on that instead.




[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...

2016-12-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1764
  
@ptgoetz @jerrypeng 

I made a few changes to things. I fixed the race condition and addressed the review comments, but I also put in some optimizations to StormSubmitter. We were literally calling getClusterInfo 3+ times for each topology submission, and because the ultimate goal of STORM-2190 is to make submission more scalable, this helps a lot. There is still some lock contention, but it is much better than it was before.
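
A toy sketch of that submission-path optimization (illustrative names, not the actual StormSubmitter code): fetch the cluster summary once per submission and thread it through the checks, instead of issuing one getClusterInfo RPC per check.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: ClusterSnapshot stands in for the real cluster summary,
// and the checks are placeholders. The point is that a single fetch
// serves every validation step in one submission.
class SubmitPath {
    static final class ClusterSnapshot {
        final List<String> topologyNames;
        ClusterSnapshot(List<String> names) { this.topologyNames = names; }
    }

    ClusterSnapshot fetchClusterInfo() { // one RPC instead of 3+
        return new ClusterSnapshot(Arrays.asList("existing-topo"));
    }

    void submit(String name) {
        ClusterSnapshot info = fetchClusterInfo(); // fetched once...
        checkNameFree(info, name);                 // ...reused here
        checkCapacity(info);                       // ...and here
    }

    private void checkNameFree(ClusterSnapshot info, String name) {
        if (info.topologyNames.contains(name)) {
            throw new IllegalArgumentException("Topology already exists: " + name);
        }
    }

    private void checkCapacity(ClusterSnapshot info) { /* placeholder */ }
}
```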

If things look good here I will backport the changes to my other pull 
request.




[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die

2016-12-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1767
  
We should be able to fix this with code like:

```
(if (or
      (exception-cause? InterruptedException error)
      (and
        (exception-cause? java.io.InterruptedIOException error)
        (not (exception-cause? java.net.SocketTimeoutException error))))
  ;; shut down quietly: this is an interrupt, not a real failure
  ...
  ;; otherwise report the error and die
  ...)
```




[GitHub] storm issue #1767: STORM-2194: Report error and die, not report error or die

2016-12-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1767
  
@chawco 

Okay so I understand the issue better now. SocketTimeoutException is a 
subclass of InterruptedIOException.


https://docs.oracle.com/javase/7/docs/api/java/net/SocketTimeoutException.html

I could argue that it is a mistake on the part of Java and that it is wrong, but that is already set in stone so we have to deal with it.

I see two options:

1) We can treat a SocketTimeoutException differently from other InterruptedIOExceptions, or
2) we can just treat all InterruptedIOExceptions as fatal.

We started ignoring InterruptedIOExceptions because we would occasionally run into them in the supervisor or nimbus local-cluster tests, and that would fail everything. Having proper behavior is more important than having super-stable unit tests, but if we can have both (option 1) I think that would be best.
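
A sketch of option 1 in Java (the Clojure snippet in the earlier comment expresses the same check; `isInterruptCause` is an illustrative name, not an existing utility):

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

public final class ErrorKind {
    private ErrorKind() {}

    // Option 1: treat InterruptedIOException as a shutdown signal only
    // when it is not a SocketTimeoutException, which subclasses it but
    // signals an ordinary timeout rather than a real interrupt.
    public static boolean isInterruptCause(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof InterruptedException) {
                return true;
            }
            if (t instanceof InterruptedIOException
                    && !(t instanceof SocketTimeoutException)) {
                return true;
            }
        }
        return false;
    }
}
```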




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-01 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
@ptgoetz if this looks good as is I will look into shading Jersey.




[GitHub] storm issue #1764: STORM-2190: reduce contention between submission and sche...

2016-12-01 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1764
  
Just so we don't miss the comment from @jerrypeng 

> Couldn't a wrong ordering of events happen since we are locking when 
calculating a scheduling then unlocking and then locking and uploading the new 
scheduling and unlocking
> for example:
> T0: submit
> T1: rebalance
> T2: rebalance - calculate new scheduling
> T3: submit - calculate new scheduling
> T4: rebalance - upload new scheduling to zk
> T5: submit - upload new scheduling to zk
> 
> even though we should end up with the scheduling calculated by the 
rebalance but we end up with scheduling calculated from the original submit.

Yes, that is correct. We should do something here, and @jerrypeng suggested that perhaps, as part of a refactor of Nimbus, we should look at supporting long-running scheduling.

In the short term I think I might make scheduling and writing to ZK atomic, but long term I will file a JIRA to look at better scheduling.
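
A toy illustration of the short-term fix (not actual Nimbus code): holding one lock across both the schedule computation and the write makes the pair atomic, so a concurrent submit and rebalance can no longer interleave between the two steps.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the map stands in for ZooKeeper and compute() for the
// real scheduler. Because compute-then-persist happens under a single
// lock, assignments land in storage in the order they were computed.
class AtomicScheduler {
    private final Object schedLock = new Object();
    private final Map<String, String> assignmentStore = new HashMap<>();

    void scheduleAndPersist(String topologyId) {
        synchronized (schedLock) {
            String assignment = compute(topologyId);     // step 1: compute
            assignmentStore.put(topologyId, assignment); // step 2: persist, still locked
        }
    }

    private String compute(String topologyId) {
        return "assignment-for-" + topologyId;
    }
}
```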




[GitHub] storm pull request #1764: STORM-2190: reduce contention between submission a...

2016-12-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1764#discussion_r90542812
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
@@ -1008,23 +1008,24 @@
   (reset! (:id->worker-resources nimbus) {}))
 ;; tasks figure out what tasks to talk to by looking at topology at 
runtime
 ;; only log/set when there's been a change to the assignment
-(doseq [[topology-id assignment] new-assignments
-:let [existing-assignment (get existing-assignments 
topology-id)
-  topology-details (.getById topologies topology-id)]]
-  (if (= existing-assignment assignment)
-(log-debug "Assignment for " topology-id " hasn't changed")
-(do
-  (log-message "Setting new assignment for topology id " 
topology-id ": " (pr-str assignment))
-  (.setAssignment storm-cluster-state topology-id 
(thriftify-assignment assignment))
-  )))
-(->> new-assignments
-  (map (fn [[topology-id assignment]]
-(let [existing-assignment (get existing-assignments 
topology-id)]
-  [topology-id (map to-worker-slot (newly-added-slots 
existing-assignment assignment))]
-  )))
-  (into {})
-  (.assignSlots inimbus topologies)))
-(log-message "not a leader, skipping assignments")))
+(locking (:sched-lock nimbus)
--- End diff --

I agree, and now that Nimbus is in Java we can look at doing some refactoring along those lines. If you feel that we need to do it now and that this is a blocker, I can spend some time looking into how to do that better.




[GitHub] storm pull request #1764: STORM-2190: reduce contention between submission a...

2016-12-01 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1764#discussion_r90538639
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
@@ -1008,23 +1008,24 @@
   (reset! (:id->worker-resources nimbus) {}))
 ;; tasks figure out what tasks to talk to by looking at topology at 
runtime
 ;; only log/set when there's been a change to the assignment
-(doseq [[topology-id assignment] new-assignments
-:let [existing-assignment (get existing-assignments 
topology-id)
-  topology-details (.getById topologies topology-id)]]
-  (if (= existing-assignment assignment)
-(log-debug "Assignment for " topology-id " hasn't changed")
-(do
-  (log-message "Setting new assignment for topology id " 
topology-id ": " (pr-str assignment))
-  (.setAssignment storm-cluster-state topology-id 
(thriftify-assignment assignment))
-  )))
-(->> new-assignments
-  (map (fn [[topology-id assignment]]
-(let [existing-assignment (get existing-assignments 
topology-id)]
-  [topology-id (map to-worker-slot (newly-added-slots 
existing-assignment assignment))]
-  )))
-  (into {})
-  (.assignSlots inimbus topologies)))
-(log-message "not a leader, skipping assignments")))
+(locking (:sched-lock nimbus)
--- End diff --

@jerrypeng You are correct that this could happen. I don't think it is likely to happen in practice, but I'll think about it and see if we can fix it.



