[jira] [Commented] (STORM-2006) Storm metrics feature improvement: support per-worker level metrics aggregation
[ https://issues.apache.org/jira/browse/STORM-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403301#comment-15403301 ] ASF GitHub Bot commented on STORM-2006: --- Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1595 OK. I broke my doc down into two pieces and posted them to the wiki. - https://cwiki.apache.org/confluence/display/STORM/Limitations+of+current+metrics+feature - https://cwiki.apache.org/confluence/display/STORM/Wishlist+for+new+metrics+feature Since I'm not working on the new metrics feature, I didn't put them into the proposal / design doc category. I think we will adopt or address many areas, including metrics, while evaluating JStorm features from Phase 2, so I would like to wait for that. > Storm metrics feature improvement: support per-worker level metrics > aggregation > --- > > Key: STORM-2006 > URL: https://issues.apache.org/jira/browse/STORM-2006 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core >Affects Versions: 1.1.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim > > Storm provides per-task level metrics which could be huge when topology has a > number of tasks. > Task level metric is useful for determining load balance between tasks, but > it doesn't need to be time-series fashion. > Before introducing topology level component like TopologyMaster for JStorm, > we can utilize SystemBolt to aggregate task level metrics to per-worker level > metrics. > We should provide options and this feature should be turned off by default to > keep backward compatibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] storm issue #1595: STORM-2006 Storm metrics feature improvement: support per...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1595 OK. I broke my doc down into two pieces and posted them to the wiki. - https://cwiki.apache.org/confluence/display/STORM/Limitations+of+current+metrics+feature - https://cwiki.apache.org/confluence/display/STORM/Wishlist+for+new+metrics+feature Since I'm not working on the new metrics feature, I didn't put them into the proposal / design doc category. I think we will adopt or address many areas, including metrics, while evaluating JStorm features from Phase 2, so I would like to wait for that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403274#comment-15403274 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73087127 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73087127 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403248#comment-15403248 ] ASF GitHub Bot commented on STORM-1277: --- Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73085409 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73085409 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)
Yes, they should be included. Maybe all Clojure files in the src directory should be ported as well, or moved to the test directory if it's OK not to expose them to users. 2016년 8월 2일 (화) 오전 1:12, Abhishek Agarwal님이 작성: > Tests themselves can wait but helper code (testing.clj and testing4j.clj) > is part of storm-core jar (both are assigned to me). That could be ported > at least. These classes depend on other daemons which are yet to be ported. > > On Mon, Aug 1, 2016 at 6:57 PM, Bobby Evans > wrote: > > > The plan was that we would all concentrate on the migration and knock it > > out quickly, but stuff happened and we were not able to concentrate on it > > as fully as I would have liked. I agree that the tests can wait. What > is > > more many of the tests because of local mode have to wait until all of > the > > daemons have been ported over. I really wish there were more hours in > the > > day for me to help out more on this. - Bobby > > > > On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim > > wrote: > > > > > > Hi devs, > > > > Porting to Java seems to be longer than we expected, and now working on > > storm-core requires working for Java and Clojure, say twice. > > One of reason for port is "lowering learning curve to have more > > contributors", but now contributors need to know Clojure to contribute > > storm-core targeted for 1.x and "port" to master. It's even harder for me > > since I need some time to investigate how things are ported. > > > > Bobby suggested "feature freeze" for 1.x and move to 2.x quickly, but IMO > > it will work only when we have due date for 2.0.0 and we all concentrate > on > > this to release ASAP. > > (Sorry I had to break "feature freeze" since metrics improvements are > > needed within months.) > > Since we're individual and also several different teams, setting a due > date > > for this seems to be just ideal. Need to find other ways to make > transition > > faster. 
> > > > Looking into progress of porting to Java, I found that we have ported > many > > places on source so we only left some (still huge) daemons, but still > have > > lots of files on test. > > Given that ported files should pass Clojure tests, I think porting tests > is > > not urgent and we can move this out of phase 1. > > > > What do you think? > > > > Thanks, > > Jungtaek Lim (HeartSaVioR) > > > > ps. Just two cents, I even think it might be valid way to set milestone > for > > 2.0.0 to only phase 1 (with well tested). If Storm 2.0.0 is just a Java > > port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize > > supporting multi-versions with main language difference. Phase 2 and > other > > improvements can be on top of 2.0.0. (We may need to have 3.0.0 for that > > but I think version bump is not a big deal.) > > > > > > > > > > > > -- > Regards, > Abhishek Agarwal >
Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)
Yeah, I think your plan would have been perfect if we had all been able to concentrate on the migration work. But things can come up for any of us, and each of us may need to focus on something else at the time; in many cases that's not something we can control. Let's get back to the present and find a way to move forward with our plan for Storm 2.0.0. At the very least I'd like to see the port finished ASAP, since it creates a huge divergence between branches, and we could even lose momentum the longer it takes. I'm still not familiar with Clojure, so it's not easy for me to take on a large part of the work, but I would like to help out by porting smaller parts and reviewing the relatively bigger ones. - Jungtaek Lim (HeartSaVioR) 2016년 8월 1일 (월) 오후 10:28, Bobby Evans님이 작성: > The plan was that we would all concentrate on the migration and knock it > out quickly, but stuff happened and we were not able to concentrate on it > as fully as I would have liked. I agree that the tests can wait. What is > more many of the tests because of local mode have to wait until all of the > daemons have been ported over. I really wish there were more hours in the > day for me to help out more on this. - Bobby > > On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim > wrote: > > > Hi devs, > > Porting to Java seems to be longer than we expected, and now working on > storm-core requires working for Java and Clojure, say twice. > One of reason for port is "lowering learning curve to have more > contributors", but now contributors need to know Clojure to contribute > storm-core targeted for 1.x and "port" to master. It's even harder for me > since I need some time to investigate how things are ported. > > Bobby suggested "feature freeze" for 1.x and move to 2.x quickly, but IMO > it will work only when we have due date for 2.0.0 and we all concentrate on > this to release ASAP. > (Sorry I had to break "feature freeze" since metrics improvements are > needed within months.) 
> Since we're individual and also several different teams, setting a due date > for this seems to be just ideal. Need to find other ways to make transition > faster. > > Looking into progress of porting to Java, I found that we have ported many > places on source so we only left some (still huge) daemons, but still have > lots of files on test. > Given that ported files should pass Clojure tests, I think porting tests is > not urgent and we can move this out of phase 1. > > What do you think? > > Thanks, > Jungtaek Lim (HeartSaVioR) > > ps. Just two cents, I even think it might be valid way to set milestone for > 2.0.0 to only phase 1 (with well tested). If Storm 2.0.0 is just a Java > port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize > supporting multi-versions with main language difference. Phase 2 and other > improvements can be on top of 2.0.0. (We may need to have 3.0.0 for that > but I think version bump is not a big deal.) > > >
[PROPOSAL] submitting topology with adding jars and maven artifacts
Hi devs, This is the proposal review thread for submitting a topology with additional jars and Maven artifacts. It also follows up on the discussion thread [DISCUSSION] Policy of resolving dependencies for non storm-core modules.[1] I've written a design doc which also describes the motivation for this. https://cwiki.apache.org/confluence/display/STORM/A.+Design+doc%3A+adding+jars+and+maven+artifacts+at+submission Please review it and comment on "this thread" instead of on the wiki page so that all devs are notified of updates. Thanks, Jungtaek Lim (HeartSaVioR) [1] http://mail-archives.apache.org/mod_mbox/storm-dev/201607.mbox/%3CCAF5108jByyJLTKrV_P4fS=dj8rsr_o5oubzqbviscggsc1c...@mail.gmail.com%3E
Re: [DISCUSSION] Policy of resolving dependencies for non storm-core modules
Thanks Taylor! Let me open a new thread for proposal and continue discussion from there. Jungtaek Lim (HeartSaVioR) 2016년 8월 2일 (화) 오전 12:10, P. Taylor Goetz님이 작성: > Jungtaek, I’ve given you permissions to the Storm wiki so you can post the > document there. > > -Taylor > > > On Aug 1, 2016, at 12:46 AM, Jungtaek Lim wrote: > > > > Not yet. It would be good to continue discussion on this thread for now. > > > > Thanks, > > Jungtaek Lim (HeartSaVioR) > > > > 2016년 8월 1일 (월) 오후 1:38, Satish Duggana 님이 작성: > > > >> Hi, > >> Did we find a way to comment on the proposal? How can we proceed with > this > >> discussion? > >> > >> Thanks, > >> Satish. > >> > >> > >> On Thu, Jul 28, 2016 at 9:42 AM, Jungtaek Lim > wrote: > >> > >>> I've made a design document for adding jars and maven artifacts at > >>> submission. > >>> Since we don't have any formats for this, I just borrowed KIP format. > >>> That's not we would like to adopt that format, it's just me. > >>> > >>> > >>> > >> > https://docs.google.com/document/d/1f3Ed0Wa8aN2j7gtptT0BOKYMyBdpohLgERg4YebJJ7c/edit?usp=sharing > >>> > >>> Btw, I guess ASF policy requires that discussion history should be > logged > >>> to Apache Infra. > >>> (So I only grant view permission who opens the doc via link.) > >>> > >>> So what's recommended way to do? Would we want to file an issue for > JIRA > >>> with attaching design doc to PDF and discuss there? > >>> > >>> - Jungtaek Lim > >>> > >>> 2016년 7월 23일 (토) 오전 12:06, Jungtaek Lim 님이 작성: > >>> > Thought about it once more, and found that former approach still needs > >> to > add 'provided' scope libraries to extlib directory.. > > Along with thinking former approach, I've experimented a bit of latter > approach by creating POC project. Since I don't know about copy and > use > >>> ASF > project codes for personal use, I'd just share main class to see if > >> this > POC and my theory makes sense for us. 
> https://gist.github.com/HeartSaVioR/3639a9ee829fe1203b4a085a0fb069d6 > 'zeppelin' package has some classes for transitive dependency resolver > (copied from Zeppelin), and 'spark' package has some classes for > classloader (yes also copied from Spark). > > Please share your experiences if you have knowledges regarding this > >> area. > > - Jungtaek Lim > > 2016년 7월 22일 (금) 오후 10:59, Bobby Evans >님이 > >>> 작성: > > > You can do that with a combination of the distributed cache and > >> setting > > the classpath, just like with hadoop. It is not as clean as it could > >>> be, > > but it does work. - Bobby > > > >On Thursday, July 21, 2016 11:09 PM, Arun Mahadevan < > >>> ar...@apache.org> > > wrote: > > > > > > Shade and relocate the external modules sounds ok as a short term > > solution. > > > > For the long term we should consider something like the second option > >> to > > add external modules without shipping uber jars. > > > > Thanks, > > Arun > > > > On 7/22/16, 6:07 AM, "Jungtaek Lim" wrote: > > > >> Hi devs, > >> > >> AFAIK, we had been struggled to resolve dependency issues for > >>> storm-core. > >> As we all know, the strategy we have been using is shade & > >> relocating. > >> > >> Now State and Storm SQL requires that some of external modules need > >> to > >>> be > >> included to extlib, which is the classpath workers refer. > >> > >> http://issues.apache.org/jira/browse/STORM-1881 > >> https://issues.apache.org/jira/browse/STORM-1435 > >> > >> There're two issues here: > >> - We don't make uber jar for external modules so users need to find > >> and > >> copy dependencies jars to extlib manually. > >> - External modules also use Guava and Jackson and so on which are > >>> origin > > of > >> version conflict issues. 
> >> > >> So we should apply the shade & relocating strategy for every > external > >> modules (at least storm-redis, storm-kafka, storm-sql-core, > >> storm-sql-kafka), or introduce the way to add the dependency without > > adding > >> them to extlib. (like --packages and --jar for Spark) > >> > >> Please express your opinions about this. > >> > >> Thanks, > >> Jungtaek Lim (HeartSaVioR) > > > > > > > > > > > > > >>> > >> > >
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403098#comment-15403098 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73077004 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73077004 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
[jira] [Commented] (STORM-1839) Kinesis Spout
[ https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402850#comment-15402850 ] ASF GitHub Bot commented on STORM-1839: --- Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r7301 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.storm.spout.SpoutOutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// object handling zk interaction +private transient ZKConnection zkConnection; +// object handling interaction with kinesis +private transient KinesisConnection kinesisConnection; +// Kinesis Spout KinesisConfig object +private transient final KinesisConfig kinesisConfig; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient MaptoEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted +private transient Map failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. 
Logic explained in commit +private transient Map emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivated state +private transient boolean deactivated; + +KinesisRecordsManager (KinesisConfig kinesisConfig) { +this.kinesisConfig = kinesisConfig; +this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); +this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); +} + +void initialize (int myTaskIndex, int totalTasks) { +deactivated = false; +lastCommitTime = System.currentTimeMillis(); +
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r7301 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.storm.spout.SpoutOutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// object handling zk interaction +private transient ZKConnection zkConnection; +// object handling interaction with kinesis +private transient KinesisConnection kinesisConnection; +// Kinesis Spout KinesisConfig object +private transient final KinesisConfig kinesisConfig; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient MaptoEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted +private transient Map failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. 
Logic explained in commit +private transient Map emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivated state +private transient boolean deactivated; + +KinesisRecordsManager (KinesisConfig kinesisConfig) { +this.kinesisConfig = kinesisConfig; +this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); +this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); +} + +void initialize (int myTaskIndex, int totalTasks) { +deactivated = false; +lastCommitTime = System.currentTimeMillis(); +kinesisConnection.initialize(); +zkConnection.initialize(); +List shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName()); +LOG.info("myTaskIndex is " + myTaskIndex); +LOG.info("totalTasks is
[jira] [Commented] (STORM-1839) Kinesis Spout
[ https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402817#comment-15402817 ] ASF GitHub Bot commented on STORM-1839: --- Github user harshach commented on the issue: https://github.com/apache/storm/pull/1586 +1. Build failures not related to the patch. > Kinesis Spout > - > > Key: STORM-1839 > URL: https://issues.apache.org/jira/browse/STORM-1839 > Project: Apache Storm > Issue Type: Improvement >Reporter: Sriharsha Chintalapani >Assignee: Priyank Shah > > As Storm is increasingly used in Cloud environments, it would be great to have a > Kinesis Spout integration in Apache Storm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...
Github user harshach commented on the issue: https://github.com/apache/storm/pull/1586 +1. Build failures not related to the patch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1602: Fix command to run RollingTopWords example
GitHub user Zintinio opened a pull request: https://github.com/apache/storm/pull/1602 Fix command to run RollingTopWords example Appended target directory to command. Previously resulted in ClassNotFound exception. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Zintinio/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1602.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1602 commit cc6deef506c6575cbaa3c2ef0728c8151018d9e8 Author: Arthur Maciejewicz Date: 2016-08-01T20:50:48Z Fix command to run RollingTopWords example Appended target directory to command. Previously resulted in ClassNotFound exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-1839) Kinesis Spout
[ https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402724#comment-15402724 ] ASF GitHub Bot commented on STORM-1839: --- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r73043179 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.storm.spout.SpoutOutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// object handling zk interaction +private transient ZKConnection zkConnection; +// object handling interaction with kinesis +private transient KinesisConnection kinesisConnection; +// Kinesis Spout KinesisConfig object +private transient final KinesisConfig kinesisConfig; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient MaptoEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted +private transient Map failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. 
Logic explained in commit +private transient Map emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivated state +private transient boolean deactivated; + +KinesisRecordsManager (KinesisConfig kinesisConfig) { +this.kinesisConfig = kinesisConfig; +this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); +this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); +} + +void initialize (int myTaskIndex, int totalTasks) { +deactivated = false; +lastCommitTime = System.currentTimeMillis(); +
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r73043179 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.storm.spout.SpoutOutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// object handling zk interaction +private transient ZKConnection zkConnection; +// object handling interaction with kinesis +private transient KinesisConnection kinesisConnection; +// Kinesis Spout KinesisConfig object +private transient final KinesisConfig kinesisConfig; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient MaptoEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted +private transient Map failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. 
Logic explained in commit +private transient Map emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivated state +private transient boolean deactivated; + +KinesisRecordsManager (KinesisConfig kinesisConfig) { +this.kinesisConfig = kinesisConfig; +this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); +this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); +} + +void initialize (int myTaskIndex, int totalTasks) { +deactivated = false; +lastCommitTime = System.currentTimeMillis(); +kinesisConnection.initialize(); +zkConnection.initialize(); +List shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName()); +LOG.info("myTaskIndex is " + myTaskIndex); +LOG.info("totalTasks is "
[jira] [Commented] (STORM-1839) Kinesis Spout
[ https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402668#comment-15402668 ] ASF GitHub Bot commented on STORM-1839: --- Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1586 @harshach I added some more comments in ack method and commit method for acking logic. I also moved code interacting with zk and kinesis into its own class to make KinesisRecordsManager smaller. Please review when you get a chance. > Kinesis Spout > - > > Key: STORM-1839 > URL: https://issues.apache.org/jira/browse/STORM-1839 > Project: Apache Storm > Issue Type: Improvement >Reporter: Sriharsha Chintalapani >Assignee: Priyank Shah > > As Storm is increasingly used in Cloud environments, it would be great to have a > Kinesis Spout integration in Apache Storm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1586 @harshach I added some more comments in ack method and commit method for acking logic. I also moved code interacting with zk and kinesis into its own class to make KinesisRecordsManager smaller. Please review when you get a chance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)
Tests themselves can wait but helper code (testing.clj and testing4j.clj) is part of storm-core jar (both are assigned to me). That could be ported at least. These classes depend on other daemons which are yet to be ported. On Mon, Aug 1, 2016 at 6:57 PM, Bobby Evans wrote: > The plan was that we would all concentrate on the migration and knock it > out quickly, but stuff happened and we were not able to concentrate on it > as fully as I would have liked. I agree that the tests can wait. What is > more, many of the tests, because of local mode, have to wait until all of the > daemons have been ported over. I really wish there were more hours in the > day for me to help out more on this. - Bobby > > On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim > wrote: > > > Hi devs, > > Porting to Java seems to be longer than we expected, and now working on > storm-core requires working for Java and Clojure, say twice. > One of the reasons for the port is "lowering learning curve to have more > contributors", but now contributors need to know Clojure to contribute > storm-core targeted for 1.x and "port" to master. It's even harder for me > since I need some time to investigate how things are ported. > > Bobby suggested "feature freeze" for 1.x and move to 2.x quickly, but IMO > it will work only when we have due date for 2.0.0 and we all concentrate on > this to release ASAP. > (Sorry I had to break "feature freeze" since metrics improvements are > needed within months.) > Since we're individual and also several different teams, setting a due date > for this seems to be just ideal. Need to find other ways to make transition > faster. > > Looking into progress of porting to Java, I found that we have ported many > places on source so we only left some (still huge) daemons, but still have > lots of files on test. > Given that ported files should pass Clojure tests, I think porting tests is > not urgent and we can move this out of phase 1. > > What do you think? 
> > Thanks, > Jungtaek Lim (HeartSaVioR) > > ps. Just two cents, I even think it might be valid way to set milestone for > 2.0.0 to only phase 1 (with well tested). If Storm 2.0.0 is just a Java > port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize > supporting multi-versions with main language difference. Phase 2 and other > improvements can be on top of 2.0.0. (We may need to have 3.0.0 for that > but I think version bump is not a big deal.) > > > > -- Regards, Abhishek Agarwal
Re: [DISCUSSION] Policy of resolving dependencies for non storm-core modules
Jungtaek, I’ve given you permissions to the Storm wiki so you can post the document there. -Taylor > On Aug 1, 2016, at 12:46 AM, Jungtaek Limwrote: > > Not yet. It would be good to continue discussion on this thread for now. > > Thanks, > Jungtaek Lim (HeartSaVioR) > > 2016년 8월 1일 (월) 오후 1:38, Satish Duggana 님이 작성: > >> Hi, >> Did we find a way to comment on the proposal? How can we proceed with this >> discussion? >> >> Thanks, >> Satish. >> >> >> On Thu, Jul 28, 2016 at 9:42 AM, Jungtaek Lim wrote: >> >>> I've made a design document for adding jars and maven artifacts at >>> submission. >>> Since we don't have any formats for this, I just borrowed KIP format. >>> That's not we would like to adopt that format, it's just me. >>> >>> >>> >> https://docs.google.com/document/d/1f3Ed0Wa8aN2j7gtptT0BOKYMyBdpohLgERg4YebJJ7c/edit?usp=sharing >>> >>> Btw, I guess ASF policy requires that discussion history should be logged >>> to Apache Infra. >>> (So I only grant view permission who opens the doc via link.) >>> >>> So what's recommended way to do? Would we want to file an issue for JIRA >>> with attaching design doc to PDF and discuss there? >>> >>> - Jungtaek Lim >>> >>> 2016년 7월 23일 (토) 오전 12:06, Jungtaek Lim 님이 작성: >>> Thought about it once more, and found that former approach still needs >> to add 'provided' scope libraries to extlib directory.. Along with thinking former approach, I've experimented a bit of latter approach by creating POC project. Since I don't know about copy and use >>> ASF project codes for personal use, I'd just share main class to see if >> this POC and my theory makes sense for us. https://gist.github.com/HeartSaVioR/3639a9ee829fe1203b4a085a0fb069d6 'zeppelin' package has some classes for transitive dependency resolver (copied from Zeppelin), and 'spark' package has some classes for classloader (yes also copied from Spark). Please share your experiences if you have knowledges regarding this >> area. 
- Jungtaek Lim 2016년 7월 22일 (금) 오후 10:59, Bobby Evans 님이 >>> 작성: > You can do that with a combination of the distributed cache and >> setting > the classpath, just like with hadoop. It is not as clean as it could >>> be, > but it does work. - Bobby > >On Thursday, July 21, 2016 11:09 PM, Arun Mahadevan < >>> ar...@apache.org> > wrote: > > > Shade and relocate the external modules sounds ok as a short term > solution. > > For the long term we should consider something like the second option >> to > add external modules without shipping uber jars. > > Thanks, > Arun > > On 7/22/16, 6:07 AM, "Jungtaek Lim" wrote: > >> Hi devs, >> >> AFAIK, we had been struggled to resolve dependency issues for >>> storm-core. >> As we all know, the strategy we have been using is shade & >> relocating. >> >> Now State and Storm SQL requires that some of external modules need >> to >>> be >> included to extlib, which is the classpath workers refer. >> >> http://issues.apache.org/jira/browse/STORM-1881 >> https://issues.apache.org/jira/browse/STORM-1435 >> >> There're two issues here: >> - We don't make uber jar for external modules so users need to find >> and >> copy dependencies jars to extlib manually. >> - External modules also use Guava and Jackson and so on which are >>> origin > of >> version conflict issues. >> >> So we should apply the shade & relocating strategy for every external >> modules (at least storm-redis, storm-kafka, storm-sql-core, >> storm-sql-kafka), or introduce the way to add the dependency without > adding >> them to extlib. (like --packages and --jar for Spark) >> >> Please express your opinions about this. >> >> Thanks, >> Jungtaek Lim (HeartSaVioR) > > > > > >>> >> signature.asc Description: Message signed with OpenPGP using GPGMail
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1445 I've finished a first pass of review. Great work. This is a rather huge diff, so the more reviewers the better. Everybody, please go on reviewing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402176#comment-15402176 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1445 I've finished a first pass of review. Great work. This is a rather huge diff, so the more reviewers the better. Everybody, please go on reviewing. > port backtype.storm.daemon.executor to java > --- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Robert Joseph Evans >Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > kind of. Tasks and executors are combined in jstorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72987752 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +MapanchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); +((TupleImpl) a).updateAckVal(edgeId); +for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) { +putXor(anchorsToIds, root_id, edgeId); +} +} +} +MessageId msgId = MessageId.makeId(anchorsToIds); +TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId); +executor.getExecutorTransfer().transfer(t, tupleExt); +} +if (isEventLoggers) { +executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random); +} +return outTasks; --- End diff -- `(or out-tasks [])` seems to guarantee at least empty list. Does this guarantee the same? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402127#comment-15402127 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72987752 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +MapanchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); +((TupleImpl) a).updateAckVal(edgeId); +for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) { +putXor(anchorsToIds, root_id, edgeId); +} +} +} +MessageId msgId = MessageId.makeId(anchorsToIds); +TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId); +executor.getExecutorTransfer().transfer(t, tupleExt); +} +if (isEventLoggers) { +executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random); +} +return outTasks; --- End diff -- `(or out-tasks [])` seems to guarantee at least empty list. Does this guarantee the same? >
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402121#comment-15402121 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72987136 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +MapanchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); --- End diff -- ``` (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] - (when (pos? (count root-ids)) - (let [edge-id (MessageId/generateId rand)] - (.updateAckVal a edge-id) - (fast-list-iter [root-id root-ids] - (put-xor! anchors-to-ids root-id edge-id)) ``` if root-ids.size() is 0, ported code behaves differ. It should check
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72987136 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +MapanchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); --- End diff -- ``` (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] - (when (pos? (count root-ids)) - (let [edge-id (MessageId/generateId rand)] - (.updateAckVal a edge-id) - (fast-list-iter [root-id root-ids] - (put-xor! anchors-to-ids root-id edge-id)) ``` if root-ids.size() is 0, ported code behaves differ. It should check size first, and skip all if size is 0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402096#comment-15402096 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72983906 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72983906 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
[jira] [Created] (STORM-2011) Add JSON Serialization for Java Objects
Ellison Anne Williams created STORM-2011: Summary: Add JSON Serialization for Java Objects Key: STORM-2011 URL: https://issues.apache.org/jira/browse/STORM-2011 Project: Apache Storm Issue Type: Bug Reporter: Ellison Anne Williams Fill in the JsonSerializer class - perhaps use gson... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (STORM-2011) Add JSON Serialization for Java Objects
[ https://issues.apache.org/jira/browse/STORM-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ellison Anne Williams closed STORM-2011. Resolution: Fixed Accidentally added this in the wrong JIRA project... Sorry! > Add JSON Serialization for Java Objects > > > Key: STORM-2011 > URL: https://issues.apache.org/jira/browse/STORM-2011 > Project: Apache Storm > Issue Type: Bug >Reporter: Ellison Anne Williams > > Fill in the JsonSerializer class - perhaps use gson... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)
The plan was that we would all concentrate on the migration and knock it out quickly, but stuff happened and we were not able to concentrate on it as fully as I would have liked. I agree that the tests can wait. What is more, many of the tests, because of local mode, have to wait until all of the daemons have been ported over. I really wish there were more hours in the day for me to help out more on this. - Bobby On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim wrote: Hi devs, Porting to Java seems to be taking longer than we expected, and now working on storm-core requires doing the work in both Java and Clojure, i.e. twice. One of the reasons for the port is "lowering learning curve to have more contributors", but now contributors need to know Clojure to contribute to storm-core targeted for 1.x and then "port" to master. It's even harder for me since I need some time to investigate how things are ported. Bobby suggested a "feature freeze" for 1.x and moving to 2.x quickly, but IMO it will work only when we have a due date for 2.0.0 and we all concentrate on this to release ASAP. (Sorry I had to break the "feature freeze" since metrics improvements are needed within months.) Since we're individuals and also several different teams, setting a due date for this seems to be just an ideal. We need to find other ways to make the transition faster. Looking into the progress of porting to Java, I found that we have ported many places in the source so only some (still huge) daemons are left, but we still have lots of files in test. Given that ported files should pass the Clojure tests, I think porting tests is not urgent and we can move this out of phase 1. What do you think? Thanks, Jungtaek Lim (HeartSaVioR) ps. Just my two cents: I even think it might be a valid way to set the milestone for 2.0.0 to only phase 1 (well tested). If Storm 2.0.0 is just a Java port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize supporting multiple versions with a main language difference. Phase 2 and other improvements can be on top of 2.0.0. 
(We may need to have 3.0.0 for that, but I think a version bump is not a big deal.)
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401872#comment-15401872 ] ASF GitHub Bot commented on STORM-1277: --- Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961638 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961638 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401868#comment-15401868 ] ASF GitHub Bot commented on STORM-1277: --- Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961410 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961410 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401651#comment-15401651 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72934070 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import clojure.lang.Atom; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.executor.Executor; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IBolt; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Callable; + +public class BoltExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class); + +private final Callable executeSampler; + +public BoltExecutor(Map workerData, List executorId, Mapcredentials) { +super(workerData, executorId, credentials); +this.executeSampler = ConfigUtils.mkStatsSampler(stormConf); +} + +@Override +public void init(Map idToTask) { +this.idToTask = idToTask; +LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet()); +for (Map.Entry entry : idToTask.entrySet()) { +Task taskData = entry.getValue(); +IBolt boltObject = (IBolt) taskData.getTaskObject(); +TopologyContext userContext = taskData.getUserContext(); +taskData.getBuiltInMetrics().registerAll(stormConf, userContext); +if (boltObject instanceof ICredentialsListener) { +((ICredentialsListener) boltObject).setCredentials(credentials); +} +if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) { +Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", 
receiveQueue, +"transfer", (DisruptorQueue) workerData.get("transfer-queue")); +BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); + +Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref(); + BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext); + BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext); +} else { +Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue); +BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); +} + +IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers, isDebug); +boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector)); +} +openOrPrepareWasCalled.set(true); +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72934070 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import clojure.lang.Atom; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.executor.Executor; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IBolt; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Callable; + +public class BoltExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class); + +private final Callable executeSampler; + +public BoltExecutor(Map workerData, List executorId, Mapcredentials) { +super(workerData, executorId, credentials); +this.executeSampler = ConfigUtils.mkStatsSampler(stormConf); +} + +@Override +public void init(Map idToTask) { +this.idToTask = idToTask; +LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet()); +for (Map.Entry entry : idToTask.entrySet()) { +Task taskData = entry.getValue(); +IBolt boltObject = (IBolt) taskData.getTaskObject(); +TopologyContext userContext = taskData.getUserContext(); +taskData.getBuiltInMetrics().registerAll(stormConf, userContext); +if (boltObject instanceof ICredentialsListener) { +((ICredentialsListener) boltObject).setCredentials(credentials); +} +if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) { +Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", 
receiveQueue, +"transfer", (DisruptorQueue) workerData.get("transfer-queue")); +BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); + +Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref(); + BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext); + BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext); +} else { +Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue); +BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); +} + +IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers, isDebug); +boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector)); +} +openOrPrepareWasCalled.set(true); +LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet()); +setupMetrics(); +} + +@Override +public Object call() throws Exception { +while (!stormActive.get()) { --- End diff --
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72933619 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401649#comment-15401649 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72933619 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401640#comment-15401640 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72933087 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72933087 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMappending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map credentials) { +super(workerData, executorId, credentials); 
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback () { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +} +failSpoutMsg(SpoutExecutor.this,
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401621#comment-15401621 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72931639 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72931639 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401611#comment-15401611 ] ASF GitHub Bot commented on STORM-1277: --- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72930327 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class SpoutOutputCollectorImpl implements ISpoutOutputCollector { + +private final SpoutExecutor executor; +private final Task taskData; +private final int taskId; +private final MutableLong emittedCount; +private final boolean hasAckers; +private final Random random; +private final Boolean isEventLoggers; +private final Boolean isDebug; +private final RotatingMappending; + +@SuppressWarnings("unused") +public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId, +MutableLong emittedCount, boolean hasAckers, Random random, +Boolean isEventLoggers, Boolean isDebug, RotatingMap pending) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.emittedCount = emittedCount; +this.hasAckers = hasAckers; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +this.pending = pending; +} + +@Override +public List emit(String streamId, List tuple, Object messageId) { +return sendSpoutMsg(streamId, tuple, messageId, null); +} + +@Override +public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { +sendSpoutMsg(streamId, tuple, messageId, taskId); +} + +@Override +public long getPendingCount() { +return pending.size(); +} + +@Override +public void reportError(Throwable error) { +executor.getReportError().report(error); +} + +private List sendSpoutMsg(String stream, 
List values, Object messageId, Integer outTaskId) { +emittedCount.increment(); + +List outTasks; +if (outTaskId != null) { +outTasks = taskData.getOutgoingTasks(outTaskId, stream, values); +} else { +outTasks = taskData.getOutgoingTasks(stream, values); +} + +List ackSeq = new ArrayList<>(); +boolean needAck = (messageId != null) && hasAckers; + +long rootId = MessageId.generateId(random); +for (Integer t : outTasks) { +MessageId msgId; +if (needAck) { +long as = MessageId.generateId(random); +msgId = MessageId.makeRootId(rootId, as); +ackSeq.add(as); +} else { +msgId = MessageId.makeUnanchored(); +} + +TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72930327 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class SpoutOutputCollectorImpl implements ISpoutOutputCollector { + +private final SpoutExecutor executor; +private final Task taskData; +private final int taskId; +private final MutableLong emittedCount; +private final boolean hasAckers; +private final Random random; +private final Boolean isEventLoggers; +private final Boolean isDebug; +private final RotatingMappending; + +@SuppressWarnings("unused") +public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId, +MutableLong emittedCount, boolean hasAckers, Random random, +Boolean isEventLoggers, Boolean isDebug, RotatingMap pending) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.emittedCount = emittedCount; +this.hasAckers = hasAckers; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +this.pending = pending; +} + +@Override +public List emit(String streamId, List tuple, Object messageId) { +return sendSpoutMsg(streamId, tuple, messageId, null); +} + +@Override +public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { +sendSpoutMsg(streamId, tuple, messageId, taskId); +} + +@Override +public long getPendingCount() { +return pending.size(); +} + +@Override +public void reportError(Throwable error) { +executor.getReportError().report(error); +} + +private List sendSpoutMsg(String stream, 
List values, Object messageId, Integer outTaskId) { +emittedCount.increment(); + +List outTasks; +if (outTaskId != null) { +outTasks = taskData.getOutgoingTasks(outTaskId, stream, values); +} else { +outTasks = taskData.getOutgoingTasks(stream, values); +} + +List ackSeq = new ArrayList<>(); +boolean needAck = (messageId != null) && hasAckers; + +long rootId = MessageId.generateId(random); +for (Integer t : outTasks) { +MessageId msgId; +if (needAck) { +long as = MessageId.generateId(random); +msgId = MessageId.makeRootId(rootId, as); +ackSeq.add(as); +} else { +msgId = MessageId.makeUnanchored(); +} + +TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId); +executor.getExecutorTransfer().transfer(t, tuple); +} +if (isEventLoggers) { +executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);