[GitHub] storm issue #2464: STORM-2847 1.x

2017-12-20 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2464
  
Can you please squash the commits? You can merge after.


---


[GitHub] storm pull request #2464: STORM-2847 1.x

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2464#discussion_r158206579
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumer<String, String> postReactivationConsumerSpy;
+    private KafkaSpout<String, String> spout;
+    private final int maxPollRecords = 10;
+
+    @Before
+    public void setUp() {
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+                KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                    SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+                .build();
+        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
--- End diff --

I need to verify that all messages were committed 
(https://github.com/apache/storm/pull/2464/files#diff-ed317082b62537f44916bda817cf1cecR120), 
which is done by checking whether commit was called on the consumer spy.
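
For readers following along, a minimal sketch of that verification, assuming the captor and spy declared in the diff above; `messageCount` and the single-partition topic are illustrative:

```java
// Hypothetical tail of such a test: after the spout has emitted and acked all
// records and the commit timer has expired, the commit must have reached Kafka.
// Assumes: import static org.junit.Assert.assertEquals;
verify(consumerSpy).commitSync(commitCapture.capture());
Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
assertEquals(messageCount, committed.get(tp).offset()); // all messages committed
```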


---


[GitHub] storm pull request #2464: STORM-2847 1.x

2017-12-20 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2464#discussion_r158205889
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
 ---
@@ -0,0 +1,145 @@
[... same new-file diff for KafkaSpoutReactivationTest.java as quoted in the previous message, ending with ...]
+        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
--- End diff --

Why do you need both consumerFactory and consumerFactoryMock?
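
(For context, a sketch of the pattern being asked about, using the names from the diff above; the stubbing calls are illustrative. The real factory builds consumers against the test broker so they can be wrapped in spies, while the mocked factory is what the spout receives, so it hands back the pre-built spies.)

```java
// Illustrative: real factory -> real consumers -> spies; the mocked factory
// then feeds those spies to the spout on each (re)activation.
KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
when(consumerFactoryMock.createConsumer(any()))
    .thenReturn(consumerSpy)                   // consumer used before deactivation
    .thenReturn(postReactivationConsumerSpy);  // consumer created on reactivation
this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
```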


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-20 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2465
  
Thanks, I missed those changes.

Regarding testing EARLIEST/LATEST, I don't understand why you'd need to 
mock KafkaSpoutConfig. I think it can be tested as an integration test, like 
the ones in SingleTopicKafkaSpoutTest: start a spout, emit and commit a tuple, 
deactivate/reactivate it, verify that it can emit and commit the next tuple, 
then close the spout, create a new one, and verify that the new spout starts 
over/starts at the end.

Alternatively, you can do it with mocks like many of the other tests, 
basically following the same flow but verifying that the spout calls 
seekToBeginning/seekToEnd instead of checking which tuples it emits.
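
A rough sketch of the mock-based variant, assuming the consumer mock and the collector/context mocks used by the other spout tests (names illustrative):

```java
// With FirstPollOffsetStrategy.EARLIEST the spout should rewind on deployment,
// so the consumer mock should see a seekToBeginning call once partitions are assigned.
spout.open(conf, topologyContextMock, collectorMock);
spout.activate();
spout.nextTuple(); // first poll triggers partition assignment and the initial seek
verify(consumerMock).seekToBeginning(anyCollection());
```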

Could you put a +1 on https://github.com/apache/storm/pull/2464 so I can 
merge?


---


[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...

2017-12-20 Thread danny0405
Github user danny0405 commented on the issue:

https://github.com/apache/storm/pull/2433
  
@HeartSaVioR I have fixed the checkstyle issue already. Yeah, the storm-core 
build is slow; I will check the reason.


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158203266
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -116,8 +114,9 @@ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
     }
 
     if (exceptions.size() > 0) {
-        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic
-            + " because of the following exceptions:" + System.lineSeparator());
+        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
--- End diff --

Makes sense, thanks.


---


[GitHub] storm issue #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2476
  
+1


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158193924
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+public class BackPressureTracker {
+    static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
+
+    private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+    private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
+    private final String workerId;
+
+    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+        this.workerId = workerId;
+        this.nonBpTasks.addAll(allLocalTasks);        // all tasks are considered to be not under BP initially
+        this.nonBpTasks.remove((int) SYSTEM_TASK_ID); // not tracking system task
+    }
+
+    /* called by transferLocalBatch() on NettyWorker thread
+     * returns true if an update was recorded, false if taskId is already under BP
+     */
+    public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
+        if (nonBpTasks.remove(taskId)) {
+            bpTasks.put(taskId, recvQ);
+            return true;
+        }
+        return false;
+    }
+
+    // returns true if there was a change in the BP situation
+    public boolean refreshBpTaskList() {
+        boolean changed = false;
+        LOG.debug("Running Back Pressure status change check");
+        for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
+            Entry<Integer, JCQueue> entry = itr.next();
+            if (entry.getValue().isEmptyOverflow()) {
+                // move task from bpTasks to noBpTasks
+                nonBpTasks.add(entry.getKey());
+                itr.remove();
+                changed = true;
+            } else {
+                //LOG.info("Task = {}, OverflowCount = {}, Q = {}", entry.getKey(), entry.getValue().getOverflowCount(), entry.getValue().getQueuedCount() );
--- End diff --

Would we want to leave it with DEBUG, or remove the comment?
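
For reference, keeping it would just be the parameterized DEBUG form of the commented-out line above, which costs little when the level is disabled:

```java
LOG.debug("Task = {}, OverflowCount = {}, Q = {}",
    entry.getKey(), entry.getValue().getOverflowCount(), entry.getValue().getQueuedCount());
```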


---


Back-pressure mechanism

2017-12-20 Thread Walid Aljoby
Dear All,
My concern is about which queue Storm relies on for back-pressure.
I did a simple test of the back-pressure supported by Storm. Each instance (executor) 
maintains an incoming (receive) queue and an outgoing (transfer) queue, and according to min 
and max thresholds on these queues, back-pressure works to slow down the spout 
in case of queue buildup.
I wanted to make sure whether back-pressure still helps in the case of a link bottleneck. 
The conclusion: it helps only when the queue buildup is due to a CPU bottleneck. I guess 
the reason it could not handle the link bottleneck is that back-pressure relies only on the 
executor receive queue.
Does this make sense? If so, could we make the back-pressure also work when the 
executor transfer queue is full, in the case of a link bottleneck?
Thanks!
Best,
Algoby
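
A minimal sketch of the idea being asked about, i.e. triggering back-pressure from either queue; the helper, the queue accessors, and the threshold are hypothetical, not Storm APIs:

```java
// Hypothetical check: throttle the spout when either the executor receive queue
// or the worker transfer queue passes its high-water mark.
static boolean shouldThrottle(java.util.Queue<?> receiveQ, java.util.Queue<?> transferQ, int highWaterMark) {
    return receiveQ.size() >= highWaterMark || transferQ.size() >= highWaterMark;
}
```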

[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread OuYangLiang
Github user OuYangLiang commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158179810
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -116,8 +114,9 @@ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
     }
 
     if (exceptions.size() > 0) {
-        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic
-            + " because of the following exceptions:" + System.lineSeparator());
+        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
--- End diff --

There's a foreach statement right below it that iterates over the exceptions and 
appends each one to errorMsg, so I think it is still necessary.

    for (ExecutionException exception : exceptions) {
        errorMsg = errorMsg.append(exception.getMessage()).append(System.lineSeparator());
    }


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-20 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
These two changes address your comment: "_test that when you call 
findNextCommitOffset, it returns the metadata you put in)_"


https://github.com/hmcl/storm-apache/blob/baaf1fe6ab725ec50ecb09238f8957bc22a4e290/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java#L67


https://github.com/hmcl/storm-apache/blob/baaf1fe6ab725ec50ecb09238f8957bc22a4e290/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java#L82

As for the other tests, I started working on adding tests for scenarios 
around EARLIEST/LATEST and topology id. However, after working on it for 
a while, it didn't feel natural that, to test such a simple thing, I had to mock 
so many objects (including KafkaSpoutConfig) and run the whole spout code. So 
I decided to evaluate whether the tests, the code, or both should be 
refactored to make them easier to test. I need to dig into this 
further, but it seems to me that system tests are coupled with unit tests, and 
that makes it hard to write pure unit tests.

Let's merge https://github.com/apache/storm/pull/2464 and I will put my 
patch on top of it.



---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-20 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/2203
  
@arunmahadevan Rebased.


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158124853
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -73,7 +71,7 @@ public void commit(Long txid) {
     public void prepare(Properties options) {
         Objects.requireNonNull(mapper, "mapper can not be null");
         Objects.requireNonNull(topicSelector, "topicSelector can not be null");
-        producer = new KafkaProducer(options);
+        producer = new KafkaProducer<K, V>(options);
--- End diff --

Nit: I think `new KafkaProducer<>(options)` will work here
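
(For reference, the diamond operator lets the compiler infer the type arguments from the target type; inside the generic `TridentKafkaState<K, V>` class that would be:

```java
producer = new KafkaProducer<>(options); // instead of new KafkaProducer<K, V>(options)
```
)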


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158125370
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
 ---
@@ -28,33 +28,34 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TridentKafkaStateFactory implements StateFactory {
+public class TridentKafkaStateFactory<K, V> implements StateFactory {
 
+    private static final long serialVersionUID = -3613240970062343385L;
     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
 
-    private TridentTupleToKafkaMapper mapper;
+    private TridentTupleToKafkaMapper<K, V> mapper;
     private KafkaTopicSelector topicSelector;
     private Properties producerProperties = new Properties();
 
-    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+    public TridentKafkaStateFactory<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
         this.mapper = mapper;
         return this;
     }
 
-    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+    public TridentKafkaStateFactory<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
         this.topicSelector = selector;
         return this;
     }
 
-    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+    public TridentKafkaStateFactory<K, V> withProducerProperties(Properties props) {
         this.producerProperties = props;
         return this;
     }
 
     @Override
     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
-        TridentKafkaState state = new TridentKafkaState()
+        TridentKafkaState<K, V> state = new TridentKafkaState<K, V>()
--- End diff --

Can use `<>` on the right hand side


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158125258
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -116,8 +114,9 @@ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
     }
 
     if (exceptions.size() > 0) {
-        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic
-            + " because of the following exceptions:" + System.lineSeparator());
+        StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
--- End diff --

Since the message is only ~5 strings long, I don't think there's much 
reason to use a StringBuilder at all.
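
A sketch of what that could look like (Java 8; assumes the `tuples`, `topic`, and `exceptions` variables from the diff above):

```java
// Plain concatenation for the fixed prefix; the exception messages are joined afterwards.
String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic
    + " because of the following exceptions:" + System.lineSeparator()
    + exceptions.stream()
        .map(Throwable::getMessage)
        .collect(java.util.stream.Collectors.joining(System.lineSeparator()));
```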


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2476#discussion_r158124954
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -89,19 +87,19 @@ public void updateState(List tuples, 
TridentCollector collector) {
 List> futures = new 
ArrayList<>(numberOfRecords);
 for (TridentTuple tuple : tuples) {
 topic = topicSelector.getTopic(tuple);
-Object messageFromTuple = 
mapper.getMessageFromTuple(tuple);
-Object keyFromTuple = mapper.getKeyFromTuple(tuple);
+V messageFromTuple = mapper.getMessageFromTuple(tuple);
+K keyFromTuple = mapper.getKeyFromTuple(tuple);
 
 if (topic != null) {
 if (messageFromTuple != null) {
-Future result = producer.send(new 
ProducerRecord(topic, keyFromTuple, messageFromTuple));
+Future result = producer.send(new 
ProducerRecord(topic, keyFromTuple, messageFromTuple));
--- End diff --

Pretty sure the `<>` will also work here


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158124105
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
 ---
@@ -100,6 +103,8 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee
         when(consumerMock.poll(anyLong()))
             .thenReturn(new ConsumerRecords<>(records));
 
+OffsetAndMetadataStub.committed(consumerMock);
--- End diff --

I tried checking out the branch and running the test without this line. It 
seems to still pass. Maybe we should just get rid of it (and 
OffsetAndMetadataStub)?


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158114121
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
     }
 
     /**
-     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
-     * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/>
-     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
+     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+     * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
+     * <ul>
      * <li>
-     * EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
-     * commits</li>
-     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
-     * previous commits</li>
-     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed, it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed, it behaves as LATEST.</li>
+     * EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
+     * of previous commits. When the topology is activated/deactivated this setting has no effect</li>
--- End diff --

I think the last sentence here is confusing unless you know the context 
(e.g. have read this PR). How about "This setting only takes effect on topology 
deployment"?


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158115939
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 ---
@@ -185,29 +188,25 @@ public long commit(OffsetAndMetadata committedOffset) {
     LOG.trace("{}", this);
 
     LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
-        + " Processing will resume at [{}] if the spout restarts.",
+        + " Processing will resume at [{}] upon spout restart",
         numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset);
 
     return numCommittedOffsets;
 }
 
-public long getCommittedOffset() {
-    return committedOffset;
-}
-
-public boolean isEmpty() {
-    return ackedMsgs.isEmpty();
-}
-
-public boolean contains(ConsumerRecord record) {
-    return contains(new KafkaSpoutMessageId(record));
+/**
+ * Checks if this OffsetManager has committed to Kafka.
+ *
+ * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise
+ */
+public boolean hasCommitted() {
+    return committed;
 }
 
 public boolean contains(KafkaSpoutMessageId msgId) {
     return ackedMsgs.contains(msgId);
 }
 
-//VisibleForTesting
--- End diff --

Didn't you replace this with the annotation earlier?


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-20 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r158114673
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
 ---
@@ -71,6 +76,9 @@
         1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
     .build();
 private KafkaSpout<String, String> spout;
+private KafkaConsumer<String, String> consumer;
--- End diff --

This doesn't seem to be used?


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-20 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2203
  
@ptgoetz, can you rebase?
@revans2, can you take a look again so that we can get this in 1.2?


---


[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

2017-12-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2442


---


[GitHub] storm pull request #2476: Minor optimisation about trident kafka state

2017-12-20 Thread OuYangLiang
GitHub user OuYangLiang opened a pull request:

https://github.com/apache/storm/pull/2476

Minor optimisation about trident kafka state

Make TridentKafkaState a generic class to eliminate warning messages in 
Eclipse, plus a minor optimisation that uses StringBuilder.append instead of 
string concatenation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/OuYangLiang/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2476


commit 97df3fb1632b6a6411c9f0598ac70e98d52cc654
Author: OuYang Liang 
Date:   2017-12-20T14:51:29Z

Make TridentKafkaState a template class to eliminate warning messages in
eclipse, and a minor optimization that use StringBuilder.append instead
of string concat operation.




---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158017417
  
--- Diff: storm-client/pom.xml ---
@@ -257,7 +258,7 @@
 
 
            <excludes>**/generated/**</excludes>
-           <maxAllowedViolations>10785</maxAllowedViolations>
+           <maxAllowedViolations>50785</maxAllowedViolations>
--- End diff --

Kind reminder, given that we are getting close.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158007018
  
--- Diff: conf/defaults.yaml ---
@@ -253,11 +278,17 @@ topology.trident.batch.emit.interval.millis: 500
 topology.testing.always.try.serialize: false
 topology.classpath: null
 topology.environment: null
-topology.bolts.outgoing.overflow.buffer.enable: false
-topology.disruptor.wait.timeout.millis: 1000
-topology.disruptor.batch.size: 100
-topology.disruptor.batch.timeout.millis: 1
-topology.disable.loadaware.messaging: false
+
+topology.transfer.buffer.size: 1000   # size of recv queue for transfer worker thread
+topology.transfer.batch.size: 1   # can be no larger than half of `topology.transfer.buffer.size`
+
+topology.executor.receive.buffer.size: 32768  # size of recv queue for spouts & bolts. Will be internally rounded up to next power of 2 (if not already a power of 2)
+topology.producer.batch.size: 1   # can be no larger than half of `topology.executor.receive.buffer.size`
+
+topology.batch.flush.interval.millis: 1  # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1).
+topology.spout.recvq.skips: 3  # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled]
+
+topology.disable.loadaware.messaging: false   # load aware messaging can degrade throughput
--- End diff --

It may be better to describe the cases where we recommend enabling load-aware 
messaging and the cases where we recommend disabling it. The current comment may 
mislead users into thinking it is always better to disable this option.
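
For what it's worth, the per-topology override is one line either way; e.g., a topology whose measurements show worse throughput with it enabled could set (illustrative):

```java
Config conf = new Config();
// Load-aware messaging stays on by default; disable it only where measurements show it hurts.
conf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
```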


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158023150
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -105,9 +115,12 @@ public Task(Executor executor, Integer taskId) throws 
IOException {
 if (grouping != null && grouping != GrouperFactory.DIRECT) {
 throw new IllegalArgumentException("Cannot emitDirect to a 
task expecting a regular grouping");
 }
-new EmitInfo(values, stream, taskId, 
Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+if(!userTopologyContext.getHooks().isEmpty()) {
--- End diff --

Nice finding!


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158042030
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -186,9 +201,11 @@ public Runnable getSuicideCallback() {
     final AtomicBoolean isTopologyActive;
     final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
 
-    // executors and taskIds running in this worker
+    // executors and localTaskIds running in this worker
     final Set<List<Long>> executors;
--- End diff --

Based on the comment, this can be also `localExecutors`.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158013348
  
--- Diff: 
examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.jctools.queues.MpscArrayQueue;
+
+import java.util.concurrent.locks.LockSupport;
+
+public class JCToolsPerfTest {
+public static void main(String[] args) throws Exception {
+//oneProducer1Consumer();
+twoProducer1Consumer();
+//threeProducer1Consumer();
+//oneProducer2Consumers();
+//producerFwdConsumer();
+
+//JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0);
+//JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0);
+//
+//final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
+//final Acker acker = new Acker(ackQ, spoutQ);
+//
+//runAllThds(ackingProducer, acker);
+
+while(true)
+Thread.sleep(1000);
+
+}
+
+private static void oneProducer1Consumer() {
+MpscArrayQueue q1 = new MpscArrayQueue(50_000);
+
+final Prod prod1 = new Prod(q1);
+final Cons cons1 = new Cons(q1);
+
+runAllThds(prod1, cons1);
+}
+
+private static void twoProducer1Consumer() {
+MpscArrayQueue q1 = new MpscArrayQueue(50_000);
+
+final Prod prod1 = new Prod(q1);
+final Prod prod2 = new Prod(q1);
+final Cons cons1 = new Cons(q1);
+
+runAllThds(prod1, cons1, prod2);
+}
+
+private static void threeProducer1Consumer() {
+MpscArrayQueue q1 = new MpscArrayQueue(50_000);
+
+final Prod prod1 = new Prod(q1);
+final Prod prod2 = new Prod(q1);
+final Prod prod3 = new Prod(q1);
+final Cons cons1 = new Cons(q1);
+
+runAllThds(prod1, prod2, prod3, cons1);
+}
+
+
+private static void oneProducer2Consumers() {
+MpscArrayQueue q1 = new MpscArrayQueue(50_000);
+MpscArrayQueue q2 = new MpscArrayQueue(50_000);
+
+final Prod2 prod1 = new Prod2(q1,q2);
+final Cons cons1 = new Cons(q1);
+final Cons cons2 = new Cons(q2);
+
+runAllThds(prod1, cons1, cons2);
+}
+
+public static void runAllThds(MyThd... threads) {
+for (Thread thread : threads) {
+thread.start();
+}
+addShutdownHooks(threads);
+}
+
+public static void addShutdownHooks(MyThd... threads) {
+
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+try {
+System.err.println("Stopping");
+for (MyThd thread : threads) {
+thread.halt = true;
+}
+
+for (Thread thread : threads) {
+System.err.println("Waiting for " + thread.getName());
+thread.join();
+}
+
+for (MyThd thread : threads) {
+System.err.printf("%s : %d,  Throughput: %,d \n", thread.getName(), thread.count, thread.throughput() );
+}
+} catch (InterruptedException e) {
+return;
+}
+}));
+
+}
+
+}
+
+
+
+abstract class MyThd extends Thread  {
+public long count=0;
+public long runTime = 0;
+public boolean halt = false;
+
+public MyThd(String thdName) {
+super(thdName);
+}
+
+public long throughput() {
+return getCount() / (runTime / 1000);
+}
+public long getCount() { return  count; }
+}
+
+class Prod extends MyThd {
+  

[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158013269
  
--- Diff: 
examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java ---
@@ -0,0 +1,221 @@
[... same JCToolsPerfTest.java diff as quoted in the previous message ...]

[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158024035
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException {
         return new ArrayList<>(0);
     }
 
+
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
         }
 
-        List<Integer> outTasks = new ArrayList<>();
-        if (!streamComponentToGrouper.containsKey(stream)) {
-            throw new IllegalArgumentException("Unknown stream ID: " + stream);
-        }
-        if (null != streamComponentToGrouper.get(stream)) {
-            // null value for __system
-            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+        ArrayList<Integer> outTasks = new ArrayList<>();
+
+        // TODO: PERF: expensive hashtable lookup in critical path
--- End diff --

I guess the patch is ready to review again (given that we've revised the 
numbers for comparison), so now we may need to decide whether to remove the comment 
or file an issue. To me, at least in this case, it looks like the hashtable lookup 
is unavoidable.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158013426
  
--- Diff: 
examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java ---
@@ -0,0 +1,221 @@
[... same JCToolsPerfTest.java diff as quoted in the first message above ...]

[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158009771
  
--- Diff: docs/Performance.md ---
@@ -0,0 +1,128 @@
+---
+title: Performance Tuning
+layout: documentation
+documentation: true
+---
+
+Latency, throughput and CPU consumption are the three key dimensions 
involved in performance tuning.
+In the following sections we discuss the settings that can be used to tune along these dimensions and understand the trade-offs.
+
+It is important to understand that these settings can vary depending on 
the topology, the type of hardware and the number of hosts used by the topology.
+
+## 1. Batch Size
+Spouts and Bolts communicate with each other via concurrent message 
queues. The batch size determines the number of messages to be buffered before
+the producer (spout/bolt) attempts to actually write to the downstream 
component's message queue. Inserting messages in batches to downstream
+queues helps reduce the number of synchronization operations required for 
the inserts. Consequently this helps achieve higher throughput. However,
+sometimes it may take a little time for the buffer to fill up, before it 
is flushed into the downstream queue. This implies that the buffered messages
+will take longer to become visible to the downstream consumer who is 
waiting to process them. This can increase the average end-to-end latency for
+these messages. The latency can get very bad if the batch sizes are large 
and the topology is not experiencing high traffic.
+
+`topology.producer.batch.size` : The batch size for writes into the 
receive queue of any spout/bolt is controlled via this setting. This setting
+impacts the communication within a worker process. Each upstream producer maintains a separate batch for a component's receive queue. So if two spout
+instances are writing to the same downstream bolt instance, each of the spout instances will maintain a separate batch.
+
+`topology.transfer.batch.size` : Messages that are destined to a 
spout/bolt running on a different worker process, are sent to a queue called
+the **Worker Transfer Queue**. The Worker Transfer Thread is responsible 
for draining the messages in this queue and send them to the appropriate
+worker process over the network. This setting controls the batch size for 
writes into the Worker Transfer Queue.  This impacts the communication
+between worker processes.
+
+#### Guidance
+
+**For Low latency:** Set batch size to 1. This basically disables 
batching. This is likely to reduce peak sustainable throughput under heavy 
traffic, but
+not likely to impact throughput much under low/medium traffic situations.
+**For High throughput:** Set batch size > 1. Try values like 10, 100, 1000 
or even higher and see what yields the best throughput for the topology.
+Beyond a certain point the throughput is likely to get worse.
+**Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topologies may experience higher traffic in some
+paths and lower traffic in other paths simultaneously. If latency is not a concern, a small batch size (e.g. 10) in conjunction with the right flush
+frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1.
+
+
+## 2. Flush Tuple Frequency
+In low/medium traffic situations or when batch size is too large, the 
batches may take too long to fill up and consequently the messages could take 
unacceptably
+long time to become visible to downstream components. In such case, 
periodic flushing of batches is necessary to keep the messages moving and avoid 
compromising
+latencies when batching is enabled.
+
+When batching has been enabled, special messages called *flush tuples* are 
inserted periodically into the receive queues of all spout and bolt instances.
+This causes each spout/bolt instance to flush all its outstanding batches 
to their respective downstream components.
+
+`topology.flush.tuple.freq.millis` : This setting controls how often the 
flush tuples are generated. Flush tuples are not generated if this 
configuration is
+set to 0 or if (`topology.producer.batch.size`=1 and 
`topology.transfer.batch.size`=1).
+
+
+#### Guidance
+Flushing interval can be used as a tool to retain the higher throughput benefits of batching and avoid batched messages getting stuck for too long waiting for their
+batch to fill. Preferably this value should be larger than the average execute latencies of the bolts in the topology. Trying to flush the queues more frequently than
+the amount of time it takes to produce the messages may hurt performance. Understanding the average execute latencies of each bolt w

[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158034510
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException {
         return new ArrayList<>(0);
     }
 
+
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
         }
 
-        List<Integer> outTasks = new ArrayList<>();
-        if (!streamComponentToGrouper.containsKey(stream)) {
-            throw new IllegalArgumentException("Unknown stream ID: " + stream);
-        }
-        if (null != streamComponentToGrouper.get(stream)) {
-            // null value for __system
-            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+        ArrayList<Integer> outTasks = new ArrayList<>();
+
+        // TODO: PERF: expensive hashtable lookup in critical path
+        ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+        if (null != groupers)  {
+            for (int i = 0; i < groupers.size(); i++) {
+                LoadAwareCustomStreamGrouping grouper = groupers.get(i);
+                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
-                outTasks.addAll(compTasks);
+                outTasks.addAll(compTasks);   // TODO: PERF: this is a perf hit
--- End diff --

Same here: it may be better to decide.

IMHO I'm still not convinced that it can introduce a performance hit. The 
costs I can imagine are allocating the backing array (only once per method call) 
and expanding it, but unless we use fan-out in a huge topology, outTasks 
is expected to be small.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158016550
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java
 ---
@@ -356,9 +356,8 @@ public void handle(TaskInfo taskInfo, 
Collection dataPoints) {
 
 TopologyBuilder builder = new TopologyBuilder();
 
-int numEach = 4 * parallelism;
+int numEach = parallelism;
--- End diff --

Nice catch.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158035294
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +196,35 @@ public BuiltinMetrics getBuiltInMetrics() {
 return builtInMetrics;
 }
 
+
+    // Non Blocking call. If cannot emmit to destination immediately, such tuples will be added to `pendingEmits` argument
--- End diff --

nit: `emmit` -> `emit`


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158041504
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -192,7 +240,7 @@ private TopologyContext mkTopologyContext(StormTopology 
topology) throws IOExcep
 ConfigUtils.supervisorStormDistRoot(conf, 
workerData.getTopologyId())),
 ConfigUtils.workerPidsRoot(conf, 
workerData.getWorkerId()),
 taskId,
-workerData.getPort(), workerData.getTaskIds(),
+workerData.getPort(), workerData.getLocalTaskIds(),
--- End diff --

The renamed method clarifies the meaning better. Nice improvement.


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158039708
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +196,35 @@ public BuiltinMetrics getBuiltInMetrics() {
 return builtInMetrics;
 }
 
+
+    // Non Blocking call. If cannot emmit to destination immediately, such tuples will be added to `pendingEmits` argument
+    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
+        Tuple tuple = getTuple(stream, values);
+        List<Integer> tasks = getOutgoingTasks(stream, values);
+        for (Integer t : tasks) {
+            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+            transfer.tryTransfer(addressedTuple, pendingEmits);
+        }
+    }
+
+    /**
+     * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+     */
+    public void sendToEventLogger(Executor executor, List<Object> values,
--- End diff --

Will this method be called on every emit when the number of event logger 
executors is more than 0? If so, we may want to apply more optimizations here 
(not in this issue, but in a follow-up).


---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158017461
  
--- Diff: examples/storm-perf/pom.xml ---
@@ -81,7 +81,7 @@
 maven-checkstyle-plugin
 
 
-            <maxAllowedViolations>207</maxAllowedViolations>
+            <maxAllowedViolations>407</maxAllowedViolations>
--- End diff --

Kind reminder, given that we are getting close.


---


[GitHub] storm issue #2445: STORM-2843: [Flux] properties file not found when loading...

2017-12-20 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/2445
  
@HeartSaVioR PR updated. Could you take a look again?


---


[GitHub] storm issue #2469: STORM-2861: Explicit reference kafka-schema-registry-clie...

2017-12-20 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/2469
  
@ptgoetz @satishd 
Yes, I know that kafka-avro-serializer depends on 
kafka-schema-registry-client and avro.
There were io.confluent.kafka.schemaregistry.client.XXX not-found errors 
when compiling the storm-hdfs code, and this PR fixed the issue. (To tell the 
truth, it's a bit strange.)
Another benefit is that users can know clearly which Avro version 
they are using. By default the Avro module comes from the hadoop-common dependencies.


---


[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2433
  
@danny0405 
Btw, the local build passed, but it adds considerably to the build time.

```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Storm .. SUCCESS [  3.699 s]
[INFO] Apache Storm - Checkstyle .. SUCCESS [  0.708 s]
[INFO] multilang-javascript ... SUCCESS [  0.126 s]
[INFO] multilang-python ... SUCCESS [  0.265 s]
[INFO] multilang-ruby . SUCCESS [  0.093 s]
[INFO] maven-shade-clojure-transformer  SUCCESS [  2.413 s]
[INFO] storm-maven-plugins  SUCCESS [  2.638 s]
[INFO] Storm Client ... SUCCESS [ 44.607 s]
[INFO] storm-server ... SUCCESS [02:27 min]
[INFO] storm-clojure .. SUCCESS [  5.257 s]
[INFO] Storm Core . SUCCESS [23:34 min]
[INFO] Storm Webapp ... SUCCESS [ 11.411 s]
[INFO] storm-clojure-test . SUCCESS [  2.185 s]
[INFO] storm-submit-tools . SUCCESS [  8.076 s]
[INFO] flux ... SUCCESS [  0.093 s]
[INFO] flux-wrappers .. SUCCESS [  0.354 s]
[INFO] storm-kafka  SUCCESS [01:48 min]
[INFO] storm-autocreds  SUCCESS [  3.273 s]
[INFO] storm-hdfs . SUCCESS [ 39.636 s]
[INFO] storm-hbase  SUCCESS [  4.100 s]
[INFO] flux-core .. SUCCESS [  4.035 s]
[INFO] flux-examples .. SUCCESS [ 12.051 s]
[INFO] storm-sql-runtime .. SUCCESS [  2.391 s]
[INFO] storm-sql-core . SUCCESS [01:47 min]
[INFO] storm-sql-kafka  SUCCESS [  4.736 s]
[INFO] storm-redis  SUCCESS [  5.799 s]
[INFO] storm-sql-redis  SUCCESS [  8.231 s]
[INFO] storm-mongodb .. SUCCESS [  0.826 s]
[INFO] storm-sql-mongodb .. SUCCESS [  3.936 s]
[INFO] storm-sql-hdfs . SUCCESS [ 12.313 s]
[INFO] sql  SUCCESS [  0.150 s]
[INFO] storm-hive . SUCCESS [ 18.669 s]
[INFO] storm-jdbc . SUCCESS [  1.777 s]
[INFO] storm-eventhubs  SUCCESS [  3.725 s]
[INFO] storm-elasticsearch  SUCCESS [ 20.401 s]
[INFO] storm-solr . SUCCESS [  1.515 s]
[INFO] storm-metrics .. SUCCESS [  0.590 s]
[INFO] storm-cassandra  SUCCESS [01:01 min]
[INFO] storm-mqtt . SUCCESS [ 22.396 s]
[INFO] storm-kafka-client . SUCCESS [02:06 min]
[INFO] storm-opentsdb . SUCCESS [  0.664 s]
[INFO] storm-kafka-monitor  SUCCESS [  2.430 s]
[INFO] storm-kinesis .. SUCCESS [  1.281 s]
[INFO] storm-druid  SUCCESS [  1.958 s]
[INFO] storm-jms .. SUCCESS [ 12.099 s]
[INFO] storm-pmml . SUCCESS [  0.267 s]
[INFO] storm-rocketmq . SUCCESS [ 11.298 s]
[INFO] blobstore-migrator . SUCCESS [ 12.028 s]
[INFO] Storm Integration Test . SUCCESS [  0.696 s]
[INFO] storm-starter .. SUCCESS [ 19.945 s]
[INFO] storm-loadgen .. SUCCESS [  2.457 s]
[INFO] storm-mongodb-examples . SUCCESS [  0.771 s]
[INFO] storm-redis-examples ... SUCCESS [  1.096 s]
[INFO] storm-opentsdb-examples  SUCCESS [  1.489 s]
[INFO] storm-solr-examples  SUCCES

[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157942171
  
--- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java ---
@@ -93,6 +93,10 @@ private SolrJsonMapper(Builder builder) {
 jsonUpdateUrl = builder.jsonUpdateUrl;
 }
 
+@Override
+public void configure() {
--- End diff --

This may not be needed here once it is defined as a default method in 
`SolrMapper`.


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157944148
  
--- Diff: external/storm-solr/pom.xml ---
@@ -46,19 +46,24 @@
 
 <groupId>org.apache.solr</groupId>
 <artifactId>solr-solrj</artifactId>
-<version>5.2.1</version>
-<scope>compile</scope>
+<version>5.5.5</version>
--- End diff --

You may want to have a property for the org.apache.solr version and use it 
for the other Solr dependencies in this module, instead of declaring the 
version at each dependency usage.
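
For example (a sketch; `solr.version` is an illustrative property name):

```
<properties>
    <solr.version>5.5.5</solr.version>
</properties>

<dependency>
    <groupId>org.apache.solr</groupId>
    <artifactId>solr-solrj</artifactId>
    <version>${solr.version}</version>
</dependency>
```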


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r158011321
  
--- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java ---
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.schema.CopyField;
+import org.apache.storm.solr.schema.Field;
+import org.apache.storm.solr.schema.FieldType;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class that builds the {@link Schema} object from the schema returned by the SchemaRequest
+ */
+public class RestJsonSchemaBuilderV2 implements SchemaBuilder {
+private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class);
+private Schema schema = new Schema();
+private SolrConfig solrConfig;
+private String collection;
+
+public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) throws IOException {
+this.solrConfig = solrConfig;
+this.collection = collection;
+}
+
+@Override
+public void buildSchema() throws IOException {
+try {
+if (solrConfig.enableKerberos())
+HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
+
+SolrClient solrClient = new CloudSolrClient(solrConfig.getZkHostString());
+SchemaRequest schemaRequest = new SchemaRequest();
+logger.debug("Downloading schema for collection: {}", collection);
+SchemaResponse schemaResponse = schemaRequest.process(solrClient, collection);
+logger.debug("SchemaResponse Schema: " + schemaResponse);
--- End diff --

Better to have a parameterized log statement.
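
i.e. the SLF4J parameterized form, which also avoids the string concatenation 
when debug logging is disabled:

```
logger.debug("SchemaResponse Schema: {}", schemaResponse);
```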


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157942068
  
--- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java ---
@@ -26,6 +26,7 @@
 import java.util.List;
 
 public interface SolrMapper extends Serializable {
+void configure();
--- End diff --

This can break any existing mappers users may have already created. Better 
to have a default method to address that.
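
For example, a no-op default method keeps existing mapper implementations 
compiling unchanged (a sketch):

```
public interface SolrMapper extends Serializable {
    // Existing implementations inherit this no-op instead of breaking.
    default void configure() {
    }
    // ... existing methods unchanged ...
}
```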


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157941248
  
--- Diff: external/storm-solr/README.md ---
@@ -97,6 +97,29 @@ field separates each value with the token % instead of 
the default | . To use th
 .setMultiValueFieldToken("%").build();
 ```
 
+##Working with Kerberized Solr
+If your topology is going to interact with kerberized Solr, your 
bolts/states needs to be authenticated by Solr Server. We can enable
+authentication by distributing keytabs for solr user on all worker hosts. 
We can configure the solt bolt to use keytabs by setting
--- End diff --

nit: configure the solr bolt


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157943487
  
--- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java ---
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.schema.CopyField;
+import org.apache.storm.solr.schema.Field;
+import org.apache.storm.solr.schema.FieldType;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class that builds the {@link Schema} object from the schema returned by the SchemaRequest
+ */
+public class RestJsonSchemaBuilderV2 implements SchemaBuilder {
+private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class);
+private Schema schema = new Schema();
+private SolrConfig solrConfig;
+private String collection;
+
+public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) throws IOException {
--- End diff --

This constructor does not really throw any IOException.
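
i.e. the declaration could simply drop it:

```
public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) {
    this.solrConfig = solrConfig;
    this.collection = collection;
}
```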


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157941206
  
--- Diff: external/storm-solr/README.md ---
@@ -97,6 +97,29 @@ field separates each value with the token % instead of 
the default | . To use th
 .setMultiValueFieldToken("%").build();
 ```
 
+##Working with Kerberized Solr
+If your topology is going to interact with kerberized Solr, your 
bolts/states needs to be authenticated by Solr Server. We can enable
--- End diff --

nit: bolts/states need to be 


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157943956
  
--- Diff: external/storm-solr/pom.xml ---
@@ -46,19 +46,24 @@
 
 <groupId>org.apache.solr</groupId>
 <artifactId>solr-solrj</artifactId>
-<version>5.2.1</version>
-<scope>compile</scope>
+<version>5.5.5</version>
+<exclusions>
+<exclusion>
+<groupId>org.apache.httpcomponents</groupId>
+<artifactId>httpclient</artifactId>
--- End diff --

I do not see any httpclient dependency for solr-solrj, only 
org.apache.httpcomponents:httpcore:4.4.1 and 
org.apache.httpcomponents:httpmime:4.4.1. Am I missing anything here?


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-20 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r157941502
  
--- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java ---
@@ -88,11 +92,14 @@ private int capacity() {
 @Override
 protected void process(Tuple tuple) {
 try {
+LOG.debug("Processing Tuple: {}", tuple);
 SolrRequest request = solrMapper.toSolrRequest(tuple);
 solrClient.request(request, solrMapper.getCollection());
 ack(tuple);
+LOG.debug("Acked Tuple: {}", tuple);
 } catch (Exception e) {
 fail(tuple, e);
+LOG.debug("Failed Tuple: {}", tuple, e);
--- End diff --

nit: may want to push the log statement into the `fail` method.
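
e.g. something along these lines, assuming `fail` delegates to the collector 
(a hypothetical body, not the actual one in this PR):

```
protected void fail(Tuple tuple, Exception e) {
    // Hypothetical sketch: centralize failure logging here rather than at each call site.
    LOG.debug("Failed Tuple: {}", tuple, e);
    collector.reportError(e);
    collector.fail(tuple);
}
```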


---


[GitHub] storm pull request #2475: Put contextual information in the ThreadContext fo...

2017-12-20 Thread hmcc
GitHub user hmcc opened a pull request:

https://github.com/apache/storm/pull/2475

Put contextual information in the ThreadContext for multilang logging.

Connected to https://issues.apache.org/jira/browse/STORM-2862; `master` 
version of apache/storm#2474.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcc/storm STORM-2862

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2475.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2475


commit 4cc61f4ccf6504e926a6dafaa30d18ba457b3408
Author: Heather McCartney 
Date:   2017-12-20T11:55:48Z

Put contextual information in the ThreadContext for multilang logging.




---


[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...

2017-12-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2433
  
@danny0405 Please let me know when you are finished and the PR is ready for 
review again. Thanks a lot for your patience!


---


[GitHub] storm pull request #2474: STORM-2862: More flexible logging in multilang

2017-12-20 Thread hmcc
GitHub user hmcc opened a pull request:

https://github.com/apache/storm/pull/2474

STORM-2862: More flexible logging in multilang

Connected to https://issues.apache.org/jira/browse/STORM-2862.
Put contextual information in the ThreadContext for multilang logging.

I've updated the Python and Ruby resources in multilang, but not the JS 
one. That's because the JS one is written to support variable arguments 
(probably because of 
[STORM-2435](https://issues.apache.org/jira/browse/STORM-2435)). I can't see a 
way to support optional IDs without breaking the current behaviour.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcc/storm STORM-2862-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2474.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2474


commit 92511b9ccf81c23476952aaf61e31556c41920b2
Author: Heather McCartney 
Date:   2017-12-20T10:14:53Z

Put contextual information in the ThreadContext for multilang logging.




---