[jira] [Commented] (STORM-2006) Storm metrics feature improvement: support per-worker level metrics aggregation

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403301#comment-15403301 ]

ASF GitHub Bot commented on STORM-2006:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1595
  
OK. I broke my doc down into two pieces and posted them to the wiki:

- https://cwiki.apache.org/confluence/display/STORM/Limitations+of+current+metrics+feature
- https://cwiki.apache.org/confluence/display/STORM/Wishlist+for+new+metrics+feature

Since I'm not working on the new metrics feature, I didn't put them into the
proposal / design doc category. I think we will adopt or address many areas,
including metrics, while evaluating JStorm features in Phase 2, so I would
like to wait for that.


> Storm metrics feature improvement: support per-worker level metrics 
> aggregation
> ---
>
> Key: STORM-2006
> URL: https://issues.apache.org/jira/browse/STORM-2006
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Affects Versions: 1.1.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>
> Storm provides per-task metrics, whose volume can be huge when a topology
> has many tasks.
> Task-level metrics are useful for checking load balance between tasks, but
> they don't need to be kept as time series.
> Before introducing a topology-level component like JStorm's TopologyMaster,
> we can use the SystemBolt to aggregate task-level metrics into per-worker
> metrics.
> We should make this configurable, and the feature should be turned off by
> default to keep backward compatibility.
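The issue describes folding task-level metrics into per-worker metrics, with the feature off by default. A minimal sketch of that aggregation step, assuming counter-style metrics that can simply be summed; `WorkerMetricsAggregator` and `TaskDataPoint` are hypothetical names and are not the SystemBolt or metrics-consumer API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of per-worker aggregation of task-level metrics.
 * None of these names come from Storm; they only illustrate the idea.
 */
public class WorkerMetricsAggregator {

    /** One counter value reported by a single task (hypothetical type). */
    public static final class TaskDataPoint {
        final int taskId;
        final String name;
        final long value;

        public TaskDataPoint(int taskId, String name, long value) {
            this.taskId = taskId;
            this.name = name;
            this.value = value;
        }
    }

    private final boolean aggregateToWorker;

    /** Default: aggregation disabled, so per-task reporting is unchanged. */
    public WorkerMetricsAggregator() {
        this(false);
    }

    public WorkerMetricsAggregator(boolean aggregateToWorker) {
        this.aggregateToWorker = aggregateToWorker;
    }

    /**
     * Sums counter values. When aggregation is enabled, all tasks in the worker
     * share one key per metric name; otherwise each task keeps its own key.
     */
    public Map<String, Long> aggregate(List<TaskDataPoint> points) {
        Map<String, Long> result = new HashMap<>();
        for (TaskDataPoint p : points) {
            String key = aggregateToWorker ? p.name : p.taskId + ":" + p.name;
            result.merge(key, p.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaskDataPoint> points = Arrays.asList(
                new TaskDataPoint(1, "emitted", 10),
                new TaskDataPoint(2, "emitted", 5),
                new TaskDataPoint(2, "acked", 4));
        // Per-worker view: a single "emitted" entry covering both tasks.
        System.out.println(new WorkerMetricsAggregator(true).aggregate(points));
        // Per-task view (default): keys like "1:emitted" and "2:emitted".
        System.out.println(new WorkerMetricsAggregator().aggregate(points));
    }
}
```

Summing is only correct for counters; gauges or latencies would need a different combiner (for example, a count-weighted average), which this sketch does not attempt.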



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm issue #1595: STORM-2006 Storm metrics feature improvement: support per...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1595
  
OK. I broke my doc down into two pieces and posted them to the wiki:

- https://cwiki.apache.org/confluence/display/STORM/Limitations+of+current+metrics+feature
- https://cwiki.apache.org/confluence/display/STORM/Wishlist+for+new+metrics+feature

Since I'm not working on the new metrics feature, I didn't put them into the
proposal / design doc category. I think we will adopt or address many areas,
including metrics, while evaluating JStorm features in Phase 2, so I would
like to wait for that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403274#comment-15403274 ]

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73087127
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73087127
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this,
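In `init()`, `maxSpoutPending` is the configured `topology.max.spout.pending` value multiplied by the number of tasks in the executor. A small sketch of that arithmetic and the gate it implies, assuming that 0 (the default passed to `Utils.getInt`) means "no limit"; `PendingLimit` is a hypothetical helper, not part of the executor:

```java
/**
 * Hypothetical helper: executor-wide cap on in-flight (unacked) spout tuples.
 */
public final class PendingLimit {
    private final long maxPending;

    /**
     * @param maxSpoutPendingPerTask value of topology.max.spout.pending (0 assumed to mean unlimited)
     * @param numTasks number of spout tasks run by this executor
     */
    public PendingLimit(int maxSpoutPendingPerTask, int numTasks) {
        // Same arithmetic as init(): per-task setting times task count.
        this.maxPending = (long) maxSpoutPendingPerTask * numTasks;
    }

    public long limit() {
        return maxPending;
    }

    /** True if the executor may ask its spouts for more tuples. */
    public boolean canEmit(long pendingCount) {
        return maxPending == 0 || pendingCount < maxPending;
    }

    public static void main(String[] args) {
        PendingLimit limit = new PendingLimit(1000, 4);
        System.out.println(limit.limit());        // 4000
        System.out.println(limit.canEmit(3999));  // true
        System.out.println(limit.canEmit(4000));  // false
    }
}
```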

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403248#comment-15403248 ]

ASF GitHub Bot commented on STORM-1277:
---

Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73085409
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73085409
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this,

Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)

2016-08-01 Thread Jungtaek Lim
Yes, they should be included. Maybe all Clojure files in the src directory
should be ported as well, or moved to the test directory if it's OK for them
not to be exposed to users.

On Tue, Aug 2, 2016 at 1:12 AM, Abhishek Agarwal wrote:

> The tests themselves can wait, but the helper code (testing.clj and
> testing4j.clj) is part of the storm-core jar (both are assigned to me).
> That could at least be ported. These classes depend on other daemons that
> have yet to be ported.
>
> On Mon, Aug 1, 2016 at 6:57 PM, Bobby Evans wrote:
>
> > The plan was that we would all concentrate on the migration and knock it
> > out quickly, but stuff happened and we were not able to concentrate on it
> > as fully as I would have liked.  I agree that the tests can wait.  What
> > is more, because of local mode, many of the tests have to wait until all
> > of the daemons have been ported over.  I really wish there were more
> > hours in the day for me to help out more on this. - Bobby
> >
> > On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim wrote:
> >
> >
> >  Hi devs,
> >
> > Porting to Java seems to be taking longer than we expected, and working on
> > storm-core now requires doing the work in both Java and Clojure, i.e., twice.
> > One of the reasons for the port is "lowering the learning curve to attract
> > more contributors", but now contributors need to know Clojure to contribute
> > to storm-core for 1.x and then "port" their changes to master. It's even
> > harder for me since I need some time to investigate how things were ported.
> >
> > Bobby suggested a "feature freeze" for 1.x and moving to 2.x quickly, but
> > IMO that will only work if we have a due date for 2.0.0 and we all
> > concentrate on releasing it ASAP.
> > (Sorry, I had to break the "feature freeze" since metrics improvements are
> > needed within months.)
> > Since we are individuals and also several different teams, setting a due
> > date for this seems merely idealistic. We need to find other ways to make
> > the transition faster.
> >
> > Looking at the progress of porting to Java, I found that we have ported
> > much of the source, so only some (still huge) daemons are left, but there
> > are still lots of files in test.
> > Given that ported files should pass the Clojure tests, I think porting
> > tests is not urgent and we can move it out of phase 1.
> >
> > What do you think?
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > ps. Just my two cents: I even think it might be valid to set the milestone
> > for 2.0.0 to phase 1 only (well tested). If Storm 2.0.0 is just a Java
> > port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize
> > supporting multiple versions that differ in main language. Phase 2 and
> > other improvements can build on top of 2.0.0. (We may need a 3.0.0 for
> > that, but I think a version bump is not a big deal.)
> >
> >
> >
> >
>
>
>
> --
> Regards,
> Abhishek Agarwal
>


Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)

2016-08-01 Thread Jungtaek Lim
Yeah, I think your plan would be perfect if we all concentrated on the
migration work. But things happen to all of us, and each of us may need to
focus on other things at times. In many cases it's not something we can
control.

Let's get back to the present and find a way to move forward with our plan
for Storm 2.0.0. At the very least I'd like to see the port finished ASAP,
since it causes a huge divergence between branches, and we could even lose
momentum the longer it takes.

I'm still not familiar with Clojure, so it's not easy for me to take on a
huge part of the work, but I would like to help by porting smaller parts and
reviewing the relatively bigger ones.

- Jungtaek Lim (HeartSaVioR)

On Mon, Aug 1, 2016 at 10:28 PM, Bobby Evans wrote:

> The plan was that we would all concentrate on the migration and knock it
> out quickly, but stuff happened and we were not able to concentrate on it
> as fully as I would have liked.  I agree that the tests can wait.  What is
> more, because of local mode, many of the tests have to wait until all of the
> daemons have been ported over.  I really wish there were more hours in the
> day for me to help out more on this. - Bobby
>
> On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim wrote:
>
>
>  Hi devs,
>
> Porting to Java seems to be taking longer than we expected, and working on
> storm-core now requires doing the work in both Java and Clojure, i.e., twice.
> One of the reasons for the port is "lowering the learning curve to attract
> more contributors", but now contributors need to know Clojure to contribute
> to storm-core for 1.x and then "port" their changes to master. It's even
> harder for me since I need some time to investigate how things were ported.
>
> Bobby suggested a "feature freeze" for 1.x and moving to 2.x quickly, but
> IMO that will only work if we have a due date for 2.0.0 and we all
> concentrate on releasing it ASAP.
> (Sorry, I had to break the "feature freeze" since metrics improvements are
> needed within months.)
> Since we are individuals and also several different teams, setting a due
> date for this seems merely idealistic. We need to find other ways to make
> the transition faster.
>
> Looking at the progress of porting to Java, I found that we have ported
> much of the source, so only some (still huge) daemons are left, but there
> are still lots of files in test.
> Given that ported files should pass the Clojure tests, I think porting
> tests is not urgent and we can move it out of phase 1.
>
> What do you think?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> ps. Just my two cents: I even think it might be valid to set the milestone
> for 2.0.0 to phase 1 only (well tested). If Storm 2.0.0 is just a Java
> port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize
> supporting multiple versions that differ in main language. Phase 2 and
> other improvements can build on top of 2.0.0. (We may need a 3.0.0 for
> that, but I think a version bump is not a big deal.)
>
>
>


[PROPOSAL] submitting topology with adding jars and maven artifacts

2016-08-01 Thread Jungtaek Lim
Hi dev,

This is the proposal review thread for submitting a topology with additional
jars and Maven artifacts. It is also a follow-up to the discussion thread
"[DISCUSSION] Policy of resolving dependencies for non storm-core modules".[1]

I've written a design doc, which also describes the motivation for this.
https://cwiki.apache.org/confluence/display/STORM/A.+Design+doc%3A+adding+jars+and+maven+artifacts+at+submission

Please review it and comment on "this thread" rather than on the wiki page,
so that all devs are notified of updates.

Thanks,
Jungtaek Lim (HeartSaVioR)

[1]
http://mail-archives.apache.org/mod_mbox/storm-dev/201607.mbox/%3CCAF5108jByyJLTKrV_P4fS=dj8rsr_o5oubzqbviscggsc1c...@mail.gmail.com%3E
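The proposal is about letting users name extra jars and Maven artifacts at submission time instead of copying them into extlib. As an illustration only, assuming artifacts are given as a comma-separated list of `groupId:artifactId:version` coordinates (the actual option names and syntax are defined in the design doc, not here), validating that input might look like the following; `ArtifactSpec` is a hypothetical class, and resolving the jars and their transitive dependencies is out of scope:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical parser for a comma-separated list of Maven coordinates in the
 * common groupId:artifactId:version form.
 */
public final class ArtifactSpec {
    public final String groupId;
    public final String artifactId;
    public final String version;

    private ArtifactSpec(String groupId, String artifactId, String version) {
        this.groupId = groupId;
        this.artifactId = artifactId;
        this.version = version;
    }

    /** Parses a single "groupId:artifactId:version" coordinate. */
    public static ArtifactSpec parse(String coordinate) {
        // Limit -1 keeps trailing empty fields, so "g:a:v:" is rejected.
        String[] parts = coordinate.trim().split(":", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected groupId:artifactId:version but got '" + coordinate + "'");
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty field in '" + coordinate + "'");
            }
        }
        return new ArtifactSpec(parts[0], parts[1], parts[2]);
    }

    /** Parses "g1:a1:v1,g2:a2:v2", ignoring blank entries. */
    public static List<ArtifactSpec> parseList(String commaSeparated) {
        List<ArtifactSpec> specs = new ArrayList<>();
        for (String entry : commaSeparated.split(",")) {
            if (!entry.trim().isEmpty()) {
                specs.add(parse(entry));
            }
        }
        return specs;
    }

    @Override
    public String toString() {
        return groupId + ":" + artifactId + ":" + version;
    }

    public static void main(String[] args) {
        for (ArtifactSpec spec : parseList("com.example:my-connector:1.0.0, com.example:my-util:2.1")) {
            System.out.println(spec.artifactId + " -> " + spec);
        }
    }
}
```

Rejecting malformed coordinates at submission keeps errors on the client side, before anything is shipped to the cluster.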


Re: [DISCUSSION] Policy of resolving dependencies for non storm-core modules

2016-08-01 Thread Jungtaek Lim
Thanks Taylor!

Let me open a new thread for the proposal and continue the discussion there.

Jungtaek Lim (HeartSaVioR)

On Tue, Aug 2, 2016 at 12:10 AM, P. Taylor Goetz wrote:

> Jungtaek, I’ve given you permissions to the Storm wiki so you can post the
> document there.
>
> -Taylor
>
> > On Aug 1, 2016, at 12:46 AM, Jungtaek Lim  wrote:
> >
> > Not yet. It would be good to continue the discussion on this thread for now.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > On Mon, Aug 1, 2016 at 1:38 PM, Satish Duggana wrote:
> >
> >> Hi,
> >> Did we find a way to comment on the proposal? How can we proceed with
> >> this discussion?
> >>
> >> Thanks,
> >> Satish.
> >>
> >>
> >> On Thu, Jul 28, 2016 at 9:42 AM, Jungtaek Lim wrote:
> >>
> >>> I've made a design document for adding jars and Maven artifacts at
> >>> submission.
> >>> Since we don't have a format for this, I just borrowed the KIP format.
> >>> That doesn't mean we should adopt that format; it was just my choice.
> >>>
> >>> https://docs.google.com/document/d/1f3Ed0Wa8aN2j7gtptT0BOKYMyBdpohLgERg4YebJJ7c/edit?usp=sharing
> >>>
> >>> Btw, I guess ASF policy requires that the discussion history be logged
> >>> to Apache infrastructure.
> >>> (So I only grant view permission to those who open the doc via the link.)
> >>>
> >>> So what's the recommended way to do this? Should we file a JIRA issue
> >>> with the design doc attached as a PDF and discuss there?
> >>>
> >>> - Jungtaek Lim
> >>>
> >>> On Sat, Jul 23, 2016 at 12:06 AM, Jungtaek Lim wrote:
> >>>
>  I thought about it once more and found that the former approach still
>  needs 'provided'-scope libraries to be added to the extlib directory.
>
>  While thinking about the former approach, I also experimented a bit with
>  the latter approach by creating a POC project. Since I'm not sure about
>  copying and using ASF project code for personal use, I'll just share the
>  main class so you can see whether this POC and my theory make sense for us.
>  https://gist.github.com/HeartSaVioR/3639a9ee829fe1203b4a085a0fb069d6
>  The 'zeppelin' package has some classes for transitive dependency
>  resolution (copied from Zeppelin), and the 'spark' package has some
>  classes for class loading (yes, also copied from Spark).
>
>  Please share your experience if you have knowledge of this area.
>
>  - Jungtaek Lim
>
>  On Fri, Jul 22, 2016 at 10:59 PM, Bobby Evans wrote:
> 
> > You can do that with a combination of the distributed cache and setting
> > the classpath, just like with Hadoop.  It is not as clean as it could be,
> > but it does work. - Bobby
> >
> >On Thursday, July 21, 2016 11:09 PM, Arun Mahadevan <ar...@apache.org> wrote:
> >
> >
> > Shading and relocating the external modules sounds OK as a short-term
> > solution.
> >
> > For the long term, we should consider something like the second option:
> > adding external modules without shipping uber jars.
> >
> > Thanks,
> > Arun
> >
> > On 7/22/16, 6:07 AM, "Jungtaek Lim"  wrote:
> >
> >> Hi devs,
> >>
> >> AFAIK, we have struggled to resolve dependency issues for storm-core.
> >> As we all know, the strategy we have been using is shading & relocating.
> >>
> >> Now State and Storm SQL require some external modules to be included in
> >> extlib, which is the classpath that workers refer to.
> >>
> >> http://issues.apache.org/jira/browse/STORM-1881
> >> https://issues.apache.org/jira/browse/STORM-1435
> >>
> >> There are two issues here:
> >> - We don't build uber jars for external modules, so users need to find
> >> and copy dependency jars to extlib manually.
> >> - External modules also use Guava, Jackson, and so on, which are the
> >> source of version conflict issues.
> >>
> >> So we should either apply the shade & relocate strategy to every external
> >> module (at least storm-redis, storm-kafka, storm-sql-core,
> >> storm-sql-kafka), or introduce a way to add the dependencies without
> >> adding them to extlib (like --packages and --jars for Spark).
> >>
> >> Please share your opinions about this.
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >
> >
> >
> >
> >
> 
> 
> >>>
> >>
>
>
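The POC above mentions class-loader code copied from Spark. One widely used technique in this space is a child-first (parent-last) class loader, which lets jars supplied at submission take precedence over the worker's own classpath so user dependencies do not clash with Storm's. A minimal sketch of that technique, not the code from the gist or from Spark; always delegating only `java.*` to the parent is a simplifying assumption that a real implementation would need to widen:

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Sketch of a child-first class loader: classes are looked up in the given
 * URLs (e.g. user-supplied jars) before falling back to the parent.
 */
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                // Core JDK classes must always come from the parent chain.
                if (name.startsWith("java.")) {
                    return super.loadClass(name, resolve);
                }
                try {
                    // Look in this loader's own URLs first...
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // ...and fall back to the normal parent-first lookup.
                    return super.loadClass(name, resolve);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

A complete version would also override `getResource`/`getResources` so resources follow the same order, and would typically always delegate shared API packages (such as Storm's own interfaces) to the parent so user code and the worker agree on those types.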


[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403098#comment-15403098 ]

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73077004
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new 
RotatingMap.ExpiredCallback() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73077004
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List spouts;
+    private List outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List executorId, Map credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this, 
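In the diff above, init sets the executor-wide pending limit to TOPOLOGY_MAX_SPOUT_PENDING multiplied by the number of tasks, and tracks in-flight tuples in a two-bucket RotatingMap whose expire callback fails tuples that are never acked. Below is a simplified standalone sketch of both ideas. BucketRotatingMap and PendingLimit are hypothetical names, not Storm's org.apache.storm.utils.RotatingMap, and treating a limit of 0 as "unlimited" is an assumption based on the default of 0 passed to Utils.getInt.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified bucket-rotation map in the spirit of Storm's RotatingMap.
// New entries go into the newest (head) bucket; rotate() drops the oldest bucket
// and hands each entry still in it to the expiry callback.
final class BucketRotatingMap<K, V> {
    private final LinkedList<Map<K, V>> buckets = new LinkedList<>();
    private final BiConsumer<K, V> onExpire;

    BucketRotatingMap(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    void put(K key, V value) {
        // Keep each key in exactly one bucket: drop any older copy first.
        for (Map<K, V> bucket : buckets) bucket.remove(key);
        buckets.getFirst().put(key, value);
    }

    // Called on ack or fail: the entry is no longer pending.
    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) return bucket.remove(key);
        }
        return null;
    }

    int size() {
        int total = 0;
        for (Map<K, V> bucket : buckets) total += bucket.size();
        return total;
    }

    // Called periodically; entries in the oldest bucket have timed out.
    void rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        for (Map.Entry<K, V> e : dead.entrySet()) onExpire.accept(e.getKey(), e.getValue());
    }
}

final class PendingLimit {
    private PendingLimit() {}

    // Mirrors init(): the per-task setting times the number of tasks in the executor.
    static int executorLimit(int perTaskMaxPending, int numTasks) {
        return perTaskMaxPending * numTasks;
    }

    // Assumption: a limit of 0 means "no limit".
    static boolean canEmit(int pendingCount, int executorLimit) {
        return executorLimit == 0 || pendingCount < executorLimit;
    }
}
```

With two buckets, an entry that is neither acked nor failed survives one rotation and is expired on the next, so its effective timeout falls between one and two rotation periods.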

[jira] [Commented] (STORM-1839) Kinesis Spout

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402850#comment-15402850
 ] 

ASF GitHub Bot commented on STORM-1839:
---

Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r7301
  

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-08-01 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r7301
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+    // object handling zk interaction
+    private transient ZKConnection zkConnection;
+    // object handling interaction with kinesis
+    private transient KinesisConnection kinesisConnection;
+    // Kinesis Spout KinesisConfig object
+    private transient final KinesisConfig kinesisConfig;
+    // Queue of records per shard fetched from kinesis and are waiting to be emitted
+    private transient Map toEmitPerShard = new HashMap<>();
+    // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
+    private transient Map failedandFetchedRecords = new HashMap<>();
+    // Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the
+    // sequence number to commit. Logic explained in commit
+    private transient Map emittedPerShard = new HashMap<>();
+    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map ackedPerShard = new HashMap<>();
+    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map failedPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for new messages
+    private transient Map shardIteratorPerShard = new HashMap<>();
+    // last fetched sequence number corresponding to position in shard
+    private transient Map fetchedSequenceNumberPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for failed messages
+    private transient Map shardIteratorPerFailedMessage = new HashMap<>();
+    // timestamp to decide when to commit to zk again
+    private transient long lastCommitTime;
+    // boolean to track deactivated state
+    private transient boolean deactivated;
+
+    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
+    }
+
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is 
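The field comments above describe keeping emitted sequence numbers in emission order and acked ones sorted, so the spout can work out which sequence number is safe to commit. The commit logic itself is not in the quoted part of the diff; the following sketch shows one plausible reading under that assumption (CommitPointSketch is a hypothetical name): walk the emitted numbers in order and stop at the first one that has not been acked yet.

```java
import java.math.BigInteger;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;

// Hypothetical helper: derives a per-shard commit point from emitted
// (in emission order) and acked (sorted) sequence numbers.
final class CommitPointSketch {
    private CommitPointSketch() {}

    /**
     * Advances past every emitted sequence number that has been acked, removing
     * it from both collections, and stops at the first one that is still in
     * flight or failed. Returns the last sequence number that is safe to commit,
     * or null if nothing new can be committed.
     */
    static BigInteger advance(LinkedHashSet<BigInteger> emitted, TreeSet<BigInteger> acked) {
        BigInteger committable = null;
        Iterator<BigInteger> it = emitted.iterator();
        while (it.hasNext()) {
            BigInteger seq = it.next();
            if (!acked.contains(seq)) {
                break; // an older message is still pending: cannot commit past it
            }
            committable = seq;
            it.remove();
            acked.remove(seq);
        }
        return committable;
    }
}
```

Acks that arrive out of order (4 before 3, say) stay in the acked set until the gap closes, which matches the comments' reason for keeping acked numbers around.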

[jira] [Commented] (STORM-1839) Kinesis Spout

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402817#comment-15402817
 ] 

ASF GitHub Bot commented on STORM-1839:
---

Github user harshach commented on the issue:

https://github.com/apache/storm/pull/1586
  
+1. Build failures not related to the patch.


> Kinesis Spout
> -
>
> Key: STORM-1839
> URL: https://issues.apache.org/jira/browse/STORM-1839
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Sriharsha Chintalapani
>Assignee: Priyank Shah
>
> As Storm is increasingly used in Cloud environments, it would be great to have a 
> Kinesis Spout integration in Apache Storm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...

2016-08-01 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/1586
  
+1. Build failures not related to the patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1602: Fix command to run RollingTopWords example

2016-08-01 Thread Zintinio
GitHub user Zintinio opened a pull request:

https://github.com/apache/storm/pull/1602

Fix command to run RollingTopWords example

Appended the target directory to the command. Previously it resulted in a 
ClassNotFoundException.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Zintinio/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1602.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1602


commit cc6deef506c6575cbaa3c2ef0728c8151018d9e8
Author: Arthur Maciejewicz 
Date:   2016-08-01T20:50:48Z

Fix command to run RollingTopWords example

Appended target directory to command. Previously resulted in ClassNotFound 
exception.






[jira] [Commented] (STORM-1839) Kinesis Spout

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402724#comment-15402724
 ] 

ASF GitHub Bot commented on STORM-1839:
---

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r73043179
  

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-08-01 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r73043179
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+    // object handling zk interaction
+    private transient ZKConnection zkConnection;
+    // object handling interaction with kinesis
+    private transient KinesisConnection kinesisConnection;
+    // Kinesis Spout KinesisConfig object
+    private transient final KinesisConfig kinesisConfig;
+    // Queue of records per shard fetched from kinesis and are waiting to be emitted
+    private transient Map toEmitPerShard = new HashMap<>();
+    // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
+    private transient Map failedandFetchedRecords = new HashMap<>();
+    // Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the
+    // sequence number to commit. Logic explained in commit
+    private transient Map emittedPerShard = new HashMap<>();
+    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map ackedPerShard = new HashMap<>();
+    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map failedPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for new messages
+    private transient Map shardIteratorPerShard = new HashMap<>();
+    // last fetched sequence number corresponding to position in shard
+    private transient Map fetchedSequenceNumberPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for failed messages
+    private transient Map shardIteratorPerFailedMessage = new HashMap<>();
+    // timestamp to decide when to commit to zk again
+    private transient long lastCommitTime;
+    // boolean to track deactivated state
+    private transient boolean deactivated;
+
+    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
+    }
+
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is " 
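The truncated initialize method fetches every shard of the stream and logs myTaskIndex and totalTasks, which suggests that each spout task takes a subset of the shards. The code that performs the split is not shown; a common approach, assumed here purely for illustration (ShardAssignmentSketch is a hypothetical name), is round-robin by index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin assignment: task i takes shards i, i + totalTasks, ...
// The split actually used by the Kinesis spout PR is not visible in the quoted diff.
final class ShardAssignmentSketch {
    private ShardAssignmentSketch() {}

    static <T> List<T> shardsForTask(List<T> allShards, int myTaskIndex, int totalTasks) {
        if (totalTasks <= 0 || myTaskIndex < 0 || myTaskIndex >= totalTasks) {
            throw new IllegalArgumentException(
                "invalid task index " + myTaskIndex + " of " + totalTasks);
        }
        List<T> mine = new ArrayList<>();
        for (int i = myTaskIndex; i < allShards.size(); i += totalTasks) {
            mine.add(allShards.get(i));
        }
        return mine;
    }
}
```

Every shard goes to exactly one task, and when there are more tasks than shards, the extra tasks simply receive an empty list.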

[jira] [Commented] (STORM-1839) Kinesis Spout

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402668#comment-15402668
 ] 

ASF GitHub Bot commented on STORM-1839:
---

Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1586
  
@harshach I added more comments to the ack and commit methods explaining the 
acking logic. I also moved the code interacting with ZK and Kinesis into its own 
classes to make KinesisRecordsManager smaller. Please review when you get a chance.







[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...

2016-08-01 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1586
  
@harshach I added more comments to the ack and commit methods explaining the 
acking logic. I also moved the code interacting with ZK and Kinesis into its own 
classes to make KinesisRecordsManager smaller. Please review when you get a chance.




Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)

2016-08-01 Thread Abhishek Agarwal
Tests themselves can wait, but the helper code (testing.clj and testing4j.clj)
is part of the storm-core jar (both are assigned to me), so that could be
ported at least. Note that these classes depend on other daemons which are yet
to be ported.

On Mon, Aug 1, 2016 at 6:57 PM, Bobby Evans 
wrote:

> The plan was that we would all concentrate on the migration and knock it
> out quickly, but stuff happened and we were not able to concentrate on it
> as fully as I would have liked.  I agree that the tests can wait.  What is
> more, because of local mode, many of the tests have to wait until all of the
> daemons have been ported over.  I really wish there were more hours in the
> day for me to help out more on this. - Bobby
>
> On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim 
> wrote:
>
>
>  Hi devs,
>
> Porting to Java seems to be taking longer than we expected, and working on
> storm-core now requires doing the work twice: once in Java and once in Clojure.
> One of the reasons for the port is "lowering the learning curve to attract
> more contributors", but now contributors need to know Clojure to contribute
> to storm-core targeted at 1.x and then "port" the change to master. It's even
> harder for me since I need some time to investigate how things were ported.
>
> Bobby suggested a "feature freeze" for 1.x and moving to 2.x quickly, but IMO
> it will only work if we have a due date for 2.0.0 and we all concentrate on
> releasing it ASAP.
> (Sorry, I had to break the "feature freeze" since the metrics improvements are
> needed within months.)
> Since we're individuals and also several different teams, setting a due date
> for this seems to be just an ideal. We need to find other ways to make the
> transition faster.
>
> Looking into the progress of porting to Java, I found that we have ported many
> parts of the source, so only some (still huge) daemons are left, but there are
> still lots of test files.
> Given that the ported files should pass the Clojure tests, I think porting the
> tests is not urgent and we can move it out of phase 1.
>
> What do you think?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> ps. Just my two cents: I even think it might be valid to set the milestone for
> 2.0.0 to phase 1 only (well tested). If Storm 2.0.0 is just a Java port of
> Storm 1.x, 1.x users can easily move to 2.0.0, and we can minimize the time
> spent supporting multiple versions written in different main languages. Phase 2
> and other improvements can build on top of 2.0.0. (We may need a 3.0.0 for
> that, but I think a version bump is not a big deal.)
>
>
>
>



-- 
Regards,
Abhishek Agarwal


Re: [DISCUSSION] Policy of resolving dependencies for non storm-core modules

2016-08-01 Thread P. Taylor Goetz
Jungtaek, I’ve given you permissions to the Storm wiki so you can post the 
document there.

-Taylor

> On Aug 1, 2016, at 12:46 AM, Jungtaek Lim  wrote:
> 
> Not yet. It would be good to continue the discussion on this thread for now.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> On Mon, Aug 1, 2016 at 1:38 PM, Satish Duggana wrote:
> 
>> Hi,
>> Did we find a way to comment on the proposal? How can we proceed with this
>> discussion?
>> 
>> Thanks,
>> Satish.
>> 
>> 
>> On Thu, Jul 28, 2016 at 9:42 AM, Jungtaek Lim  wrote:
>> 
>>> I've written a design document for adding jars and Maven artifacts at
>>> submission time.
>>> Since we don't have a format for this, I just borrowed the KIP format.
>>> That doesn't mean we should adopt that format; it was just my choice.
>>> 
>>> https://docs.google.com/document/d/1f3Ed0Wa8aN2j7gtptT0BOKYMyBdpohLgERg4YebJJ7c/edit?usp=sharing
>>> 
>>> Btw, I guess ASF policy requires that discussion history be logged
>>> to Apache Infra.
>>> (So I've only granted view permission to whoever opens the doc via the link.)
>>> 
>>> So what's the recommended way to do this? Should we file a JIRA issue
>>> with the design doc attached as a PDF and discuss it there?
>>> 
>>> - Jungtaek Lim
>>> 
>>> On Sat, Jul 23, 2016 at 12:06 AM, Jungtaek Lim wrote:
>>> 
>>>> Thought about it once more, and found that the former approach still needs
>>>> to add 'provided' scope libraries to the extlib directory.
>>>> 
>>>> Along with thinking about the former approach, I've experimented a bit with
>>>> the latter approach by creating a POC project. Since I'm not sure about
>>>> copying and using ASF project code for personal use, I'll just share the main
>>>> class so you can see whether this POC and my theory make sense for us.
>>>> https://gist.github.com/HeartSaVioR/3639a9ee829fe1203b4a085a0fb069d6
>>>> The 'zeppelin' package has some classes for a transitive dependency resolver
>>>> (copied from Zeppelin), and the 'spark' package has some classes for a
>>>> classloader (yes, also copied from Spark).
>>>> 
>>>> Please share your experiences if you have knowledge of this area.
>>>> 
>>>> - Jungtaek Lim
>>>> 
>>>> On Fri, Jul 22, 2016 at 10:59 PM, Bobby Evans wrote:
>>>> 
>>>>> You can do that with a combination of the distributed cache and setting
>>>>> the classpath, just like with Hadoop.  It is not as clean as it could be,
>>>>> but it does work. - Bobby
>>>>> 
>>>>>    On Thursday, July 21, 2016 11:09 PM, Arun Mahadevan <ar...@apache.org>
>>>>> wrote:
>>>>> 
>>>>> 
>>>>> Shading and relocating the external modules sounds OK as a short-term
>>>>> solution.
>>>>> 
>>>>> For the long term we should consider something like the second option to
>>>>> add external modules without shipping uber jars.
>>>>> 
>>>>> Thanks,
>>>>> Arun
>>>>> 
>>>>> On 7/22/16, 6:07 AM, "Jungtaek Lim"  wrote:
>>>>> 
>>>>>> Hi devs,
>>>>>> 
>>>>>> AFAIK, we have been struggling to resolve dependency issues for storm-core.
>>>>>> As we all know, the strategy we have been using is shading & relocating.
>>>>>> 
>>>>>> Now State and Storm SQL require that some of the external modules be
>>>>>> included in extlib, which is the classpath workers refer to.
>>>>>> 
>>>>>> http://issues.apache.org/jira/browse/STORM-1881
>>>>>> https://issues.apache.org/jira/browse/STORM-1435
>>>>>> 
>>>>>> There are two issues here:
>>>>>> - We don't build uber jars for external modules, so users need to find and
>>>>>>   copy dependency jars to extlib manually.
>>>>>> - External modules also use Guava, Jackson, and so on, which are the
>>>>>>   origin of version conflict issues.
>>>>>> 
>>>>>> So we should either apply the shade & relocate strategy to every external
>>>>>> module (at least storm-redis, storm-kafka, storm-sql-core,
>>>>>> storm-sql-kafka), or introduce a way to add dependencies without
>>>>>> adding them to extlib (like --packages and --jars for Spark).
>>>>>> 
>>>>>> Please express your opinions about this.
>>>>>> 
>>>>>> Thanks,
>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
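The gist's classloader code is not reproduced in the thread, so the following is only a generic sketch of the child-first classloading idea behind Spark-style dependency isolation, not the POC's code. `ChildFirstClassLoader` is a hypothetical name: a `URLClassLoader` subclass that looks up classes in its own jars before delegating to the parent, so that a topology's own Guava or Jackson version would win over the one on the worker classpath. Classes under `java.` are always delegated, because the JVM refuses to define them from user class loaders.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first loader: classes found in the given jars take
// precedence over classes of the same name on the parent classpath.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // Reuse a class this loader has already defined.
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (name.startsWith("java.")) {
                    // Core JDK classes must come from the parent chain.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: search this loader's own URLs.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in our jars: fall back to normal parent delegation.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

A worker would create one such loader per set of submitted jars and load the topology's classes through it. Resource lookups (`getResource`) would need the same child-first treatment, which this sketch leaves out.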





[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1445
  
I've finished a first pass of review. Great work.
This is a rather huge diff, so the more reviewers, the better the review. Please 
keep reviewing, everybody.




[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402176#comment-15402176
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1445
  
I've finished a first pass of review. Great work.
This is a rather huge diff, so the more reviewers, the better the review. Please 
keep reviewing, everybody.


> port backtype.storm.daemon.executor to java
> ---
>
> Key: STORM-1277
> URL: https://issues.apache.org/jira/browse/STORM-1277
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Cody
>  Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
>  kind of.  Tasks and executors are combined in jstorm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72987752
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java ---
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+    private final BoltExecutor executor;
+    private final Task taskData;
+    private final int taskId;
+    private final Random random;
+    private final boolean isEventLoggers;
+    private final boolean isDebug;
+
+    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
+                                   boolean isEventLoggers, boolean isDebug) {
+        this.executor = executor;
+        this.taskData = taskData;
+        this.taskId = taskId;
+        this.random = random;
+        this.isEventLoggers = isEventLoggers;
+        this.isDebug = isDebug;
+    }
+
+    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        return boltEmit(streamId, anchors, tuple, null);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+        boltEmit(streamId, anchors, tuple, taskId);
+    }
+
+    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
+        List<Integer> outTasks;
+        if (targetTaskId != null) {
+            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
+        } else {
+            outTasks = taskData.getOutgoingTasks(streamId, values);
+        }
+
+        for (Integer t : outTasks) {
+            Map<Long, Long> anchorsToIds = new HashMap<>();
+            if (anchors != null) {
+                for (Tuple a : anchors) {
+                    long edgeId = MessageId.generateId(random);
+                    ((TupleImpl) a).updateAckVal(edgeId);
+                    for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
+                        putXor(anchorsToIds, root_id, edgeId);
+                    }
+                }
+            }
+            MessageId msgId = MessageId.makeId(anchorsToIds);
+            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+            executor.getExecutorTransfer().transfer(t, tupleExt);
+        }
+        if (isEventLoggers) {
+            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+        }
+        return outTasks;
--- End diff --

`(or out-tasks [])` seems to guarantee at least an empty list. Does this 
code guarantee the same?
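If `getOutgoingTasks` can return `null` (an assumption; the diff does not show its contract), the `for (Integer t : outTasks)` loop in `boltEmit` would already throw a `NullPointerException` before reaching `return outTasks`, so a guard would have to sit right after the lookup. A minimal sketch of such a guard, using a hypothetical `orEmpty` helper that is not part of Storm:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring Clojure's (or out-tasks []):
// return the list as-is, or an empty list when it is null.
public final class OutTasks {

    private OutTasks() {
    }

    static List<Integer> orEmpty(List<Integer> outTasks) {
        // Collections.emptyList() is immutable: fine for iterating or returning,
        // but a caller that tries to add to it gets UnsupportedOperationException.
        return outTasks == null ? Collections.<Integer>emptyList() : outTasks;
    }

    public static void main(String[] args) {
        System.out.println(orEmpty(null));                 // []
        System.out.println(orEmpty(Arrays.asList(3, 7)));  // [3, 7]
    }
}
```

In `boltEmit` this would amount to `outTasks = orEmpty(outTasks);` right after the `if`/`else`, so that both the loop and the return see a non-null list.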



[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402127#comment-15402127
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72987752
  

`(or out-tasks [])` seems to guarantee at least an empty list. Does this 
code guarantee the same?



[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402121#comment-15402121
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72987136
  

```
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
  (when (pos? (count root-ids))
    (let [edge-id (MessageId/generateId rand)]
      (.updateAckVal a edge-id)
      (fast-list-iter [root-id root-ids]
        (put-xor! anchors-to-ids root-id edge-id)))))
```

If root-ids.size() is 0, the ported code behaves differently. It should check 

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72987136
  

```
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
  (when (pos? (count root-ids))
    (let [edge-id (MessageId/generateId rand)]
      (.updateAckVal a edge-id)
      (fast-list-iter [root-id root-ids]
        (put-xor! anchors-to-ids root-id edge-id)))))
```

If root-ids.size() is 0, the ported code behaves differently. It should check 
the size first, and skip everything if the size is 0.
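A self-contained sketch of the check this comment asks for, mirroring the Clojure `(when (pos? (count root-ids)) ...)`: an edge id is generated and applied only when the anchor has at least one root id. `Anchor`, its fields, and this `putXor` are simplified hypothetical stand-ins for `TupleImpl`, `MessageId`, and the collector's helper, and `random.nextLong()` stands in for `MessageId.generateId`; none of them are Storm's real classes.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public final class AnchorSketch {

    // Hypothetical stand-in for an anchor tuple: its root ids and its ack value.
    static final class Anchor {
        final Set<Long> rootIds;
        long ackVal;

        Anchor(Set<Long> rootIds) {
            this.rootIds = rootIds;
        }

        void updateAckVal(long edgeId) {
            ackVal ^= edgeId;
        }
    }

    // XOR edgeId into the value stored for rootId (treating a missing value as 0).
    static void putXor(Map<Long, Long> map, Long rootId, long edgeId) {
        Long curr = map.get(rootId);
        map.put(rootId, (curr == null ? 0L : curr) ^ edgeId);
    }

    static Map<Long, Long> anchorsToIds(Collection<Anchor> anchors, Random random) {
        Map<Long, Long> anchorsToIds = new HashMap<>();
        if (anchors == null) {
            return anchorsToIds;
        }
        for (Anchor a : anchors) {
            Set<Long> rootIds = a.rootIds;
            // Size check first: no root ids means no edge id and no ack-val update.
            if (rootIds.isEmpty()) {
                continue;
            }
            long edgeId = random.nextLong();
            a.updateAckVal(edgeId);
            for (Long rootId : rootIds) {
                putXor(anchorsToIds, rootId, edgeId);
            }
        }
        return anchorsToIds;
    }

    public static void main(String[] args) {
        Anchor unanchored = new Anchor(Collections.<Long>emptySet());
        Anchor anchored = new Anchor(new HashSet<>(Arrays.asList(1L, 2L)));
        Map<Long, Long> ids = anchorsToIds(Arrays.asList(unanchored, anchored), new Random(42));
        System.out.println(unanchored.ackVal);  // 0
        System.out.println(ids.size());         // 2
    }
}
```

Without the `isEmpty()` check, the loop would still consume a random edge id and XOR it into the anchor's ack value even though no root id receives it, which is the behavioral difference the comment points out.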



[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402096#comment-15402096
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72983906
  

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72983906
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+    private final ISpoutWaitStrategy spoutWaitStrategy;
+    private Integer maxSpoutPending;
+    private final AtomicBoolean lastActive;
+    private List<ISpout> spouts;
+    private List<SpoutOutputCollector> outputCollectors;
+    private final MutableLong emittedCount;
+    private final MutableLong emptyEmitStreak;
+    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+    private final boolean hasAckers;
+    private RotatingMap<Long, TupleInfo> pending;
+    private final boolean backPressureEnabled;
+
+    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+        super(workerData, executorId, credentials);
+        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+        this.spoutWaitStrategy.prepare(stormConf);
+
+        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+        this.lastActive = new AtomicBoolean(false);
+        this.hasAckers = StormCommon.hasAckers(stormConf);
+        this.emittedCount = new MutableLong(0);
+        this.emptyEmitStreak = new MutableLong(0);
+        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+    }
+
+    @Override
+    public void init(final Map<Integer, Task> idToTask) {
+        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+        this.idToTask = idToTask;
+        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+        this.spouts = new ArrayList<>();
+        for (Task task : idToTask.values()) {
+            this.spouts.add((ISpout) task.getTaskObject());
+        }
+        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+            @Override
+            public void expire(Long key, TupleInfo tupleInfo) {
+                Long timeDelta = null;
+                if (tupleInfo.getTimestamp() != 0) {
+                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+                }
+                failSpoutMsg(SpoutExecutor.this, 

[jira] [Created] (STORM-2011) Add JSON Serialization for Java Objects

2016-08-01 Thread Ellison Anne Williams (JIRA)
Ellison Anne Williams created STORM-2011:


 Summary: Add JSON Serialization for Java Objects 
 Key: STORM-2011
 URL: https://issues.apache.org/jira/browse/STORM-2011
 Project: Apache Storm
  Issue Type: Bug
Reporter: Ellison Anne Williams


Fill in the JsonSerializer class - perhaps use gson...





[jira] [Closed] (STORM-2011) Add JSON Serialization for Java Objects

2016-08-01 Thread Ellison Anne Williams (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ellison Anne Williams closed STORM-2011.

Resolution: Fixed

Accidentally added this in the wrong JIRA project... Sorry!

> Add JSON Serialization for Java Objects 
> 
>
> Key: STORM-2011
> URL: https://issues.apache.org/jira/browse/STORM-2011
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Ellison Anne Williams
>
> Fill in the JsonSerializer class - perhaps use gson...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Move out porting tests to after JStorm merger (Phase 3 or even 2.x)

2016-08-01 Thread Bobby Evans
The plan was that we would all concentrate on the migration and knock it out 
quickly, but stuff happened and we were not able to concentrate on it as fully 
as I would have liked.  I agree that the tests can wait.  What is more, because 
of local mode, many of the tests have to wait until all of the daemons have been 
ported over.  I really wish there were more hours in the day for me to help out 
more on this. - Bobby 

On Sunday, July 31, 2016 6:51 PM, Jungtaek Lim  wrote:
 

 Hi devs,

Porting to Java is taking longer than we expected, and working on
storm-core now requires changes in both Java and Clojure, i.e. doing it twice.
One of the reasons for the port is "lowering the learning curve to attract more
contributors", but now contributors need to know Clojure to contribute to
storm-core targeting 1.x and then "port" the change to master. It's even harder for me
since I need some time to investigate how things were ported.

Bobby suggested a "feature freeze" for 1.x and moving to 2.x quickly, but IMO
it will only work if we have a due date for 2.0.0 and all concentrate on
this to release ASAP.
(Sorry, I had to break the "feature freeze" since metrics improvements are
needed within months.)
Since we're individuals and also several different teams, setting a due date
for this seems to be just an ideal. We need to find other ways to make the transition
faster.

Looking into the progress of porting to Java, I found that we have ported many
parts of the source, so only some (still huge) daemons are left, but there are still
lots of files in test.
Given that ported files should pass the Clojure tests, I think porting the tests is
not urgent and we can move it out of phase 1.

What do you think?

Thanks,
Jungtaek Lim (HeartSaVioR)

ps. Just my two cents: I even think it might be valid to set the milestone for
2.0.0 to phase 1 only (well tested). If Storm 2.0.0 is just a Java
port of Storm 1.x, 1.x users can easily move to 2.0.0 and we can minimize
supporting multiple versions that differ in main language. Phase 2 and other
improvements can build on top of 2.0.0. (We may need a 3.0.0 for that,
but I think a version bump is not a big deal.)


  

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401872#comment-15401872
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961638
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+

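`init` multiplies `TOPOLOGY_MAX_SPOUT_PENDING` by the number of tasks the executor runs, since one executor drives all of its spout tasks against a single `pending` map. A sketch of the resulting limit and check; it assumes that 0 (the default passed to `Utils.getInt`) means "no limit", and the class and method names are hypothetical.

```java
public class MaxSpoutPendingSketch {
    // Executor-wide limit: the per-task setting times the number of spout tasks in this executor.
    // A missing setting is treated as 0, mirroring the default passed to Utils.getInt in init().
    static int executorLimit(Integer perTaskSetting, int numTasks) {
        int perTask = (perTaskSetting == null) ? 0 : perTaskSetting;
        return perTask * numTasks;
    }

    // Assumption: a limit of 0 disables throttling; otherwise nextTuple() is only
    // called while fewer than maxSpoutPending tuples are awaiting ack/fail.
    static boolean mayCallNextTuple(int pendingCount, int maxSpoutPending) {
        return maxSpoutPending == 0 || pendingCount < maxSpoutPending;
    }

    public static void main(String[] args) {
        int limit = executorLimit(1000, 4);
        System.out.println(limit);                                            // 4000
        System.out.println(mayCallNextTuple(3999, limit));                    // true
        System.out.println(mayCallNextTuple(4000, limit));                    // false
        System.out.println(mayCallNextTuple(50_000, executorLimit(null, 4))); // true
    }
}
```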
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961638
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+}
+failSpoutMsg(SpoutExecutor.this, 

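The `pending` map is a `RotatingMap` with two buckets whose expiry callback fails the tuple via `failSpoutMsg`. A minimal sketch of that data structure, assuming rotation is driven by a periodic tick tied to the message timeout; `RotatingMapSketch` is a hypothetical stand-in, not Storm's `org.apache.storm.utils.RotatingMap`, and uses a `BiConsumer` in place of `ExpiredCallback`. With two buckets, an entry that is neither acked nor failed expires between one and two rotation periods after it was added.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;

public class RotatingMapSketch<K, V> {
    // Head of the deque is the newest bucket, tail is the oldest.
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();
    private final BiConsumer<K, V> onExpire;

    public RotatingMapSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // New entries go into the newest bucket; any older copy of the key is dropped.
    public void put(K key, V value) {
        Iterator<Map<K, V>> it = buckets.iterator();
        it.next().put(key, value);
        while (it.hasNext()) {
            it.next().remove(key);
        }
    }

    // Called on ack/fail: the entry leaves the map without triggering expiry.
    public V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    public int size() {
        int n = 0;
        for (Map<K, V> bucket : buckets) {
            n += bucket.size();
        }
        return n;
    }

    // Drops the oldest bucket and reports every entry still in it as expired.
    public Map<K, V> rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        dead.forEach(onExpire);
        return dead;
    }

    public static void main(String[] args) {
        RotatingMapSketch<Long, String> pending =
            new RotatingMapSketch<>(2, (id, info) -> System.out.println("expired " + id));
        pending.put(1L, "a");
        pending.rotate();                    // 1L moves to the oldest bucket, nothing expires
        pending.put(2L, "b");
        pending.remove(2L);                  // acked before timing out
        pending.rotate();                    // prints "expired 1"
        System.out.println(pending.size());  // 0
    }
}
```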
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401868#comment-15401868
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961410
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference> stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState 

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961410
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference> stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map taskToComponent;
+protected CommonStats stats;
+protected final Map>> intervalToTaskToMetricToRegistry;
+

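`Executor` implements both `Callable` and LMAX Disruptor's `EventHandler`, whose callback is `onEvent(T event, long sequence, boolean endOfBatch)`. The diff is cut off before that method, so the following is only a sketch of one plausible dispatch loop: it assumes each event is a batch of addressed tuples and that a sentinel destination means "every task in this executor". The `EventHandler` interface is copied locally to keep the example self-contained; `Addressed`, `BROADCAST` and `tupleAction` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ExecutorDispatchSketch {
    // Same shape as com.lmax.disruptor.EventHandler<T>, redeclared locally.
    interface EventHandler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
    }

    // Hypothetical sentinel meaning "deliver to every task of this executor".
    static final int BROADCAST = -1;

    // Hypothetical addressed tuple: destination task id plus payload.
    static final class Addressed {
        final int dest;
        final String tuple;

        Addressed(int dest, String tuple) {
            this.dest = dest;
            this.tuple = tuple;
        }
    }

    static final class Dispatcher implements EventHandler<List<Addressed>> {
        final List<Integer> taskIds;
        final List<String> delivered = new ArrayList<>();

        Dispatcher(List<Integer> taskIds) {
            this.taskIds = taskIds;
        }

        // Stand-in for the per-task tuple action (e.g. execute for a bolt).
        void tupleAction(int taskId, String tuple) {
            delivered.add(taskId + ":" + tuple);
        }

        @Override
        public void onEvent(List<Addressed> batch, long sequence, boolean endOfBatch) {
            for (Addressed a : batch) {
                if (a.dest == BROADCAST) {
                    for (int t : taskIds) {
                        tupleAction(t, a.tuple);
                    }
                } else {
                    tupleAction(a.dest, a.tuple);
                }
            }
        }
    }

    public static void main(String[] args) {
        Dispatcher d = new Dispatcher(List.of(3, 4));
        d.onEvent(List.of(new Addressed(3, "x"), new Addressed(BROADCAST, "tick")), 0L, true);
        System.out.println(d.delivered); // [3:x, 3:tick, 4:tick]
    }
}
```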
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401651#comment-15401651
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72934070
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import clojure.lang.Atom;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+private final Callable executeSampler;
+
+public BoltExecutor(Map workerData, List executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+}
+
+@Override
+public void init(Map idToTask) {
+this.idToTask = idToTask;
+LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+for (Map.Entry entry : idToTask.entrySet()) {
+Task taskData = entry.getValue();
+IBolt boltObject = (IBolt) taskData.getTaskObject();
+TopologyContext userContext = taskData.getUserContext();
+taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+if (boltObject instanceof ICredentialsListener) {
+((ICredentialsListener) boltObject).setCredentials(credentials);
+}
+if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+"transfer", (DisruptorQueue) workerData.get("transfer-queue"));
+BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref();
+BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext);
+} else {
+Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+}
+
+IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers, isDebug);
+boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+}
+openOrPrepareWasCalled.set(true);
+
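In `init`, only the system component (`Constants.SYSTEM_COMPONENT_ID`) adds the worker-wide `transfer-queue` and the connection metrics; every other component registers just its own send and receive queues. A sketch of that selection, assuming the system component id is `"__system"` and using `java.util.Queue` sizes as stand-in gauges instead of `DisruptorQueue` metrics; `queueGauges` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

public class QueueMetricsSketch {
    // Assumed value of org.apache.storm.Constants.SYSTEM_COMPONENT_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";

    // Returns metric name -> gauge reporting the queue's current depth.
    static Map<String, Supplier<Integer>> queueGauges(String componentId, Queue<?> sendQueue,
            Queue<?> receiveQueue, Queue<?> workerTransferQueue) {
        Map<String, Queue<?>> queues = new LinkedHashMap<>();
        queues.put("sendqueue", sendQueue);
        queues.put("receive", receiveQueue);
        // The transfer queue belongs to the whole worker, so only the single
        // system component per worker reports it.
        if (SYSTEM_COMPONENT_ID.equals(componentId)) {
            queues.put("transfer", workerTransferQueue);
        }
        Map<String, Supplier<Integer>> gauges = new LinkedHashMap<>();
        for (Map.Entry<String, Queue<?>> e : queues.entrySet()) {
            Queue<?> q = e.getValue();
            gauges.put(e.getKey(), q::size);
        }
        return gauges;
    }

    public static void main(String[] args) {
        Queue<String> send = new ArrayDeque<>();
        Queue<String> receive = new ArrayDeque<>();
        Queue<String> transfer = new ArrayDeque<>();
        transfer.add("t1");

        System.out.println(queueGauges("split-bolt", send, receive, transfer).keySet()); // [sendqueue, receive]
        Map<String, Supplier<Integer>> sys = queueGauges(SYSTEM_COMPONENT_ID, send, receive, transfer);
        System.out.println(sys.keySet());              // [sendqueue, receive, transfer]
        System.out.println(sys.get("transfer").get()); // 1
    }
}
```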

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72934070
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import clojure.lang.Atom;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);
+
+private final Callable executeSampler;
+
+public BoltExecutor(Map workerData, List executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+}
+
+@Override
+public void init(Map idToTask) {
+this.idToTask = idToTask;
+LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+for (Map.Entry entry : idToTask.entrySet()) {
+Task taskData = entry.getValue();
+IBolt boltObject = (IBolt) taskData.getTaskObject();
+TopologyContext userContext = taskData.getUserContext();
+taskData.getBuiltInMetrics().registerAll(stormConf, userContext);
+if (boltObject instanceof ICredentialsListener) {
+((ICredentialsListener) boltObject).setCredentials(credentials);
+}
+if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+"transfer", (DisruptorQueue) workerData.get("transfer-queue"));
+BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+
+Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref();
+BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext);
+BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext);
+} else {
+Map map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext);
+}
+
+IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers, isDebug);
+boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector));
+}
+openOrPrepareWasCalled.set(true);
+LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+setupMetrics();
+}
+
+@Override
+public Object call() throws Exception {
+while (!stormActive.get()) {
--- End diff --
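The quoted diff stops at the head of `call()`: `while (!stormActive.get())`. Assuming the elided body simply sleeps and re-checks the flag, the pattern looks like this; `awaitActive` and the 10 ms poll interval are illustrative choices, not values from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WaitForActiveSketch {
    // Blocks until the flag becomes true, re-checking every pollMillis.
    // Returns how many times it had to sleep (for illustration only).
    static int awaitActive(AtomicBoolean active, long pollMillis) throws InterruptedException {
        int polls = 0;
        while (!active.get()) {
            polls++;
            Thread.sleep(pollMillis);
        }
        return polls;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stormActive = new AtomicBoolean(false);
        // Simulates another thread flipping the flag once the topology becomes active.
        Thread activator = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            stormActive.set(true);
        });
        activator.start();
        int polls = awaitActive(stormActive, 10);
        activator.join();
        System.out.println("active after " + polls + " polls");
    }
}
```

Using an `AtomicBoolean` gives the cross-thread visibility the loop relies on; with a plain non-volatile `boolean` field the reading thread might never observe the update.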


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72933619
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+
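As its name suggests, the `intervalToTaskToMetricToRegistry` field nests metrics by reporting interval, then task id, then metric name. Below is a minimal sketch of how such a registry could be filled and drained. It assumes each metric follows the `getValueAndReset()` contract of Storm's `IMetric`. `SketchMetric`, `SketchCountMetric` and `IntervalTaskMetricRegistry` are hypothetical names for this illustration and are not classes from the PR:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Storm's IMetric (which exposes getValueAndReset()),
// declared here so the sketch compiles on its own.
interface SketchMetric {
    Object getValueAndReset();
}

// Hypothetical counter metric used only for this illustration.
class SketchCountMetric implements SketchMetric {
    private long value;

    void incr() {
        value++;
    }

    @Override
    public Object getValueAndReset() {
        long current = value;
        value = 0;
        return current; // autoboxed to Long
    }
}

// Nested registry: interval (secs) -> task id -> metric name -> metric.
class IntervalTaskMetricRegistry {
    private final Map<Integer, Map<Integer, Map<String, SketchMetric>>> registry = new HashMap<>();

    // Register a metric for one task under a reporting interval.
    void register(int intervalSecs, int taskId, String name, SketchMetric metric) {
        registry.computeIfAbsent(intervalSecs, k -> new HashMap<>())
                .computeIfAbsent(taskId, k -> new HashMap<>())
                .put(name, metric);
    }

    // Read and reset every metric a task registered for the given interval.
    Map<String, Object> collect(int intervalSecs, int taskId) {
        Map<String, SketchMetric> metrics = registry
                .getOrDefault(intervalSecs, Collections.emptyMap())
                .getOrDefault(taskId, Collections.emptyMap());
        Map<String, Object> snapshot = new HashMap<>();
        for (Map.Entry<String, SketchMetric> e : metrics.entrySet()) {
            snapshot.put(e.getKey(), e.getValue().getValueAndReset());
        }
        return snapshot;
    }
}
```

Because each collection resets the metric, every snapshot covers exactly one reporting interval. A task that registered nothing for an interval yields an empty snapshot.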

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401649#comment-15401649
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72933619
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401640#comment-15401640
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72933087
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72933087
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List executorId, Map credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+}
+failSpoutMsg(SpoutExecutor.this, 
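In the quoted `SpoutExecutor`, `maxSpoutPending` is the configured `TOPOLOGY_MAX_SPOUT_PENDING` multiplied by the number of tasks. Pending tuples are held in a two-bucket `RotatingMap`, and its expiry callback fails a tuple when it expires. The callback computes a latency delta only when the recorded timestamp is non-zero.

The sketch below approximates that bookkeeping under stated assumptions:
- `RotatingPendingSketch` is a simplified stand-in, not Storm's `RotatingMap`.
- Rotation is assumed to be driven by some periodic tick.
- A limit of 0 is assumed to mean "no limit".
- `SpoutPendingSketch` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified bucketed expiring map (not Storm's RotatingMap): entries enter the
// newest bucket; rotate() drops the oldest bucket and passes each dropped entry
// to the expiry callback.
class RotatingPendingSketch<K, V> {
    private final LinkedList<Map<K, V>> buckets = new LinkedList<>();
    private final BiConsumer<K, V> onExpire;

    RotatingPendingSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be at least 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // Insert into the newest bucket, dropping any older copy of the key.
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.getFirst().put(key, value);
    }

    // Remove an entry (e.g. on ack or fail) without triggering the callback.
    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    int size() {
        int total = 0;
        for (Map<K, V> bucket : buckets) {
            total += bucket.size();
        }
        return total;
    }

    // With two buckets, an entry expires on the second rotation after it was inserted.
    void rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        dead.forEach(onExpire);
    }
}

// Hypothetical helpers mirroring the spout-side checks described above.
class SpoutPendingSketch {
    // Assumption: maxSpoutPending == 0 means "no limit" (the config default read above is 0).
    static boolean reachedMaxSpoutPending(int pendingSize, int maxSpoutPending) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    // As in the quoted expiry callback: a zero start timestamp means no latency sample.
    static Long timeDeltaMs(long startTimestampMs, long nowMs) {
        if (startTimestampMs == 0) {
            return null;
        }
        return nowMs - startTimestampMs;
    }
}
```

With two buckets, the timeout is coarse. A tuple stays pending for between one and two rotation periods, depending on where in the current period it was emitted.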

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401621#comment-15401621
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72931639
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72931639
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
+

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

2016-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15401611#comment-15401611
 ] 

ASF GitHub Bot commented on STORM-1277:
---

Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72930327
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -0,0 +1,147 @@

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72930327
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+private final SpoutExecutor executor;
+private final Task taskData;
+private final int taskId;
+private final MutableLong emittedCount;
+private final boolean hasAckers;
+private final Random random;
+private final Boolean isEventLoggers;
+private final Boolean isDebug;
+private final RotatingMap pending;
+
+@SuppressWarnings("unused")
+public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
+MutableLong emittedCount, boolean hasAckers, Random random,
+Boolean isEventLoggers, Boolean isDebug, RotatingMap pending) {
+this.executor = executor;
+this.taskData = taskData;
+this.taskId = taskId;
+this.emittedCount = emittedCount;
+this.hasAckers = hasAckers;
+this.random = random;
+this.isEventLoggers = isEventLoggers;
+this.isDebug = isDebug;
+this.pending = pending;
+}
+
+@Override
+public List emit(String streamId, List tuple, Object messageId) {
+return sendSpoutMsg(streamId, tuple, messageId, null);
+}
+
+@Override
+public void emitDirect(int taskId, String streamId, List tuple, Object messageId) {
+sendSpoutMsg(streamId, tuple, messageId, taskId);
+}
+
+@Override
+public long getPendingCount() {
+return pending.size();
+}
+
+@Override
+public void reportError(Throwable error) {
+executor.getReportError().report(error);
+}
+
+private List sendSpoutMsg(String stream, List values, Object messageId, Integer outTaskId) {
+emittedCount.increment();
+
+List outTasks;
+if (outTaskId != null) {
+outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
+} else {
+outTasks = taskData.getOutgoingTasks(stream, values);
+}
+
+List ackSeq = new ArrayList<>();
+boolean needAck = (messageId != null) && hasAckers;
+
+long rootId = MessageId.generateId(random);
+for (Integer t : outTasks) {
+MessageId msgId;
+if (needAck) {
+long as = MessageId.generateId(random);
+msgId = MessageId.makeRootId(rootId, as);
+ackSeq.add(as);
+} else {
+msgId = MessageId.makeUnanchored();
+}
+
+TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
+executor.getExecutorTransfer().transfer(t, tuple);
+}
+if (isEventLoggers) {
+executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
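In `sendSpoutMsg`, one random root id is drawn per emit. When acking is needed, one extra random id is drawn for each outgoing task and collected in `ackSeq`. Storm's acker tracks a tuple tree by XOR-ing such 64-bit ids: each id is XOR-ed in once when created and once when acked, so the running value returns to 0 once the tree is complete.

Below is a minimal sketch of that bookkeeping. It assumes `Random.nextLong()` as the id source and ignores the extremely unlikely case of ids cancelling by accident. `XorAckTrackerSketch` is hypothetical and is not the acker code from the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of XOR-based tuple-tree tracking (the idea behind Storm's acker).
class XorAckTrackerSketch {
    private long ackVal;

    // Spout side: one random 64-bit id per outgoing task copy, like the ids collected in ackSeq.
    static List<Long> edgeIds(int outTaskCount, Random random) {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < outTaskCount; i++) {
            ids.add(random.nextLong());
        }
        return ids;
    }

    // Acker init: XOR of every id the spout emitted for this root.
    void init(List<Long> ids) {
        ackVal = 0L;
        for (long id : ids) {
            ackVal ^= id;
        }
    }

    // A downstream ack XORs in the acked id plus the ids of any child tuples it anchored.
    void ack(long ackedId, List<Long> childIds) {
        ackVal ^= ackedId;
        for (long child : childIds) {
            ackVal ^= child;
        }
    }

    // Every id has now been XOR-ed in twice (created and acked), so the value is back to zero.
    boolean complete() {
        return ackVal == 0L;
    }
}
```

The appeal of this design is constant memory per root id: the acker keeps a single `long` per tuple tree, however many tuples the tree contains.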