[GitHub] storm issue #2464: STORM-2847 1.x
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2464 Can you please squash the commits? You can merge afterwards. ---
[GitHub] storm pull request #2464: STORM-2847 1.x
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2464#discussion_r158206579 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java --- @@ -0,0 +1,145 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import 
org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSpoutReactivationTest { + +@Rule +public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + +@Captor +private ArgumentCaptor> commitCapture; + +private final TopologyContext topologyContext = mock(TopologyContext.class); +private final Map conf = new HashMap<>(); +private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); +private final long commitOffsetPeriodMs = 2_000; +private KafkaConsumer consumerSpy; +private KafkaConsumer postReactivationConsumerSpy; +private KafkaSpout spout; +private final int maxPollRecords = 10; + +@Before +public void setUp() { +KafkaSpoutConfig spoutConfig = +SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( +KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), +SingleTopicKafkaSpoutConfiguration.TOPIC)) +.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) +.setOffsetCommitPeriodMs(commitOffsetPeriodMs) +.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) +.build(); +KafkaConsumerFactory consumerFactory = new KafkaConsumerFactoryDefault<>(); +this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +KafkaConsumerFactory consumerFactoryMock = mock(KafkaConsumerFactory.class); --- End diff -- I need to verify that all messages were committed https://github.com/apache/storm/pull/2464/files#diff-ed317082b62537f44916bda817cf1cecR120 which is done by checking if commit was called on the consumer spy. 
---
[GitHub] storm pull request #2464: STORM-2847 1.x
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2464#discussion_r158205889 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java --- @@ -0,0 +1,145 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import 
org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSpoutReactivationTest { + +@Rule +public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + +@Captor +private ArgumentCaptor> commitCapture; + +private final TopologyContext topologyContext = mock(TopologyContext.class); +private final Map conf = new HashMap<>(); +private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); +private final long commitOffsetPeriodMs = 2_000; +private KafkaConsumer consumerSpy; +private KafkaConsumer postReactivationConsumerSpy; +private KafkaSpout spout; +private final int maxPollRecords = 10; + +@Before +public void setUp() { +KafkaSpoutConfig spoutConfig = +SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( +KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), +SingleTopicKafkaSpoutConfiguration.TOPIC)) +.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) +.setOffsetCommitPeriodMs(commitOffsetPeriodMs) +.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) +.build(); +KafkaConsumerFactory consumerFactory = new KafkaConsumerFactoryDefault<>(); +this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); +KafkaConsumerFactory consumerFactoryMock = mock(KafkaConsumerFactory.class); --- End diff -- why do you need consumerFactory and consumerFactoryMock? ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2465 Thanks, I missed those changes. Regarding testing EARLIEST/LATEST, I don't understand why you'd need to mock KafkaSpoutConfig. I think it can be tested as an integration test, like the ones in SingleTopicKafkaSpoutTest, where you start a spout, emit and commit a tuple, deactivate/reactivate it, verify that it can emit and commit the next tuple, close the spout and create a new one and verify that the new spout starts over/starts at the end. Alternatively you can do it with mocks like many of the other tests, basically doing the same flow but verifying that the spout calls seekToBeginning/seekToEnd instead of checking which tuples it emits. Could you put a +1 on https://github.com/apache/storm/pull/2464 so I can merge? ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user danny0405 commented on the issue: https://github.com/apache/storm/pull/2433 @HeartSaVioR I have already fixed the checkstyle issue. Yes, the storm-core build is slow; I will look into the reason. ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158203266 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -116,8 +114,9 @@ public void updateState(List tuples, TridentCollector collector) { } if (exceptions.size() > 0) { -StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic -+ " because of the following exceptions:" + System.lineSeparator()); +StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages "); --- End diff -- Makes sense, thanks. ---
[GitHub] storm issue #2476: Minor optimisation about trident kafka state
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2476 +1 ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158193924 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.daemon.worker; + +import org.apache.storm.messaging.netty.BackPressureStatus; +import org.apache.storm.utils.JCQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.storm.Constants.SYSTEM_TASK_ID; + +public class BackPressureTracker { +static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class); + +private final Map bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration +private final Set nonBpTasks = ConcurrentHashMap.newKeySet(); +private final String workerId; + +public BackPressureTracker(String workerId, List allLocalTasks) { +this.workerId = workerId; +this.nonBpTasks.addAll(allLocalTasks);// all tasks are considered 
to be not under BP initially +this.nonBpTasks.remove((int)SYSTEM_TASK_ID); // not tracking system task +} + +/* called by transferLocalBatch() on NettyWorker thread + * returns true if an update was recorded, false if taskId is already under BP + */ +public boolean recordBackpressure(Integer taskId, JCQueue recvQ) { +if (nonBpTasks.remove(taskId)) { +bpTasks.put(taskId, recvQ); +return true; +} +return false; +} + +// returns true if there was a change in the BP situation +public boolean refreshBpTaskList() { +boolean changed = false; +LOG.debug("Running Back Pressure status change check"); +for (Iterator> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) { +Entry entry = itr.next(); +if (entry.getValue().isEmptyOverflow()) { +// move task from bpTasks to noBpTasks +nonBpTasks.add(entry.getKey()); +itr.remove(); +changed = true; +} else { +//LOG.info("Task = {}, OverflowCount = {}, Q = {}", entry.getKey(), entry.getValue().getOverflowCount(), entry.getValue().getQueuedCount() ); --- End diff -- Would we want to leave it with DEBUG, or remove the comment? ---
Back-pressure mechanism
Dear All, My question is about which queue Storm relies on for back-pressure. I ran a simple test of the back-pressure mechanism supported by Storm. Each instance (executor) maintains an incoming (receive) Q and an outgoing (transfer) Q, and according to min and max thresholds on these queues, back-pressure works to slow down the spout in case of queue buildup. My goal was to find out whether back-pressure still helps in the case of a link bottleneck. The conclusion is that it helps only when the queue buildup is due to a CPU bottleneck. I guess the reason it does not work for a link bottleneck is that back-pressure relies only on the executor receive Q. Does this make sense? If so, could we also make back-pressure work when the executor transfer Q is full in the case of a link bottleneck? Thanks! Best, Algoby Sent from Yahoo Mail on Android
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user OuYangLiang commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158179810 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -116,8 +114,9 @@ public void updateState(List tuples, TridentCollector collector) { } if (exceptions.size() > 0) { -StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic -+ " because of the following exceptions:" + System.lineSeparator()); +StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages "); --- End diff -- There's a for-each loop right below it that iterates over the exceptions and appends each one to errorMsg, so I think the StringBuilder is still necessary: for (ExecutionException exception : exceptions) { errorMsg = errorMsg.append(exception.getMessage()).append(System.lineSeparator()); } ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 These two changes address your comment: "_test that when you call findNextCommitOffset, it returns the metadata you put in)_" https://github.com/hmcl/storm-apache/blob/baaf1fe6ab725ec50ecb09238f8957bc22a4e290/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java#L67 https://github.com/hmcl/storm-apache/blob/baaf1fe6ab725ec50ecb09238f8957bc22a4e290/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java#L82 As for the other tests, I started working on adding tests for scenarios around EARLIEST/LATEST and topology id. However, after working on it for awhile, it didn't feel natural that to test such a simple thing I had to mock so many objects (including KafkaSpoutConfig) and run the whole spout code. Then I decided that I would evaluate if the tests, code, or both should be considered for refactoring to make it easier to test. I need to dig into this further, but it seems to me that system tests are coupled with unit tests, and that makes it hard to write pure unit tests. Let's merge https://github.com/apache/storm/pull/2464 and I will put my patch on top of it. ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user ptgoetz commented on the issue: https://github.com/apache/storm/pull/2203 @arunmahadevan Rebased. ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158124853 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -73,7 +71,7 @@ public void commit(Long txid) { public void prepare(Properties options) { Objects.requireNonNull(mapper, "mapper can not be null"); Objects.requireNonNull(topicSelector, "topicSelector can not be null"); -producer = new KafkaProducer(options); +producer = new KafkaProducer(options); --- End diff -- Nit: I think `new KafkaProducer<>(options)` will work here ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158125370 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java --- @@ -28,33 +28,34 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TridentKafkaStateFactory implements StateFactory { +public class TridentKafkaStateFactory implements StateFactory { +private static final long serialVersionUID = -3613240970062343385L; private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class); -private TridentTupleToKafkaMapper mapper; +private TridentTupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; private Properties producerProperties = new Properties(); -public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { +public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { this.mapper = mapper; return this; } -public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) { +public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) { this.topicSelector = selector; return this; } -public TridentKafkaStateFactory withProducerProperties(Properties props) { +public TridentKafkaStateFactory withProducerProperties(Properties props) { this.producerProperties = props; return this; } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions); -TridentKafkaState state = new TridentKafkaState() +TridentKafkaState state = new TridentKafkaState() --- End diff -- Can use `<>` on the right hand side ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158125258 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -116,8 +114,9 @@ public void updateState(List tuples, TridentCollector collector) { } if (exceptions.size() > 0) { -StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic -+ " because of the following exceptions:" + System.lineSeparator()); +StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages "); --- End diff -- Since the message is only ~5 strings long, I don't think there's much reason to use a StringBuilder at all. ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2476#discussion_r158124954 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java --- @@ -89,19 +87,19 @@ public void updateState(List tuples, TridentCollector collector) { List> futures = new ArrayList<>(numberOfRecords); for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); -Object messageFromTuple = mapper.getMessageFromTuple(tuple); -Object keyFromTuple = mapper.getKeyFromTuple(tuple); +V messageFromTuple = mapper.getMessageFromTuple(tuple); +K keyFromTuple = mapper.getKeyFromTuple(tuple); if (topic != null) { if (messageFromTuple != null) { -Future result = producer.send(new ProducerRecord(topic, keyFromTuple, messageFromTuple)); +Future result = producer.send(new ProducerRecord(topic, keyFromTuple, messageFromTuple)); --- End diff -- Pretty sure the `<>` will also work here ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158124105 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java --- @@ -100,6 +103,8 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(records)); +OffsetAndMetadataStub.committed(consumerMock); --- End diff -- I tried checking out the branch and running the test without this line. It seems to still pass. Maybe we should just get rid of it (and OffsetAndMetadataStub)? ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158114121 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -115,26 +115,30 @@ public KafkaSpoutConfig(Builder builder) { } /** - * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer - * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. - * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. + * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment. + * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to: + * * - * EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous - * commits - * LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of - * previous commits - * UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been - * committed, it behaves as EARLIEST. - * UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been - * committed, it behaves as LATEST. + * EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless + * of previous commits. When the topology is activated/deactivated this setting has no effect --- End diff -- I think the last sentence here is confusing unless you know the context (e.g. have read this PR). How about "This setting only takes effect on topology deployment"? ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158115939 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java --- @@ -185,29 +188,25 @@ public long commit(OffsetAndMetadata committedOffset) { LOG.trace("{}", this); LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]." -+ " Processing will resume at [{}] if the spout restarts.", ++ " Processing will resume at [{}] upon spout restart", numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset); return numCommittedOffsets; } -public long getCommittedOffset() { -return committedOffset; -} - -public boolean isEmpty() { -return ackedMsgs.isEmpty(); -} - -public boolean contains(ConsumerRecord record) { -return contains(new KafkaSpoutMessageId(record)); +/** + * Checks if this OffsetManager has committed to Kafka. + * + * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise + */ +public boolean hasCommitted() { +return committed; } public boolean contains(KafkaSpoutMessageId msgId) { return ackedMsgs.contains(msgId); } -//VisibleForTesting --- End diff -- Didn't you replace this with the annotation earlier? ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r158114673 --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java --- @@ -71,6 +76,9 @@ 1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute .build(); private KafkaSpout spout; +private KafkaConsumer consumer; --- End diff -- This doesn't seem to be used? ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user arunmahadevan commented on the issue: https://github.com/apache/storm/pull/2203 @ptgoetz, can you rebase? @revans2, can you take a look again so that we can get this in 1.2? ---
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2442 ---
[GitHub] storm pull request #2476: Minor optimisation about trident kafka state
GitHub user OuYangLiang opened a pull request: https://github.com/apache/storm/pull/2476 Minor optimisation about trident kafka state Make TridentKafkaState a template class to eliminate warning messages in eclipse, and a minor optimisation that use StringBuilder.append instead of string concat operation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/OuYangLiang/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2476.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2476 commit 97df3fb1632b6a6411c9f0598ac70e98d52cc654 Author: OuYang Liang Date: 2017-12-20T14:51:29Z Make TridentKafkaState a template class to eliminate warning messages in eclipse, and a minor optimization that use StringBuilder.append instead of string concat operation. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158017417 --- Diff: storm-client/pom.xml --- @@ -257,7 +258,7 @@ **/generated/** -10785 +50785 --- End diff -- Kind reminder, given that we are getting close. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158007018 --- Diff: conf/defaults.yaml --- @@ -253,11 +278,17 @@ topology.trident.batch.emit.interval.millis: 500 topology.testing.always.try.serialize: false topology.classpath: null topology.environment: null -topology.bolts.outgoing.overflow.buffer.enable: false -topology.disruptor.wait.timeout.millis: 1000 -topology.disruptor.batch.size: 100 -topology.disruptor.batch.timeout.millis: 1 -topology.disable.loadaware.messaging: false + +topology.transfer.buffer.size: 1000 # size of recv queue for transfer worker thread +topology.transfer.batch.size: 1 # can be no larger than half of `topology.transfer.buffer.size` + +topology.executor.receive.buffer.size: 32768 # size of recv queue for spouts & bolts. Will be internally rounded up to next power of 2 (if not already a power of 2) +topology.producer.batch.size: 1 # can be no larger than half of `topology.executor.receive.buffer.size` + +topology.batch.flush.interval.millis: 1 # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1). +topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled] + +topology.disable.loadaware.messaging: false # load aware messaging can degrade throughput --- End diff -- May be better to describe the cases when we recommend using load aware messaging, or disable load aware messaging. The comment may mislead users to consider this option as always better to disable. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158023150 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -105,9 +115,12 @@ public Task(Executor executor, Integer taskId) throws IOException { if (grouping != null && grouping != GrouperFactory.DIRECT) { throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping"); } -new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext); +if(!userTopologyContext.getHooks().isEmpty()) { --- End diff -- Nice finding! ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158042030 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java --- @@ -186,9 +201,11 @@ public Runnable getSuicideCallback() { final AtomicBoolean isTopologyActive; final AtomicReference> stormComponentToDebug; -// executors and taskIds running in this worker +// executors and localTaskIds running in this worker final Set> executors; --- End diff -- Based on the comment, this can be also `localExecutors`. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158013348 --- Diff: examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.utils; + +import org.jctools.queues.MpscArrayQueue; + +import java.util.concurrent.locks.LockSupport; + +public class JCToolsPerfTest { +public static void main(String[] args) throws Exception { +//oneProducer1Consumer(); +twoProducer1Consumer(); +//threeProducer1Consumer(); +//oneProducer2Consumers(); +//producerFwdConsumer(); + +//JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0); +//JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0); +// +//final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); +//final Acker acker = new Acker(ackQ, spoutQ); +// +//runAllThds(ackingProducer, acker); + +while(true) +Thread.sleep(1000); + +} + +private static void oneProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1); +} + +private static void twoProducer1Consumer() { +MpscArrayQueue q1 
= new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1, prod2); +} + +private static void threeProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Prod prod3 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, prod2, prod3, cons1); +} + + +private static void oneProducer2Consumers() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); +MpscArrayQueue q2 = new MpscArrayQueue(50_000); + +final Prod2 prod1 = new Prod2(q1,q2); +final Cons cons1 = new Cons(q1); +final Cons cons2 = new Cons(q2); + +runAllThds(prod1, cons1, cons2); +} + +public static void runAllThds(MyThd... threads) { +for (Thread thread : threads) { +thread.start(); +} +addShutdownHooks(threads); +} + +public static void addShutdownHooks(MyThd... threads) { + +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +try { +System.err.println("Stopping"); +for (MyThd thread : threads) { +thread.halt = true; +} + +for (Thread thread : threads) { +System.err.println("Waiting for " + thread.getName()); +thread.join(); +} + +for (MyThd thread : threads) { +System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput() ); +} +} catch (InterruptedException e) { +return; +} +})); + +} + +} + + + +abstract class MyThd extends Thread { +public long count=0; +public long runTime = 0; +public boolean halt = false; + +public MyThd(String thdName) { +super(thdName); +} + +public long throughput() { +return getCount() / (runTime / 1000); +} +public long getCount() { return count; } +} + +class Prod extends MyThd { +
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158013269 --- Diff: examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.utils; + +import org.jctools.queues.MpscArrayQueue; + +import java.util.concurrent.locks.LockSupport; + +public class JCToolsPerfTest { +public static void main(String[] args) throws Exception { +//oneProducer1Consumer(); +twoProducer1Consumer(); +//threeProducer1Consumer(); +//oneProducer2Consumers(); +//producerFwdConsumer(); + +//JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0); +//JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0); +// +//final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); +//final Acker acker = new Acker(ackQ, spoutQ); +// +//runAllThds(ackingProducer, acker); + +while(true) +Thread.sleep(1000); + +} + +private static void oneProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1); +} + +private static void twoProducer1Consumer() { +MpscArrayQueue q1 
= new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1, prod2); +} + +private static void threeProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Prod prod3 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, prod2, prod3, cons1); +} + + +private static void oneProducer2Consumers() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); +MpscArrayQueue q2 = new MpscArrayQueue(50_000); + +final Prod2 prod1 = new Prod2(q1,q2); +final Cons cons1 = new Cons(q1); +final Cons cons2 = new Cons(q2); + +runAllThds(prod1, cons1, cons2); +} + +public static void runAllThds(MyThd... threads) { +for (Thread thread : threads) { +thread.start(); +} +addShutdownHooks(threads); +} + +public static void addShutdownHooks(MyThd... threads) { + +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +try { +System.err.println("Stopping"); +for (MyThd thread : threads) { +thread.halt = true; +} + +for (Thread thread : threads) { +System.err.println("Waiting for " + thread.getName()); +thread.join(); +} + +for (MyThd thread : threads) { +System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput() ); +} +} catch (InterruptedException e) { +return; +} +})); + +} + +} + + + +abstract class MyThd extends Thread { +public long count=0; +public long runTime = 0; +public boolean halt = false; + +public MyThd(String thdName) { +super(thdName); +} + +public long throughput() { +return getCount() / (runTime / 1000); +} +public long getCount() { return count; } +} + +class Prod extends MyThd { +
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158024035 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException { return new ArrayList<>(0); } + public List getOutgoingTasks(String stream, List values) { if (debug) { LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values); } -List outTasks = new ArrayList<>(); -if (!streamComponentToGrouper.containsKey(stream)) { -throw new IllegalArgumentException("Unknown stream ID: " + stream); -} -if (null != streamComponentToGrouper.get(stream)) { -// null value for __system -for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) { +ArrayList outTasks = new ArrayList<>(); + +// TODO: PERF: expensive hashtable lookup in critical path --- End diff -- I guess the patch is ready to review again (given that we've revised numbers for comparison), so now we may need to decide whether to remove the comment or file an issue. For me, at least in this case, it looks like the hashtable lookup is unavoidable. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158013426 --- Diff: examples/storm-perf/src/main/java/org/apache/storm/perf/JCToolsPerfTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.utils; + +import org.jctools.queues.MpscArrayQueue; + +import java.util.concurrent.locks.LockSupport; + +public class JCToolsPerfTest { +public static void main(String[] args) throws Exception { +//oneProducer1Consumer(); +twoProducer1Consumer(); +//threeProducer1Consumer(); +//oneProducer2Consumers(); +//producerFwdConsumer(); + +//JCQueue spoutQ = new JCQueue("spoutQ", 1024, 100, 0); +//JCQueue ackQ = new JCQueue("ackQ", 1024, 100, 0); +// +//final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); +//final Acker acker = new Acker(ackQ, spoutQ); +// +//runAllThds(ackingProducer, acker); + +while(true) +Thread.sleep(1000); + +} + +private static void oneProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1); +} + +private static void twoProducer1Consumer() { +MpscArrayQueue q1 
= new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, cons1, prod2); +} + +private static void threeProducer1Consumer() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); + +final Prod prod1 = new Prod(q1); +final Prod prod2 = new Prod(q1); +final Prod prod3 = new Prod(q1); +final Cons cons1 = new Cons(q1); + +runAllThds(prod1, prod2, prod3, cons1); +} + + +private static void oneProducer2Consumers() { +MpscArrayQueue q1 = new MpscArrayQueue(50_000); +MpscArrayQueue q2 = new MpscArrayQueue(50_000); + +final Prod2 prod1 = new Prod2(q1,q2); +final Cons cons1 = new Cons(q1); +final Cons cons2 = new Cons(q2); + +runAllThds(prod1, cons1, cons2); +} + +public static void runAllThds(MyThd... threads) { +for (Thread thread : threads) { +thread.start(); +} +addShutdownHooks(threads); +} + +public static void addShutdownHooks(MyThd... threads) { + +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +try { +System.err.println("Stopping"); +for (MyThd thread : threads) { +thread.halt = true; +} + +for (Thread thread : threads) { +System.err.println("Waiting for " + thread.getName()); +thread.join(); +} + +for (MyThd thread : threads) { +System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput() ); +} +} catch (InterruptedException e) { +return; +} +})); + +} + +} + + + +abstract class MyThd extends Thread { +public long count=0; +public long runTime = 0; +public boolean halt = false; + +public MyThd(String thdName) { +super(thdName); +} + +public long throughput() { +return getCount() / (runTime / 1000); +} +public long getCount() { return count; } +} + +class Prod extends MyThd { +
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158009771 --- Diff: docs/Performance.md --- @@ -0,0 +1,128 @@ +--- +title: Performance Tuning +layout: documentation +documentation: true +--- + +Latency, throughput and CPU consumption are the three key dimensions involved in performance tuning. +In the following sections we discuss the settings that can used to tune along these dimension and understand the trade-offs. + +It is important to understand that these settings can vary depending on the topology, the type of hardware and the number of hosts used by the topology. + +## 1. Batch Size +Spouts and Bolts communicate with each other via concurrent message queues. The batch size determines the number of messages to be buffered before +the producer (spout/bolt) attempts to actually write to the downstream component's message queue. Inserting messages in batches to downstream +queues helps reduce the number of synchronization operations required for the inserts. Consequently this helps achieve higher throughput. However, +sometimes it may take a little time for the buffer to fill up, before it is flushed into the downstream queue. This implies that the buffered messages +will take longer to become visible to the downstream consumer who is waiting to process them. This can increase the average end-to-end latency for +these messages. The latency can get very bad if the batch sizes are large and the topology is not experiencing high traffic. + +`topology.producer.batch.size` : The batch size for writes into the receive queue of any spout/bolt is controlled via this setting. This setting +impacts the communication within a worker process. Each upstream producer maintains a separate batch to a component's receive queue. So if two spout +instances are writing to the same downstream bolt instance, each of the spout instances will have maintain a separate batch. 
+ +`topology.transfer.batch.size` : Messages that are destined to a spout/bolt running on a different worker process, are sent to a queue called +the **Worker Transfer Queue**. The Worker Transfer Thread is responsible for draining the messages in this queue and send them to the appropriate +worker process over the network. This setting controls the batch size for writes into the Worker Transfer Queue. This impacts the communication +between worker processes. + + Guidance + +**For Low latency:** Set batch size to 1. This basically disables batching. This is likely to reduce peak sustainable throughput under heavy traffic, but +not likely to impact throughput much under low/medium traffic situations. +**For High throughput:** Set batch size > 1. Try values like 10, 100, 1000 or even higher and see what yields the best throughput for the topology. +Beyond a certain point the throughput is likely to get worse. +**Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topos may experience higher traffic in some +paths and lower throughput in other paths simultaneously. If latency is not a concern, a small bach size (e.g. 10) and in conjunction with the right flush +frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1. + + +## 2. Flush Tuple Frequency +In low/medium traffic situations or when batch size is too large, the batches may take too long to fill up and consequently the messages could take unacceptably +long time to become visible to downstream components. In such case, periodic flushing of batches is necessary to keep the messages moving and avoid compromising +latencies when batching is enabled. + +When batching has been enabled, special messages called *flush tuples* are inserted periodically into the receive queues of all spout and bolt instances. 
+This causes each spout/bolt instance to flush all its outstanding batches to their respective downstream components. + +`topology.flush.tuple.freq.millis` : This setting controls how often the flush tuples are generated. Flush tuples are not generated if this configuration is +set to 0 or if (`topology.producer.batch.size`=1 and `topology.transfer.batch.size`=1). + + + Guidance +Flushing interval can be used as tool to retain the higher throughput benefits of batching and avoid batched messages getting stuck for too long waiting for their. +batch to fill. Preferably this value should be larger than the average execute latencies of the bolts in the topology. Trying to flush the queues more frequently than +the amount of time it takes to produce the messages may hurt performance. Understanding the average execute latencies of each bolt w
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158034510 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException { return new ArrayList<>(0); } + public List getOutgoingTasks(String stream, List values) { if (debug) { LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values); } -List outTasks = new ArrayList<>(); -if (!streamComponentToGrouper.containsKey(stream)) { -throw new IllegalArgumentException("Unknown stream ID: " + stream); -} -if (null != streamComponentToGrouper.get(stream)) { -// null value for __system -for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) { +ArrayList outTasks = new ArrayList<>(); + +// TODO: PERF: expensive hashtable lookup in critical path +ArrayList groupers = streamToGroupers.get(stream); +if (null != groupers) { +for (int i=0; i compTasks = grouper.chooseTasks(taskId, values, loadMapping); -outTasks.addAll(compTasks); +outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit --- End diff -- Same here: it may be better to decide. IMHO I'm still not convinced that it can introduce a performance hit. The only costs I can imagine are allocating the backing array (only once per method call) and expanding the array, but unless we use fan-out in a huge topology, outTasks is expected to be small. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158016550 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java --- @@ -356,9 +356,8 @@ public void handle(TaskInfo taskInfo, Collection dataPoints) { TopologyBuilder builder = new TopologyBuilder(); -int numEach = 4 * parallelism; +int numEach = parallelism; --- End diff -- Nice catch. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158035294 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -177,6 +196,35 @@ public BuiltinMetrics getBuiltInMetrics() { return builtInMetrics; } + +// Non Blocking call. If cannot emmit to destination immediately, such tuples will be added to `pendingEmits` argument --- End diff -- nit: `emmit` -> `emit` ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158041504 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -192,7 +240,7 @@ private TopologyContext mkTopologyContext(StormTopology topology) throws IOExcep ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())), ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()), taskId, -workerData.getPort(), workerData.getTaskIds(), +workerData.getPort(), workerData.getLocalTaskIds(), --- End diff -- Renamed method looks better to clarify the meaning. Nice improvement. ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158039708 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -177,6 +196,35 @@ public BuiltinMetrics getBuiltInMetrics() { return builtInMetrics; } + +// Non Blocking call. If cannot emmit to destination immediately, such tuples will be added to `pendingEmits` argument +public void sendUnanchored(String stream, List values, ExecutorTransfer transfer, Queue pendingEmits) { +Tuple tuple = getTuple(stream, values); +List tasks = getOutgoingTasks(stream, values); +for (Integer t : tasks) { +AddressedTuple addressedTuple = new AddressedTuple(t, tuple); +transfer.tryTransfer(addressedTuple, pendingEmits); +} +} + +/** + * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api). + */ +public void sendToEventLogger(Executor executor, List values, --- End diff -- Will the method be called on every emit when the number of event logger executors is greater than 0? If so, we may want to apply more optimizations here (not in this issue, but in a separate one). ---
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158017461 --- Diff: examples/storm-perf/pom.xml --- @@ -81,7 +81,7 @@ maven-checkstyle-plugin -207 +407 --- End diff -- Kind reminder, given that we are getting close. ---
[GitHub] storm issue #2445: STORM-2843: [Flux] properties file not found when loading...
Github user vesense commented on the issue: https://github.com/apache/storm/pull/2445 @HeartSaVioR PR updated. Could you take a look again? ---
[GitHub] storm issue #2469: STORM-2861: Explicit reference kafka-schema-registry-clie...
Github user vesense commented on the issue: https://github.com/apache/storm/pull/2469 @ptgoetz @satishd Yes, I know that kafka-avro-serializer depends on kafka-schema-registry-client and avro. There were io.confluent.kafka.schemaregistry.client.XXX not found errors when compiling the storm-hdfs code, and this PR fixed the issue. (To tell the truth, it's a bit strange.) Another benefit is that users can know clearly about which avro version they are using. By default the avro module is from hadoop-common dependencies. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2433 @danny0405 Btw, the build in local passed, but it incurs considerable addition in build time. ``` [INFO] [INFO] Reactor Summary: [INFO] [INFO] Storm .. SUCCESS [ 3.699 s] [INFO] Apache Storm - Checkstyle .. SUCCESS [ 0.708 s] [INFO] multilang-javascript ... SUCCESS [ 0.126 s] [INFO] multilang-python ... SUCCESS [ 0.265 s] [INFO] multilang-ruby . SUCCESS [ 0.093 s] [INFO] maven-shade-clojure-transformer SUCCESS [ 2.413 s] [INFO] storm-maven-plugins SUCCESS [ 2.638 s] [INFO] Storm Client ... SUCCESS [ 44.607 s] [INFO] storm-server ... SUCCESS [02:27 min] [INFO] storm-clojure .. SUCCESS [ 5.257 s] [INFO] Storm Core . SUCCESS [23:34 min] [INFO] Storm Webapp ... SUCCESS [ 11.411 s] [INFO] storm-clojure-test . SUCCESS [ 2.185 s] [INFO] storm-submit-tools . SUCCESS [ 8.076 s] [INFO] flux ... SUCCESS [ 0.093 s] [INFO] flux-wrappers .. SUCCESS [ 0.354 s] [INFO] storm-kafka SUCCESS [01:48 min] [INFO] storm-autocreds SUCCESS [ 3.273 s] [INFO] storm-hdfs . SUCCESS [ 39.636 s] [INFO] storm-hbase SUCCESS [ 4.100 s] [INFO] flux-core .. SUCCESS [ 4.035 s] [INFO] flux-examples .. SUCCESS [ 12.051 s] [INFO] storm-sql-runtime .. SUCCESS [ 2.391 s] [INFO] storm-sql-core . SUCCESS [01:47 min] [INFO] storm-sql-kafka SUCCESS [ 4.736 s] [INFO] storm-redis SUCCESS [ 5.799 s] [INFO] storm-sql-redis SUCCESS [ 8.231 s] [INFO] storm-mongodb .. SUCCESS [ 0.826 s] [INFO] storm-sql-mongodb .. SUCCESS [ 3.936 s] [INFO] storm-sql-hdfs . SUCCESS [ 12.313 s] [INFO] sql SUCCESS [ 0.150 s] [INFO] storm-hive . SUCCESS [ 18.669 s] [INFO] storm-jdbc . SUCCESS [ 1.777 s] [INFO] storm-eventhubs SUCCESS [ 3.725 s] [INFO] storm-elasticsearch SUCCESS [ 20.401 s] [INFO] storm-solr . SUCCESS [ 1.515 s] [INFO] storm-metrics .. SUCCESS [ 0.590 s] [INFO] storm-cassandra SUCCESS [01:01 min] [INFO] storm-mqtt . SUCCESS [ 22.396 s] [INFO] storm-kafka-client . SUCCESS [02:06 min] [INFO] storm-opentsdb . 
SUCCESS [ 0.664 s] [INFO] storm-kafka-monitor SUCCESS [ 2.430 s] [INFO] storm-kinesis .. SUCCESS [ 1.281 s] [INFO] storm-druid SUCCESS [ 1.958 s] [INFO] storm-jms .. SUCCESS [ 12.099 s] [INFO] storm-pmml . SUCCESS [ 0.267 s] [INFO] storm-rocketmq . SUCCESS [ 11.298 s] [INFO] blobstore-migrator . SUCCESS [ 12.028 s] [INFO] Storm Integration Test . SUCCESS [ 0.696 s] [INFO] storm-starter .. SUCCESS [ 19.945 s] [INFO] storm-loadgen .. SUCCESS [ 2.457 s] [INFO] storm-mongodb-examples . SUCCESS [ 0.771 s] [INFO] storm-redis-examples ... SUCCESS [ 1.096 s] [INFO] storm-opentsdb-examples SUCCESS [ 1.489 s] [INFO] storm-solr-examples SUCCES
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157942171 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrJsonMapper.java --- @@ -93,6 +93,10 @@ private SolrJsonMapper(Builder builder) { jsonUpdateUrl = builder.jsonUpdateUrl; } +@Override +public void configure() { --- End diff -- This may not be needed here once it is defined with default method in `SolrMapper`. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157944148 --- Diff: external/storm-solr/pom.xml --- @@ -46,19 +46,24 @@ org.apache.solr solr-solrj -5.2.1 -compile +5.5.5 --- End diff -- You may want to define a property for the org.apache.solr version and reuse it in the other Solr dependencies in this module, instead of declaring the version at each dependency. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r158011321 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java --- @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.solr.schema.builder; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; +import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.schema.SchemaRepresentation; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.schema.CopyField; +import org.apache.storm.solr.schema.Field; +import org.apache.storm.solr.schema.FieldType; +import org.apache.storm.solr.schema.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Class that builds the {@link Schema} object from the schema returned by the SchemaRequest + */ +public class RestJsonSchemaBuilderV2 implements SchemaBuilder { +private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class); +private Schema schema = new Schema(); +private SolrConfig solrConfig; +private String collection; + +public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) throws IOException { +this.solrConfig = solrConfig; +this.collection = collection; +} + +@Override +public void buildSchema() throws IOException { +try { +if (solrConfig.enableKerberos()) +HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + +SolrClient solrClient = new CloudSolrClient(solrConfig.getZkHostString()); +SchemaRequest schemaRequest = new SchemaRequest(); +logger.debug("Downloading schema for collection: {}", collection ); +SchemaResponse schemaResponse = schemaRequest.process(solrClient, 
collection); +logger.debug("SchemaResponse Schema: " + schemaResponse); --- End diff -- Better to have parameterized log statement. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157942068 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java --- @@ -26,6 +26,7 @@ import java.util.List; public interface SolrMapper extends Serializable { +void configure(); --- End diff -- This can break any existing mappers users may have already created. Better to have default method to address that. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157941248 --- Diff: external/storm-solr/README.md --- @@ -97,6 +97,29 @@ field separates each value with the token % instead of the default | . To use th .setMultiValueFieldToken("%").build(); ``` +##Working with Kerberized Solr +If your topology is going to interact with kerberized Solr, your bolts/states needs to be authenticated by Solr Server. We can enable +authentication by distributing keytabs for solr user on all worker hosts. We can configure the solt bolt to use keytabs by setting --- End diff -- nit: configure the solr bolt ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157943487 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java --- @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.solr.schema.builder; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; +import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.schema.SchemaRepresentation; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; +import org.apache.storm.solr.config.SolrConfig; +import org.apache.storm.solr.schema.CopyField; +import org.apache.storm.solr.schema.Field; +import org.apache.storm.solr.schema.FieldType; +import org.apache.storm.solr.schema.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Class that builds the {@link Schema} object from the schema returned by the SchemaRequest + */ +public class RestJsonSchemaBuilderV2 implements SchemaBuilder { +private static final Logger logger = LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class); +private Schema schema = new Schema(); +private SolrConfig solrConfig; +private String collection; + +public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String collection) throws IOException { --- End diff -- This constructor does not really throw any IOException. ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157941206 --- Diff: external/storm-solr/README.md --- @@ -97,6 +97,29 @@ field separates each value with the token % instead of the default | . To use th .setMultiValueFieldToken("%").build(); ``` +##Working with Kerberized Solr +If your topology is going to interact with kerberized Solr, your bolts/states needs to be authenticated by Solr Server. We can enable --- End diff -- nit: bolts/states need to be ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157943956 --- Diff: external/storm-solr/pom.xml --- @@ -46,19 +46,24 @@ org.apache.solr solr-solrj -5.2.1 -compile +5.5.5 + + +org.apache.httpcomponents +httpclient --- End diff -- I do not see any httpclient dependency for solr-solrj, only org.apache.httpcomponents:httpcore:4.4.1 and org.apache.httpcomponents:httpmime:4.4.1. Am I missing anything here? ---
[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2467#discussion_r157941502 --- Diff: external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java --- @@ -88,11 +92,14 @@ private int capacity() { @Override protected void process(Tuple tuple) { try { +LOG.debug("Processing Tuple: {}", tuple); SolrRequest request = solrMapper.toSolrRequest(tuple); solrClient.request(request, solrMapper.getCollection()); ack(tuple); +LOG.debug("Acked Tuple: {}", tuple); } catch (Exception e) { fail(tuple, e); +LOG.debug("Failed Tuple: {}", tuple, e); --- End diff -- nit: you may want to move this log statement into the `fail` method. ---
[GitHub] storm pull request #2475: Put contextual information in the ThreadContext fo...
GitHub user hmcc opened a pull request: https://github.com/apache/storm/pull/2475 Put contextual information in the ThreadContext for multilang logging. Connected to https://issues.apache.org/jira/browse/STORM-2862; `master` version of apache/storm#2474. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcc/storm STORM-2862 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2475.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2475 commit 4cc61f4ccf6504e926a6dafaa30d18ba457b3408 Author: Heather McCartney Date: 2017-12-20T11:55:48Z Put contextual information in the ThreadContext for multilang logging. ---
[GitHub] storm issue #2433: [STORM-2693] Heartbeats and assignments promotion for sto...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2433 @danny0405 Please let me know when you are finished and the PR is ready to review again. Thanks a lot for the patience! ---
[GitHub] storm pull request #2474: STORM-2862: More flexible logging in multilang
GitHub user hmcc opened a pull request: https://github.com/apache/storm/pull/2474 STORM-2862: More flexible logging in multilang Connected to https://issues.apache.org/jira/browse/STORM-2862. Put contextual information in the ThreadContext for multilang logging. I've updated the Python and Ruby resources in multilang, but not the JS one. That's because the JS one is written to support variable arguments (probably because of [STORM-2435](https://issues.apache.org/jira/browse/STORM-2435)). I can't see a way to support optional IDs without breaking the current behaviour. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hmcc/storm STORM-2862-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2474.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2474 commit 92511b9ccf81c23476952aaf61e31556c41920b2 Author: Heather McCartney Date: 2017-12-20T10:14:53Z Put contextual information in the ThreadContext for multilang logging. ---