[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45036785
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default interface to build cassandra Cluster from the a Storm Topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory {
+
+/**
+ * Creates a new Cluster based on the specified configuration.
+ * @param stormConf the storm configuration.
+ * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
+ */
+@Override
+protected Cluster make(Map stormConf) {
+CassandraConf cassandraConf = new CassandraConf(stormConf);
+
+Cluster.Builder cluster = Cluster.builder()
+.withoutJMXReporting()
+.withoutMetrics()
+.addContactPoints(cassandraConf.getNodes())
+.withPort(cassandraConf.getPort())
+.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
--- End diff --

DefaultRetryPolicy, which is conservative, should be the default retry policy. 
DowngradingConsistencyRetryPolicy is fairly aggressive and should not be 
the default setting. You may want to read this policy from the configuration.
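
A minimal sketch of how the policy choice could be read from the configuration, falling back to the conservative default. The class name `RetryPolicyConf` and the key `cassandra.retryPolicy` are hypothetical and not taken from the pull request; plain strings stand in for the driver's policy instances so that the snippet does not depend on the DataStax driver.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryPolicyConf {
    // Hypothetical configuration key; not taken from the pull request.
    public static final String RETRY_POLICY_KEY = "cassandra.retryPolicy";
    // Conservative fallback, as suggested in the review comment.
    public static final String DEFAULT_POLICY = "DefaultRetryPolicy";

    /** Returns the configured policy name, or the conservative default when unset or blank. */
    public static String retryPolicyName(Map<String, Object> stormConf) {
        Object value = stormConf.get(RETRY_POLICY_KEY);
        if (value == null || value.toString().trim().isEmpty()) {
            return DEFAULT_POLICY;
        }
        return value.toString().trim();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(retryPolicyName(conf));
        conf.put(RETRY_POLICY_KEY, "DowngradingConsistencyRetryPolicy");
        System.out.println(retryPolicyName(conf));
    }
}
```

In the real factory the resolved name would still have to be mapped to a driver policy instance before being passed to withRetryPolicy; that mapping is left out here.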


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008338#comment-15008338
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45036785
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default interface to build cassandra Cluster from the a Storm Topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory {
+
+/**
+ * Creates a new Cluster based on the specified configuration.
+ * @param stormConf the storm configuration.
+ * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
+ */
+@Override
+protected Cluster make(Map stormConf) {
+CassandraConf cassandraConf = new CassandraConf(stormConf);
+
+Cluster.Builder cluster = Cluster.builder()
+.withoutJMXReporting()
+.withoutMetrics()
+.addContactPoints(cassandraConf.getNodes())
+.withPort(cassandraConf.getPort())
+.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
--- End diff --

DefaultRetryPolicy, which is conservative, should be the default retry policy. 
DowngradingConsistencyRetryPolicy is fairly aggressive and should not be 
the default setting. You may want to read this policy from the configuration.


> Storm Cassandra connector
> -
>
> Key: STORM-1075
> URL: https://issues.apache.org/jira/browse/STORM-1075
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008339#comment-15008339
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45036898
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default interface to build cassandra Cluster from the a Storm Topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory {
+
+/**
+ * Creates a new Cluster based on the specified configuration.
+ * @param stormConf the storm configuration.
+ * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
+ */
+@Override
+protected Cluster make(Map stormConf) {
+CassandraConf cassandraConf = new CassandraConf(stormConf);
+
+Cluster.Builder cluster = Cluster.builder()
+.withoutJMXReporting()
+.withoutMetrics()
+.addContactPoints(cassandraConf.getNodes())
+.withPort(cassandraConf.getPort())
+.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
+.withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
--- End diff --

It is better not to hard-code the timeout values. Create corresponding properties 
in the config.
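
A minimal sketch of reading the two reconnection delays from the configuration instead of hard-coding them. The class name `ReconnectionConf` and both key names are hypothetical; the defaults mirror the values hard-coded in the diff (100 ms and one minute).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ReconnectionConf {
    // Hypothetical configuration keys; not taken from the pull request.
    public static final String BASE_DELAY_KEY = "cassandra.reconnectionPolicy.baseDelayMs";
    public static final String MAX_DELAY_KEY = "cassandra.reconnectionPolicy.maxDelayMs";
    // Defaults mirror the hard-coded values in the diff.
    public static final long DEFAULT_BASE_DELAY_MS = 100L;
    public static final long DEFAULT_MAX_DELAY_MS = TimeUnit.MINUTES.toMillis(1);

    /** Reads a long from the config, accepting numbers or numeric strings. */
    public static long getLong(Map<String, Object> conf, String key, long defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(value.toString().trim());
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(BASE_DELAY_KEY, 250);
        long baseDelayMs = getLong(conf, BASE_DELAY_KEY, DEFAULT_BASE_DELAY_MS);
        long maxDelayMs = getLong(conf, MAX_DELAY_KEY, DEFAULT_MAX_DELAY_MS);
        System.out.println(baseDelayMs + " " + maxDelayMs);
    }
}
```

The two resolved values would then be passed to the ExponentialReconnectionPolicy constructor in place of the literals.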


> Storm Cassandra connector
> -
>
> Key: STORM-1075
> URL: https://issues.apache.org/jira/browse/STORM-1075
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
>






[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45036898
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.cassandra.context.BaseBeanFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default interface to build cassandra Cluster from the a Storm Topology configuration.
+ */
+public class ClusterFactory extends BaseBeanFactory {
+
+/**
+ * Creates a new Cluster based on the specified configuration.
+ * @param stormConf the storm configuration.
+ * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
+ */
+@Override
+protected Cluster make(Map stormConf) {
+CassandraConf cassandraConf = new CassandraConf(stormConf);
+
+Cluster.Builder cluster = Cluster.builder()
+.withoutJMXReporting()
+.withoutMetrics()
+.addContactPoints(cassandraConf.getNodes())
+.withPort(cassandraConf.getPort())
+.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
+.withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
--- End diff --

It is better not to hard-code the timeout values. Create corresponding properties 
in the config.




[jira] [Created] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread Zhiqiang He (JIRA)
Zhiqiang He created STORM-1210:
--

 Summary: Set Output Stream id in KafkaSpout
 Key: STORM-1210
 URL: https://issues.apache.org/jira/browse/STORM-1210
 Project: Apache Storm
  Issue Type: New Feature
  Components: storm-kafka
Affects Versions: 0.11.0
Reporter: Zhiqiang He
Assignee: Zhiqiang He
Priority: Minor


topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
need to set the output stream id to a different name.
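
A minimal sketch of the behaviour being requested, under the assumption that an explicitly configured stream id should take precedence over topicAsStreamId. The class `StreamIdResolver` and the `outputStreamId` setting are hypothetical, and the actual pull request may implement this differently; the constant stands in for Storm's default stream id.

```java
public class StreamIdResolver {
    // Stand-in for backtype.storm.utils.Utils.DEFAULT_STREAM_ID (assumed to be "default").
    public static final String DEFAULT_STREAM_ID = "default";

    /**
     * Picks the output stream id.
     * Assumed precedence: explicit id, then topic name, then the default stream.
     */
    public static String resolve(String outputStreamId, boolean topicAsStreamId, String topic) {
        if (outputStreamId != null && !outputStreamId.isEmpty()) {
            return outputStreamId;
        }
        if (topicAsStreamId) {
            return topic;
        }
        return DEFAULT_STREAM_ID;
    }

    public static void main(String[] args) {
        System.out.println(resolve("clicks-stream", true, "clicks"));
        System.out.println(resolve(null, true, "clicks"));
        System.out.println(resolve(null, false, "clicks"));
    }
}
```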





[GitHub] storm pull request: [STORM-1210] Set Output Stream id in KafkaSpou...

2015-11-17 Thread Zhiqiang-He
GitHub user Zhiqiang-He opened a pull request:

https://github.com/apache/storm/pull/885

[STORM-1210] Set Output Stream id in KafkaSpout

topicAsStreamId can only set the output stream id to the topic name. In some cases, 
we need to set the output stream id to a different name.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Zhiqiang-He/storm-0914-edit master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #885


commit c1a89d9597dfcbfeca260d4264a4b2e374c419be
Author: Zhiqiang He 
Date:   2015-10-21T02:09:12Z

Merge pull request #1 from apache/master

merge 1021

commit 5261160f84bcb0ef73acf4a632d421f7e2ee901d
Author: Zhiqiang He 
Date:   2015-11-17T02:57:09Z

Merge pull request #2 from apache/master

merge 1117

commit 87a4a8b4382de5617d463b23188f18e4f7e7f03c
Author: Zhiqiang-He 
Date:   2015-11-17T09:14:54Z

set output stream id in kafkaSpout






[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037103
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
 ---
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
+ */
+public class DefaultClient implements SimpleClient, Closeable, Serializable {
+
+private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
+
+private String keyspace;
+
+private Cluster cluster;
+
+private Session session;
+
+/**
+ * Create a new {@link DefaultClient} instance.
+ * 
+ * @param cluster a cassandra cluster client.
+ */
+public DefaultClient(Cluster cluster, String keyspace) {
+Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
+this.cluster = cluster;
+this.keyspace = keyspace;
+
+}
+
+public Set getAllHosts() {
+Metadata metadata = getMetadata();
+return metadata.getAllHosts();
+}
+
+public Metadata getMetadata() {
+return cluster.getMetadata();
+}
+
+
+private String getExecutorName() {
+Thread thread = Thread.currentThread();
+return thread.getName();
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+public synchronized Session connect() throws NoHostAvailableException {
+if( isDisconnected() ) {
+LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
--- End diff --

Can you replace String.format with LOG.info(String format, Object... 
arguments)? It does not build the string if the respective logging level is 
not enabled, which avoids building strings unnecessarily. You may want to 
refactor the other log usages the same way.
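
A small sketch of the difference, using java.util.logging as a stand-in so that the snippet has no SLF4J dependency; the class `LazyLoggingDemo` and its helper are hypothetical. With SLF4J the suggested call would be LOG.info("Connected to cluster: {}", cluster.getClusterName()). Note that the argument expression is still evaluated in both styles; only the message formatting is skipped when the level is disabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyLoggingDemo {
    /** Counts how often it is converted to a string. */
    public static class CountingName {
        public int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "test-cluster";
        }
    }

    /** Eager style: the message is built before the logger checks the level. */
    public static int eagerCalls() {
        Logger log = Logger.getLogger("demo.eager");
        log.setLevel(Level.WARNING); // INFO is disabled
        CountingName name = new CountingName();
        log.info(String.format("Connected to cluster: %s", name));
        return name.toStringCalls;
    }

    /** Parameterized style: formatting is skipped because INFO is disabled. */
    public static int deferredCalls() {
        Logger log = Logger.getLogger("demo.deferred");
        log.setLevel(Level.WARNING); // INFO is disabled
        CountingName name = new CountingName();
        log.log(Level.INFO, "Connected to cluster: {0}", name);
        return name.toStringCalls;
    }

    public static void main(String[] args) {
        System.out.println("eager toString calls: " + eagerCalls());
        System.out.println("deferred toString calls: " + deferredCalls());
    }
}
```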




[jira] [Commented] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008346#comment-15008346
 ] 

ASF GitHub Bot commented on STORM-1210:
---

GitHub user Zhiqiang-He opened a pull request:

https://github.com/apache/storm/pull/885

[STORM-1210] Set Output Stream id in KafkaSpout

topicAsStreamId can only set the output stream id to the topic name. In some cases, 
we need to set the output stream id to a different name.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Zhiqiang-He/storm-0914-edit master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #885


commit c1a89d9597dfcbfeca260d4264a4b2e374c419be
Author: Zhiqiang He 
Date:   2015-10-21T02:09:12Z

Merge pull request #1 from apache/master

merge 1021

commit 5261160f84bcb0ef73acf4a632d421f7e2ee901d
Author: Zhiqiang He 
Date:   2015-11-17T02:57:09Z

Merge pull request #2 from apache/master

merge 1117

commit 87a4a8b4382de5617d463b23188f18e4f7e7f03c
Author: Zhiqiang-He 
Date:   2015-11-17T09:14:54Z

set output stream id in kafkaSpout




> Set Output Stream id in KafkaSpout
> --
>
> Key: STORM-1210
> URL: https://issues.apache.org/jira/browse/STORM-1210
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka
>Affects Versions: 0.11.0
>Reporter: Zhiqiang He
>Assignee: Zhiqiang He
>Priority: Minor
>
> topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
> need to set the output stream id to a different name.





[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037220
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
 ---
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import org.apache.storm.cassandra.BaseExecutionResultHandler;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A base cassandra bolt.
+ *
+ * Default {@link backtype.storm.topology.base.BaseRichBolt}
+ */
+public abstract class BaseCassandraBolt extends BaseRichBolt {
+
+private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
+
+protected OutputCollector outputCollector;
+
+protected SimpleClientProvider clientProvider;
+protected SimpleClient client;
+protected Session session;
+protected Map stormConfig;
+
+protected CassandraConf cassandraConfConfig;
+
+private CQLStatementTupleMapper mapper;
+private ExecutionResultHandler resultHandler;
+
+transient private  Map outputsFields = new HashMap<>();
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ * @param mapper
+ */
+public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
+this.mapper = mapper;
+this.clientProvider = clientProvider;
+}
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ * @param tupleMapper
+ */
+public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
+this(tupleMapper, new CassandraContext());
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+this.outputCollector = outputCollector;
+this.stormConfig = stormConfig;
+this.cassandraConfConfig = new CassandraConf(stormConfig);
+this.client = clientProvider.getClient(this.stormConfig);
+try {
+session = client.connect();
+} catch (NoHostAvailableException e) {
+outputCollector.reportError(e);
+}
+}
+
+public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) {
+this.resultHandler = resultHandler;
+return this;
+}
+
+public BaseCassandraBolt withOutputFields(Fields fields) {
+this.outputsFields.put(null, fields);
--- End diff --

It is not good practice to use null as a map key. It seems this is used to set the 
fields for the default stream. You should use 
backtype.storm.utils.Utils.DEFAULT_STREAM_ID as the key instead of null.
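
A minimal sketch of the suggested change, keying the default stream's fields by a named constant rather than null. The class `OutputFieldsRegistry` is hypothetical, a list of strings stands in for Storm's Fields type, and the local constant stands in for backtype.storm.utils.Utils.DEFAULT_STREAM_ID (assumed here to have the value "default").

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutputFieldsRegistry {
    // Stand-in for backtype.storm.utils.Utils.DEFAULT_STREAM_ID (assumed value).
    public static final String DEFAULT_STREAM_ID = "default";

    private final Map<String, List<String>> outputFields = new HashMap<>();

    /** Registers fields for the default stream under an explicit key, not null. */
    public OutputFieldsRegistry withOutputFields(List<String> fields) {
        return withStreamOutputFields(DEFAULT_STREAM_ID, fields);
    }

    /** Registers fields for a named stream. */
    public OutputFieldsRegistry withStreamOutputFields(String streamId, List<String> fields) {
        outputFields.put(streamId, fields);
        return this;
    }

    public List<String> fieldsFor(String streamId) {
        return outputFields.get(streamId);
    }

    public static void main(String[] args) {
        OutputFieldsRegistry registry = new OutputFieldsRegistry()
                .withOutputFields(Arrays.asList("id", "value"));
        System.out.println(registry.fieldsFor(DEFAULT_STREAM_ID));
    }
}
```

An explicit key also keeps the code working if the map is later swapped for an implementation that rejects null keys, such as ConcurrentHashMap.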



[jira] [Commented] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread Zhiqiang He (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008348#comment-15008348
 ] 

Zhiqiang He commented on STORM-1210:


https://github.com/apache/storm/pull/885

> Set Output Stream id in KafkaSpout
> --
>
> Key: STORM-1210
> URL: https://issues.apache.org/jira/browse/STORM-1210
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka
>Affects Versions: 0.11.0
>Reporter: Zhiqiang He
>Assignee: Zhiqiang He
>Priority: Minor
>
> topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
> need to set the output stream id to a different name.





[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008345#comment-15008345
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037103
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java
 ---
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.client.impl;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.base.Preconditions;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
+ */
+public class DefaultClient implements SimpleClient, Closeable, Serializable {
+
+private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
+
+private String keyspace;
+
+private Cluster cluster;
+
+private Session session;
+
+/**
+ * Create a new {@link DefaultClient} instance.
+ * 
+ * @param cluster a cassandra cluster client.
+ */
+public DefaultClient(Cluster cluster, String keyspace) {
+Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
+this.cluster = cluster;
+this.keyspace = keyspace;
+
+}
+
+public Set getAllHosts() {
+Metadata metadata = getMetadata();
+return metadata.getAllHosts();
+}
+
+public Metadata getMetadata() {
+return cluster.getMetadata();
+}
+
+
+private String getExecutorName() {
+Thread thread = Thread.currentThread();
+return thread.getName();
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+public synchronized Session connect() throws NoHostAvailableException {
+if( isDisconnected() ) {
+LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
--- End diff --

Can you replace String.format with LOG.info(String format, Object... 
arguments)? It does not build the string if the respective logging level is 
not enabled, which avoids building strings unnecessarily. You may want to 
refactor the other log usages the same way.


> Storm Cassandra connector
> -
>
> Key: STORM-1075
> URL: https://issues.apache.org/jira/browse/STORM-1075
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
>






[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008352#comment-15008352
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037220
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
 ---
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import org.apache.storm.cassandra.BaseExecutionResultHandler;
+import org.apache.storm.cassandra.CassandraContext;
+import org.apache.storm.cassandra.ExecutionResultHandler;
+import org.apache.storm.cassandra.client.CassandraConf;
+import org.apache.storm.cassandra.client.SimpleClient;
+import org.apache.storm.cassandra.client.SimpleClientProvider;
+import org.apache.storm.cassandra.executor.AsyncExecutor;
+import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A base cassandra bolt.
+ *
+ * Default {@link backtype.storm.topology.base.BaseRichBolt}
+ */
+public abstract class BaseCassandraBolt extends BaseRichBolt {
+
+private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
+
+protected OutputCollector outputCollector;
+
+protected SimpleClientProvider clientProvider;
+protected SimpleClient client;
+protected Session session;
+protected Map stormConfig;
+
+protected CassandraConf cassandraConfConfig;
+
+private CQLStatementTupleMapper mapper;
+private ExecutionResultHandler resultHandler;
+
+transient private  Map outputsFields = new HashMap<>();
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ * @param mapper
+ */
+public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
+this.mapper = mapper;
+this.clientProvider = clientProvider;
+}
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ * @param tupleMapper
+ */
+public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
+this(tupleMapper, new CassandraContext());
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+this.outputCollector = outputCollector;
+this.stormConfig = stormConfig;
+this.cassandraConfConfig = new CassandraConf(stormConfig);
+this.client = clientProvider.getClient(this.stormConfig);
+try {
+session = client.connect();
+} catch (NoHostAvailableException e) {
+outputCollector.reportError(e);
+}
+}
+
+public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) {
+this.resultHandler = resultHandler;
+return this;
+}
+
+public BaseCassandraBolt withOutputFields(Fields fields) {
+

[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008359#comment-15008359
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037454
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Service to asynchronously executes cassandra statements.
+ */
+public class AsyncExecutor<T> implements Serializable {
+
+private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
+
+protected Session session;
+
+protected ExecutorService executorService;
+
+protected AsyncResultHandler<T> handler;
+
+private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
--- End diff --

Do we really need a map for this? An AtomicInteger may be sufficient for now. 
We can add a map implementation if we need it.
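
As a rough illustration of the suggestion above, pending requests could be tracked with a single counter instead of a map keyed by future. This is only a sketch: the class `PendingCounter` and its method names are hypothetical and are not part of the pull request.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the pending-request bookkeeping in AsyncExecutor. */
final class PendingCounter {

    // Number of statements submitted but not yet completed.
    private final AtomicInteger pending = new AtomicInteger();

    /** Call when a statement is handed to the driver; returns the new count. */
    int submitted() {
        return pending.incrementAndGet();
    }

    /** Call from both the success and the failure callback; returns the new count. */
    int completed() {
        return pending.decrementAndGet();
    }

    /** Current number of in-flight statements. */
    int getPendingCount() {
        return pending.get();
    }
}
```

The trade-off is that a counter only says how many requests are in flight. If individual futures ever need to be listed or cancelled, a map (or set) of futures would still be required.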


> Storm Cassandra connector
> -
>
> Key: STORM-1075
> URL: https://issues.apache.org/jira/browse/STORM-1075
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45037532
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.executor;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Service to asynchronously executes cassandra statements.
+ */
+public class AsyncExecutor<T> implements Serializable {
+
+private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
+
+protected Session session;
+
+protected ExecutorService executorService;
+
+protected AsyncResultHandler<T> handler;
+
+private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
+
+/**
+ * Creates a new {@link AsyncExecutor} instance.
+ */
+protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) {
+this(session, newSingleThreadExecutor(), handler);
+}
+
+/**
+ * Creates a new {@link AsyncExecutor} instance.
+ *
+ * @param session The cassandra session.
+ * @param executorService The executor service responsible to execute handler.
+ */
+private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) {
+this.session   = session;
+this.executorService = executorService;
+this.handler = handler;
+}
+
+protected static ExecutorService newSingleThreadExecutor() {
+return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
+}
+
+/**
+ * Asynchronously executes all statements associated to the specified input. The input will be passed to
+ * the {@link #handler} once all queries succeed or failed.
+ */
+public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) {
+
+List<SettableFuture<T>> settableFutures = new ArrayList<>(statements.size());
+
+for(Statement s : statements)
+settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER));
+
+ListenableFuture<List<T>> allAsList = Futures.allAsList(settableFutures);
+Futures.addCallback(allAsList, new FutureCallback<List<T>>(){
+@Override
+public void onSuccess(List<T> inputs) {
+handler.success(input);
+}
+
+@Override
+public void onFailure(Throwable t) {
+handler.failure(t, input);
+}
+}, executorService);
+return settableFutures;
+}
+
+/**
+ * Asynchronously executes the specified batch statement. Inputs will be passed to
+ * the {@link #handler} once query succeed or failed.
+ */
+public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
+return execAsync(statement, inputs, handler);
+}
+/**
+ * Asynchronously executes the specified batch statement. Inputs will be passed to
+ * the {@link #handler} once query succeed or failed.
+ */
+public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) {
+final SettableFuture<T> settableFuture = SettableFuture.creat

[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45039507
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
 ---
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.topology.FailedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ *
+ * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
+ *
+ * This stream grouping may be used to optimise writes to Apache Cassandra.
+ */
+public class Murmur3StreamGrouping implements CustomStreamGrouping {
--- End diff --

@harshach, there will be no performance issue using shuffle, fields, or 
murmur3 grouping. This strategy is meant to be used with the BatchCassandraBolt 
so that all tuples that will be written to the same partition are sent to the 
same Storm task.

To actually route tuples according to Cassandra data location, we would 
need to retrieve the IP address of each task within the CustomStreamGrouping. 
I don't know whether that is possible.
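
To make the grouping idea above concrete, here is a minimal sketch of hashing a partition key with Murmur3 and mapping it onto a fixed list of task ids, so that equal keys always land on the same task. It is not the code from the pull request: the class `Murmur3GroupingSketch` and the method `chooseTask` are hypothetical, and a hand-written 32-bit Murmur3 is used only to keep the example free of dependencies (the diff imports Guava's `Hashing` instead, and Cassandra's own partitioner uses a 128-bit variant, so these hashes will not match Cassandra tokens).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Illustrative only: names are hypothetical, not taken from the PR. */
final class Murmur3GroupingSketch {

    /** MurmurHash3, x86 32-bit variant, over a byte array. */
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int len = data.length;
        int roundedEnd = len & 0xfffffffc; // length rounded down to a multiple of 4

        // Body: process 4-byte little-endian blocks.
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        // Tail: remaining 1 to 3 bytes (intentional fall-through).
        int k1 = 0;
        switch (len & 3) {
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
            case 1:
                k1 |= (data[roundedEnd] & 0xff);
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
        }

        // Finalization: mix in the length and avalanche the bits.
        h1 ^= len;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    /** Picks a target task for a partition key; equal keys map to the same task. */
    static int chooseTask(String partitionKey, List<Integer> targetTasks) {
        int hash = murmur3_32(partitionKey.getBytes(StandardCharsets.UTF_8), 0);
        // floorMod avoids a negative index when the hash is negative.
        return targetTasks.get(Math.floorMod(hash, targetTasks.size()));
    }
}
```

This only guarantees that tuples with the same key reach the same task. It does not place that task near the Cassandra replica owning the key, which is the open question raised in the comment.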


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (STORM-1211) Add trident state and query support for cassandra connector

2015-11-17 Thread Satish Duggana (JIRA)
Satish Duggana created STORM-1211:
-

 Summary: Add trident state and query support for cassandra connector
 Key: STORM-1211
 URL: https://issues.apache.org/jira/browse/STORM-1211
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Satish Duggana
Assignee: Satish Duggana








[jira] [Created] (STORM-1212) Add synchronous invocation support for Cassandra connector.

2015-11-17 Thread Satish Duggana (JIRA)
Satish Duggana created STORM-1212:
-

 Summary: Add synchronous invocation support for Cassandra connector.
 Key: STORM-1212
 URL: https://issues.apache.org/jira/browse/STORM-1212
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Satish Duggana
Assignee: Satish Duggana








[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008431#comment-15008431
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/827#issuecomment-157326983
  
STORM-1211 and STORM-1212 are filed for trident state/query support and 
synchronous bolt support. I have partially worked on these as part of this 
JIRA. I will update those JIRAs with pull requests once I have them ready.




[GitHub] storm pull request: add storm-id to worker log filename

2015-11-17 Thread jessicasco
GitHub user jessicasco opened a pull request:

https://github.com/apache/storm/pull/886

add storm-id to worker log filename

Add storm-id to the worker log filename so that logs can be grouped by 
topology-id.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jessicasco/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/886.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #886


commit c4022d233d7d60b4200ee8a8c762ca7f2dea9ee6
Author: 钱龙 
Date:   2015-11-17T10:21:21Z

add storm-id to worker log filename






[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45045556
  
--- Diff: 
external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
 ---
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+private static final int QUEUE_MAX_SIZE = 1000;
+
+private LinkedBlockingQueue<Tuple> queue;
+
+private int tickFrequencyInSeconds;
+
+private long lastModifiedTimesMillis;
+
+private String componentID;
+
+private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+}
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+super(tupleMapper);
+this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+super.prepare(stormConfig, topologyContext, outputCollector);
+this.componentID = topologyContext.getThisComponentId();
+this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
--- End diff --

The queue size should be configurable on the bolt, e.g. via withQueueSize or 
withBatchSize.
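
One possible shape for this, sketched under assumptions: a fluent setter stores the size before `prepare` runs, and `prepare` uses it when allocating the queue. The class `BatchingBoltSketch` is hypothetical and uses `String` elements in place of Storm tuples so that it stays self-contained; only the name `withQueueSize` comes from the comment above.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a batching bolt with a configurable queue bound. */
final class BatchingBoltSketch {

    // Mirrors the hard-coded QUEUE_MAX_SIZE in the diff, now only a default.
    static final int DEFAULT_QUEUE_SIZE = 1000;

    private int queueSize = DEFAULT_QUEUE_SIZE;
    private LinkedBlockingQueue<String> queue;

    /** Fluent setter, meant to be called while the topology is being built. */
    BatchingBoltSketch withQueueSize(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("queue size must be positive");
        }
        this.queueSize = size;
        return this;
    }

    /** Stands in for the bolt's prepare(...): allocates the bounded queue. */
    void prepare() {
        this.queue = new LinkedBlockingQueue<>(queueSize);
    }

    /** Free slots left in the queue; equals the configured size while empty. */
    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

Usage would look like `new BatchingBoltSketch().withQueueSize(50)`, with the default applying when the setter is never called.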




[GitHub] storm pull request: [STORM-1210] Set Output Stream id in KafkaSpou...

2015-11-17 Thread wuchong
Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157341386
  
Looks good to me.

But the condition code is a little redundant and verbose. I would prefer to 
check only one variable when setting the streamId, by resetting outputStreamId in the 
SpoutConfig constructor:

```java
this.outputStreamId = topicAsStreamId ? topic : this.outputStreamId;
```

Not a big deal, just a trivial point.
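
To make the suggestion concrete, here is a small sketch. `StreamIdSketch` is a hypothetical stand-in for SpoutConfig with an assumed constructor signature; only the field names `topicAsStreamId` and `outputStreamId` come from this discussion.

```java
// Hypothetical stand-in for SpoutConfig; the constructor signature is an assumption.
class StreamIdSketch {
    final String topic;
    final String outputStreamId;

    StreamIdSketch(String topic, boolean topicAsStreamId, String outputStreamId) {
        this.topic = topic;
        // Resolve the stream id once here, so emitting code only ever reads outputStreamId.
        this.outputStreamId = topicAsStreamId ? topic : outputStreamId;
    }
}
```

With this shape, the emitting code would no longer need to branch on both fields.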


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008508#comment-15008508
 ] 

ASF GitHub Bot commented on STORM-1210:
---

Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157341386
  
Looks good to me.

But the condition code is a little redundant and verbose. I would prefer to 
check only one variable when setting the streamId, by resetting outputStreamId in the 
SpoutConfig constructor:

```java
this.outputStreamId = topicAsStreamId ? topic : this.outputStreamId;
```

Not a big deal, just a trivial point.


> Set Output Stream id in KafkaSpout
> --
>
> Key: STORM-1210
> URL: https://issues.apache.org/jira/browse/STORM-1210
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka
>Affects Versions: 0.11.0
>Reporter: Zhiqiang He
>Assignee: Zhiqiang He
>Priority: Minor
>
> topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
> need to set the output stream id to another name.





[GitHub] storm pull request: [STORM-1210] Set Output Stream id in KafkaSpou...

2015-11-17 Thread wuchong
Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157342872
  
Looks good to me.

But the condition code (`topicAsStreamId` and `outputStreamId`) is a little 
redundant, as the logic is almost the same.




[jira] [Commented] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008514#comment-15008514
 ] 

ASF GitHub Bot commented on STORM-1210:
---

Github user wuchong commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157342872
  
Looks good to me.

But the condition code (`topicAsStreamId` and `outputStreamId`) is a little 
redundant, as the logic is almost the same.


> Set Output Stream id in KafkaSpout
> --
>
> Key: STORM-1210
> URL: https://issues.apache.org/jira/browse/STORM-1210
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka
>Affects Versions: 0.11.0
>Reporter: Zhiqiang He
>Assignee: Zhiqiang He
>Priority: Minor
>
> topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
> need to set the output stream id to another name.





[GitHub] storm pull request: [STORM-1210] Set Output Stream id in KafkaSpou...

2015-11-17 Thread Zhiqiang-He
Github user Zhiqiang-He commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157344649
  
@wuchong 
Yes, you are right. But for compatibility reasons, I added this argument 
on top of topicAsStreamId.





[jira] [Commented] (STORM-1210) Set Output Stream id in KafkaSpout

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008518#comment-15008518
 ] 

ASF GitHub Bot commented on STORM-1210:
---

Github user Zhiqiang-He commented on the pull request:

https://github.com/apache/storm/pull/885#issuecomment-157344649
  
@wuchong 
Yes, you are right. But for compatibility reasons, I added this argument 
on top of topicAsStreamId.



> Set Output Stream id in KafkaSpout
> --
>
> Key: STORM-1210
> URL: https://issues.apache.org/jira/browse/STORM-1210
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka
>Affects Versions: 0.11.0
>Reporter: Zhiqiang He
>Assignee: Zhiqiang He
>Priority: Minor
>
> topicAsStreamId can only set the output stream id to the topic name. In some cases, we 
> need to set the output stream id to another name.





[jira] [Created] (STORM-1213) Remove sigar binaries from source tree

2015-11-17 Thread P. Taylor Goetz (JIRA)
P. Taylor Goetz created STORM-1213:
--

 Summary: Remove sigar binaries from source tree
 Key: STORM-1213
 URL: https://issues.apache.org/jira/browse/STORM-1213
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.11.0
Reporter: P. Taylor Goetz
Assignee: P. Taylor Goetz


In {{external/storm-metrics}} sigar native binaries were added to the source 
tree. Since Apache releases are source-only, these binaries can't be included 
in a release.

My initial thought was just to exclude the binaries from the source 
distribution, but that would mean that distributions built from a source 
tarball would not match the convenience binaries from a release (the sigar 
native binaries would not be included).

The solution I came up with was to leverage the fact that pre-built native 
binaries are included in the sigar maven distribution 
({{sigar-x.x.x-native.jar}}) and use the maven dependency plugin to unpack them 
into place during the build, rather than check them into git. One benefit is 
that it will ensure the versions of the sigar jar and the native binaries 
match. Another is that Maven's checksum/signature checking mechanism will also 
be applied.

This isn't an ideal solution since the {{sigar-x.x.x-native.jar}} only includes 
binaries for linux, OSX, and solaris (notably missing windows DLLs), whereas 
the non-maven sigar download includes support for a wider range of OSes and 
architectures.

I view this as an interim measure until we can find a better way to include the 
native binaries in the build process, rather than checking them into the source 
tree -- which would be a blocker for releasing.





[GitHub] storm pull request: STORM-1213: Remove sigar binaries from source ...

2015-11-17 Thread ptgoetz
GitHub user ptgoetz opened a pull request:

https://github.com/apache/storm/pull/887

STORM-1213: Remove sigar binaries from source tree

See https://issues.apache.org/jira/browse/STORM-1213 for description.

Also removes the LICENSE entry for sigar, which I believe is 
unnecessary because sigar is Apache licensed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ptgoetz/storm storm-metrics-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #887


commit 83eced3329dc62c685aa6784792506766343fd52
Author: P. Taylor Goetz 
Date:   2015-11-16T22:17:19Z

remove sigar binaries from source tree and delete unnecessary LICENSE entry






[jira] [Commented] (STORM-1213) Remove sigar binaries from source tree

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008771#comment-15008771
 ] 

ASF GitHub Bot commented on STORM-1213:
---

GitHub user ptgoetz opened a pull request:

https://github.com/apache/storm/pull/887

STORM-1213: Remove sigar binaries from source tree

See https://issues.apache.org/jira/browse/STORM-1213 for description.

Also removes the LICENSE entry for sigar, which I believe is 
unnecessary because sigar is Apache licensed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ptgoetz/storm storm-metrics-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #887


commit 83eced3329dc62c685aa6784792506766343fd52
Author: P. Taylor Goetz 
Date:   2015-11-16T22:17:19Z

remove sigar binaries from source tree and delete unnecessary LICENSE entry




> Remove sigar binaries from source tree
> --
>
> Key: STORM-1213
> URL: https://issues.apache.org/jira/browse/STORM-1213
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.11.0
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> In {{external/storm-metrics}} sigar native binaries were added to the source 
> tree. Since Apache releases are source-only, these binaries can't be included 
> in a release.
> My initial thought was just to exclude the binaries from the source 
> distribution, but that would mean that distributions built from a source 
> tarball would not match the convenience binaries from a release (the sigar 
> native binaries would not be included).
> The solution I came up with was to leverage the fact that pre-built native 
> binaries are included in the sigar maven distribution 
> ({{sigar-x.x.x-native.jar}}) and use the maven dependency plugin to unpack 
> them into place during the build, rather than check them into git. One 
> benefit is that it will ensure the versions of the sigar jar and the native 
> binaries match. Another is that Maven's checksum/signature checking mechanism 
> will also be applied.
> This isn't an ideal solution since the {{sigar-x.x.x-native.jar}} only 
> includes binaries for linux, OSX, and solaris (notably missing windows DLLs), 
> whereas the non-maven sigar download includes support for a wider range of 
> OSes and architectures.
> I view this as an interim measure until we can find a better way to include 
> the native binaries in the build process, rather than checking them into the 
> source tree -- which would be a blocker for releasing.





[jira] [Commented] (STORM-904) move storm bin commands to java and provide appropriate bindings for windows and linux

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008779#comment-15008779
 ] 

ASF GitHub Bot commented on STORM-904:
--

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/662#issuecomment-157395059
  
Any update on this PR? It would seem it at least needs an upmerge and 
additional review.


> move storm bin commands to java and provide appropriate bindings for windows 
> and linux
> --
>
> Key: STORM-904
> URL: https://issues.apache.org/jira/browse/STORM-904
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Priyank Shah
>
> Currently we have a Python implementation and a .cmd implementation for Windows. It is 
> becoming increasingly difficult to keep both versions up to date. Let's move all the main 
> code for starting daemons etc. to Java and provide wrapper scripts in shell 
> and batch for Linux and Windows respectively.





[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...

2015-11-17 Thread ptgoetz
Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/662#issuecomment-157395059
  
Any update on this PR? It would seem it at least needs an upmerge and 
additional review.




[GitHub] storm pull request: STORM-1028: Eventhub spout meta data

2015-11-17 Thread ptgoetz
Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/651#issuecomment-157396019
  
@tandrup Any update on this? Any thoughts regarding @mooso's comment?




[jira] [Commented] (STORM-1028) Eventhub spout meta data

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008787#comment-15008787
 ] 

ASF GitHub Bot commented on STORM-1028:
---

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/651#issuecomment-157396019
  
@tandrup Any update on this? Any thoughts regarding @mooso's comment?


> Eventhub spout meta data
> 
>
> Key: STORM-1028
> URL: https://issues.apache.org/jira/browse/STORM-1028
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-eventhubs, storm-kafka
>Reporter: Mads Mætzke Tandrup
>
> Event hub (and Kafka) play well into event source architectures as event 
> ingest point for later Storm processing to downstream stateful consumers.
> Advanced event stream processing, such as replaying parts of a stream, 
> requires that the downstream consumers can synchronise different "stream 
> runs" to their stateful view, which itself can be seen as an aggregation of 
> all previous events. To set up the right context for re-processing the stream 
> in a deterministic way, they need to sync their view with the incoming old 
> data. To be able to do this, they need knowledge of the event sequenceNumber 
> and partition.
> For example, if you have a bolt that calculates total_order_amount for a 
> stream of orders, and emits order tuples with the total_order_amount 
> calculated for all previous orders, replaying an order event should not 
> change total_order_amount. I.e. orders with a higher sequenceNumber than the 
> order being processed should not be included in total_order_amount.
> This synchronisation can be achieved if the bolt has access to the partition 
> and sequenceNumber from eventHub.
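
As an illustration of the total_order_amount example above, here is a minimal sketch of a replay-safe aggregation. The class `ReplaySafeTotal` and its method are hypothetical and are not part of the storm-eventhubs API. The sketch assumes a single partition, where sequence numbers are unique and increasing, and it counts the order being processed in its own total.

```java
import java.util.TreeMap;

// Hypothetical sketch: keeps order amounts keyed by sequence number so that a
// replayed order sees the same total it saw the first time.
class ReplaySafeTotal {
    // Sorted by sequenceNumber; one entry per distinct order event.
    private final TreeMap<Long, Long> amountBySequence = new TreeMap<>();

    // Returns the total of all orders up to and including the given sequence number.
    long process(long sequenceNumber, long amount) {
        // A replayed event is already present and must not be counted twice.
        amountBySequence.putIfAbsent(sequenceNumber, amount);
        long total = 0;
        // Orders with a higher sequence number are excluded from the total.
        for (long a : amountBySequence.headMap(sequenceNumber, true).values()) {
            total += a;
        }
        return total;
    }
}
```

With several partitions, the state would presumably have to be kept per partition, which is why the bolt needs both values from the spout.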





[jira] [Commented] (STORM-1075) Storm Cassandra connector

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008801#comment-15008801
 ] 

ASF GitHub Bot commented on STORM-1075:
---

Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45070726
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+private static final int QUEUE_MAX_SIZE = 1000;
+
+private LinkedBlockingQueue<Tuple> queue;
+
+private int tickFrequencyInSeconds;
+
+private long lastModifiedTimesMillis;
+
+private String componentID;
+
+private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+}
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+super(tupleMapper);
+this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+super.prepare(stormConfig, topologyContext, outputCollector);
+this.componentID = topologyContext.getThisComponentId();
+this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+this.lastModifiedTimesMillis = now();
+}
+
+@Override
+protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
+if( asyncResultHandler == null) {
+asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+}
+return asyncResultHandler;
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+protected void process(Tuple input) {
+if( ! queue.offer(input) ) {
+LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
+prepareAndExecuteStatement();
+queue.add(input);
+}
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+protected void tick() {
--- End diff --

Which name should be used instead? onTickTuple, onSystemTuple...


> Storm Cassandra connector
> -
>
> Key: STORM-1075
> URL: https://issues.apache.org/jira/browse/STORM-1075
> Project: Apache Storm
>  Issue Type: Improvement
> 

[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

2015-11-17 Thread fhussonnois
Github user fhussonnois commented on a diff in the pull request:

https://github.com/apache/storm/pull/827#discussion_r45070726
  
--- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.cassandra.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Time;
+import com.datastax.driver.core.Statement;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
+
+private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
+
+public static final int DEFAULT_EMIT_FREQUENCY = 2;
+
+private static final int QUEUE_MAX_SIZE = 1000;
+
+private LinkedBlockingQueue<Tuple> queue;
+
+private int tickFrequencyInSeconds;
+
+private long lastModifiedTimesMillis;
+
+private String componentID;
+
+private AsyncResultHandler<List<Tuple>> asyncResultHandler;
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
+this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
+}
+
+/**
+ * Creates a new {@link CassandraWriterBolt} instance.
+ *
+ * @param tupleMapper
+ */
+public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
+super(tupleMapper);
+this.tickFrequencyInSeconds = tickFrequencyInSeconds;
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
+super.prepare(stormConfig, topologyContext, outputCollector);
+this.componentID = topologyContext.getThisComponentId();
+this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
+this.lastModifiedTimesMillis = now();
+}
+
+@Override
+protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
+if( asyncResultHandler == null) {
+asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
+}
+return asyncResultHandler;
+}
+
+/**
+ * {@inheritDoc}
+ */
+@Override
+protected void process(Tuple input) {
+if( ! queue.offer(input) ) {
+LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
+prepareAndExecuteStatement();
+queue.add(input);
+}
+}
+/**
+ * {@inheritDoc}
+ */
+@Override
+protected void tick() {
--- End diff --

Which name should be used instead? onTickTuple, onSystemTuple...




[GitHub] storm pull request: STORM-1028: Eventhub spout meta data

2015-11-17 Thread rsltrifork
Github user rsltrifork commented on the pull request:

https://github.com/apache/storm/pull/651#issuecomment-157402566
  
Mooso is right, and his suggestion is a cleaner implementation. I suggest 
we use his implementation of IEventDataScheme and EventDataScheme. 
Mooso can you make a PR for your copy of these?




[jira] [Commented] (STORM-1028) Eventhub spout meta data

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008826#comment-15008826
 ] 

ASF GitHub Bot commented on STORM-1028:
---

Github user rsltrifork commented on the pull request:

https://github.com/apache/storm/pull/651#issuecomment-157402566
  
Mooso is right, and his suggestion is a cleaner implementation. I suggest 
we use his implementation of IEventDataScheme and EventDataScheme. 
Mooso can you make a PR for your copy of these?


> Eventhub spout meta data
> 
>
> Key: STORM-1028
> URL: https://issues.apache.org/jira/browse/STORM-1028
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-eventhubs, storm-kafka
>Reporter: Mads Mætzke Tandrup
>
> Event hub (and Kafka) play well into event source architectures as event 
> ingest point for later Storm processing to downstream stateful consumers.
> Advanced event stream processing, such as replaying parts of a stream, 
> requires that the downstream consumers can synchronise different "stream 
> runs" to their stateful view, which itself can be seen as an aggregation of 
> all previous events. To set up the right context for re-processing the stream 
> in a deterministic way, they need to sync their view with the incoming old 
> data. To be able to do this, they need knowledge of the event sequenceNumber 
> and partition.
> For example, if you have a bolt that calculates total_order_amount for a 
> stream of orders, and emits order tuples with the total_order_amount 
> calculated for all previous orders, replaying an order event should not 
> change total_order_amount. I.e. orders with a higher sequenceNumber than the 
> order being processed should not be included in total_order_amount.
> This synchronisation can be achieved if the bolt has access to the partition 
> and sequenceNumber from eventHub.





[jira] [Commented] (STORM-1213) Remove sigar binaries from source tree

2015-11-17 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008838#comment-15008838
 ] 

Robert Joseph Evans commented on STORM-1213:


How is having a binary different from having a gif or a png in a release?  
Especially a png generated from something else like an SVG file, which we have 
had in the past for the docs?  If you want to break windows I am not going to 
stand in your way, but it would be nice to know some more details about what 
constitutes a "source only release".

> Remove sigar binaries from source tree
> --
>
> Key: STORM-1213
> URL: https://issues.apache.org/jira/browse/STORM-1213
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.11.0
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> In {{external/storm-metrics}} sigar native binaries were added to the source 
> tree. Since Apache releases are source-only, these binaries can't be included 
> in a release.
> My initial thought was just to exclude the binaries from the source 
> distribution, but that would mean that distributions built from a source 
> tarball would not match the convenience binaries from a release (the sigar 
> native binaries would not be included).
> The solution I came up with was to leverage the fact that pre-built native 
> binaries are included in the sigar maven distribution 
> ({{sigar-x.x.x-native.jar}}) and use the maven dependency plugin to unpack 
> them into place during the build, rather than check them into git. One 
> benefit is that it will ensure the versions of the sigar jar and the native 
> binaries match. Another is that Maven's checksum/signature checking mechanism 
> will also be applied.
> This isn't an ideal solution since the {{sigar-x.x.x-native.jar}} only 
> includes binaries for linux, OSX, and solaris (notably missing windows DLLs), 
> whereas the non-maven sigar download includes support for a wider range of 
> OSes and architectures.
> I view this as an interim measure until we can find a better way to include 
> the native binaries in the build process, rather than checking them into the 
> source tree -- which would be a blocker for releasing.





[jira] [Commented] (STORM-1016) Generate trident bolt ids with sorted group names

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008846#comment-15008846
 ] 

ASF GitHub Bot commented on STORM-1016:
---

Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/706#issuecomment-157405332
  
+1


> Generate trident bolt ids with sorted group names
> -
>
> Key: STORM-1016
> URL: https://issues.apache.org/jira/browse/STORM-1016
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Jiasheng Wang
>
> In the genBoltId() method of TridentTopology, groups are stored in a HashSet, 
> so every time I submit my trident topology, I get a different id for each 
> component.
> For example, this time my bolt ids are (b-0-abc, b-1-def); next time they are 
> (b-1-abc, b-0-def). 
> This makes it harder for users to track their metrics, and the bolt names in the UI 
> change too.
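
A minimal sketch of the idea in the issue title: sort the group names before numbering them, so the ids no longer depend on HashSet iteration order. The class `SortedBoltIds` and the method `assignIds` are hypothetical and do not reproduce the actual TridentTopology code; only the `b-<index>-<name>` id shape is taken from the example above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the TridentTopology implementation.
class SortedBoltIds {
    // Maps each group name to an id whose index is the name's position in sorted order.
    static Map<String, String> assignIds(Set<String> groupNames) {
        List<String> sorted = new ArrayList<>(groupNames);
        Collections.sort(sorted); // deterministic, independent of the set's iteration order
        Map<String, String> ids = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            ids.put(sorted.get(i), "b-" + i + "-" + sorted.get(i));
        }
        return ids;
    }
}
```

Because the index comes from the sorted position, resubmitting the same topology yields the same ids, which keeps metric names stable across submissions.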





[GitHub] storm pull request: STORM-1016: Generate trident bolt ids with sor...

2015-11-17 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/706#issuecomment-157405332
  
+1




Re: [DISCUSS] 1.0 Release (was Re: [DISCUSS] Initial 0.11.0 Release)

2015-11-17 Thread Bobby Evans
I have a patch that is close, but like I said on the JIRA 
https://issues.apache.org/jira/browse/STORM-1202 we are not going to be able to 
make a storm-compat.jar work.  Instead I have a binary that uses the shade code 
to rewrite the jar before it runs to match the new namespace.  I am just doing 
some cleanup on the code and testing now before I put up the pull request 
either today or tomorrow.  It too will be something that we can remove for a 
2.0 release.
 - Bobby 


 On Monday, November 16, 2015 7:23 PM, Harsha  wrote:
   

 +1 on Bobby's suggestion of moving the packages to storm-compat and making
it part of the lib folder. 
Moving to org.apache.storm in 1.0 will make things easier in the future.
Rather than wait for the 2.0 release, we should make 
this change now, and in 2.0 we can remove the storm-compat jar.

Thanks,
Harsha


On Thu, Nov 12, 2015, at 07:15 AM, Bobby Evans wrote:
> I agree that having the old package names for a 1.0 release is a little
> odd, but I also am nervous about breaking backwards compatibility for our
> customers in a very significant way.  The upgrade for us from 0.9.x to
> 0.10.x has gone fairly smoothly.  Most customers read the announcement
> and recompiled their code against 0.10, but followed our instructions so
> that their topologies could run on both 0.9.x and 0.10.x.  Having the
> ability for a topology to run on both an old cluster and a new cluster is
> very important for us, because of failover.  If we want to minimize
> downtime we need to have the ability to fail a topology over from one
> cluster to another, usually running on the other side of the
> country/world.  For stability reasons we do not want to upgrade both
> clusters at once, so we need to have confidence that a topology can run
> on both clusters.  Maintaining two versions of a topology is a huge pain
> as well.
> Perhaps what we can do for 1.0 is to move all of the packages to
> org.apache.storm, but provide a storm-compat package that will still have
> the old user facing APIs in it, that are actually (for the most part)
> subclasses/interfaces of the org.apache versions.  I am not sure this
> will work perfectly, but I think I will give it a try.
>  - Bobby 
> 
> 
>      On Thursday, November 12, 2015 9:04 AM, Aaron. Dossett
>       wrote:
>    
> 
>  As a user/developer this sounds great.  The only item that gives me
>  pause
> is #2.  Still seeing backtype.* package names would be unexpected (for
> me)
> for a 1.0 release.  That small reservation aside, I am +1 (non-binding).
> 
> On 11/11/15, 2:45 PM, "임정택"  wrote:
> 
> >+1
> >
> >Jungtaek Lim (HeartSaVioR)
> >
> >2015-11-12 7:21 GMT+09:00 P. Taylor Goetz :
> >
> >> Changing subject in order to consolidate discussion of a 1.0 release in
> >> one thread (there was some additional discussion in the thread regarding
> >> the JStorm code merge).
> >>
> >> I just want to make sure I’m accurately capturing the sentiment of the
> >> community with regard to a 1.0 release. Please correct me if my summary
> >> seems off-base or jump in with an opinion.
> >>
> >> In summary:
> >>
> >> 1. What we have been calling “0.11.0” will become the Storm 1.0 release.
> >> 2. We will NOT be migrating package names for this release (i.e.
> >> “backtype.storm” —> “org.apache.storm”).
> >> 3. Post 1.0 release we will go into feature freeze for core
> >>functionality
> >> to facilitate the JStorm merge.
> >> 4. During the feature freeze only fixes for high priority bugs in core
> >> functionality will be accepted (no new features).
> >> 5. During the feature freeze, enhancements to “external” modules can be
> >> accepted.
> >> 6. We will stop using the “beta” flag in favor of purely numeric version
> >> numbers. Stable vs. non-stable (development) releases can be indicated
> >>on
> >> the download page.
> >>
> >> Do we all agree?
> >>
> >> -Taylor
> >>
> >> > On Nov 11, 2015, at 4:10 PM, P. Taylor Goetz 
> >>wrote:
> >> >
> >> >
> >> >> On Nov 11, 2015, at 9:28 AM, Bobby Evans
> >>
> >> wrote:
> >> >>
> >> >> I would like to see STORM-1190, STORM-1155, STORM-1198, STORM-1196,
> >> STORM-885, STORM-876, STORM-1145, STORM-831, and STORM-874 added to the
> >> list.
> >> >>
> >> >> Some of them are more important than others but they are all things I
> >> would like to see in a 0.11.0 release.
> >> >
> >> >
> >> > Cool. Thanks for listing them out. I will add them so they get
> >>tracked.
> >> >
> >> >
> >> >> On a side note I don't think the beta releases have been helpful.  I
> >> would much rather just have a 0.11.0 go to 0.11.1 ... instead of
> >> 0.11.0-beta1, 0.11.0-beta2.  To me the beta label is not that helpful,
> >>but
> >> it is not that big of a deal for me.
> >> >
> >> > In my mind, having releases tagged as “beta” was a way for us to say
> >>to
> >> users “here’s a preview release that will allow you to kick the tires on
> >> upcoming features, but be aware that there might be bugs. Let us know if
> >> you find any so we can fix them.”
> >> >
> >> > I think the i

[GitHub] storm pull request: STORM-1016: Generate trident bolt ids with sor...

2015-11-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/706




[jira] [Updated] (STORM-1016) Generate trident bolt ids with sorted group names

2015-11-17 Thread Robert Joseph Evans (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated STORM-1016:
---
Assignee: Jiasheng Wang

> Generate trident bolt ids with sorted group names
> -
>
> Key: STORM-1016
> URL: https://issues.apache.org/jira/browse/STORM-1016
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Jiasheng Wang
>Assignee: Jiasheng Wang
>
> In the genBoltId() method of TridentTopology, groups are stored in a HashSet. 
> So every time I submit my trident topology, I get a different id for my 
> component.
> For example, this time my bolt ids are (b-0-abc, b-1-def), next time they are 
> (b-1-abc, b-0-def). 
> It makes it harder for users to track their metrics, and the bolt names in the UI 
> change too.





[jira] [Commented] (STORM-1016) Generate trident bolt ids with sorted group names

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008864#comment-15008864
 ] 

ASF GitHub Bot commented on STORM-1016:
---

Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/706


> Generate trident bolt ids with sorted group names
> -
>
> Key: STORM-1016
> URL: https://issues.apache.org/jira/browse/STORM-1016
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Jiasheng Wang
>
> In the genBoltId() method of TridentTopology, groups are stored in a HashSet. 
> So every time I submit my trident topology, I get a different id for my 
> component.
> For example, this time my bolt ids are (b-0-abc, b-1-def), next time they are 
> (b-1-abc, b-0-def). 
> It makes it harder for users to track their metrics, and the bolt names in the UI 
> change too.





[jira] [Resolved] (STORM-1016) Generate trident bolt ids with sorted group names

2015-11-17 Thread Robert Joseph Evans (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans resolved STORM-1016.

   Resolution: Fixed
Fix Version/s: 0.11.0

Thanks [~victor-wong],

I merged this into master.  Keep up the good work.

> Generate trident bolt ids with sorted group names
> -
>
> Key: STORM-1016
> URL: https://issues.apache.org/jira/browse/STORM-1016
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Jiasheng Wang
>Assignee: Jiasheng Wang
> Fix For: 0.11.0
>
>
> In the genBoltId() method of TridentTopology, groups are stored in a HashSet. 
> So every time I submit my trident topology, I get a different id for my 
> component.
> For example, this time my bolt ids are (b-0-abc, b-1-def), next time they are 
> (b-1-abc, b-0-def). 
> It makes it harder for users to track their metrics, and the bolt names in the UI 
> change too.





[GitHub] storm pull request: [STORM-1208] Guard against NPE, and avoid usin...

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/881#discussion_r45077073
  
--- Diff: 
storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java ---
@@ -242,7 +248,7 @@ private synchronized void rotate(long lat, long count, 
long timeSpent, long targ
 totalCount += countBuckets[i];
 timeNeeded -= bucketTime[i];
 }
-return ((double)totalLat)/totalCount;
+return totalCount > 0 ? ((double)totalLat)/totalCount : 0.0;
--- End diff --

Yes we should do this.

In fact, while reading the documentation again, it seems we must guard 
against both values of `NaN` and `Infinity`.

```
user=> (/ 0. 0.)
NaN
user=> (/ 1. 0.)
Infinity
```

We have only seen the NaN case recently, but it would not hurt to handle 
both cases, since the dividend and divisor could be 0 independently, especially 
since the metrics are sampled independently.


I will update the code to refactor and to check both.
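
As a rough illustration of the guard being discussed, a Java sketch might look like the following. The class `LatencySketch` and the helper `safeAverage` are hypothetical and are not taken from the pull request; only the division and the `0.0` fallback mirror the diff above.

```java
// Hypothetical helper; the actual change in LatencyStatAndMetric may differ.
class LatencySketch {

    // Returns totalLat / totalCount, or 0.0 when the result would not be finite.
    static double safeAverage(double totalLat, double totalCount) {
        double avg = totalLat / totalCount;
        // 0.0 / 0.0 yields NaN and x / 0.0 yields +/-Infinity for non-zero x,
        // so both cases are mapped to 0.0.
        if (Double.isNaN(avg) || Double.isInfinite(avg)) {
            return 0.0;
        }
        return avg;
    }

    public static void main(String[] args) {
        System.out.println(safeAverage(0.0, 0.0));  // 0.0 (raw result would be NaN)
        System.out.println(safeAverage(1.0, 0.0));  // 0.0 (raw result would be Infinity)
        System.out.println(safeAverage(10.0, 4.0)); // 2.5
    }
}
```

Checking the result rather than only the divisor covers both cases with one test, at the cost of performing the division first.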





[jira] [Commented] (STORM-1208) UI: NPE seen when aggregating bolt streams stats

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008882#comment-15008882
 ] 

ASF GitHub Bot commented on STORM-1208:
---

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/881#discussion_r45077073
  
--- Diff: 
storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java ---
@@ -242,7 +248,7 @@ private synchronized void rotate(long lat, long count, 
long timeSpent, long targ
 totalCount += countBuckets[i];
 timeNeeded -= bucketTime[i];
 }
-return ((double)totalLat)/totalCount;
+return totalCount > 0 ? ((double)totalLat)/totalCount : 0.0;
--- End diff --

Yes we should do this.

In fact, while reading the documentation again, it seems we must guard 
against both values of `NaN` and `Infinity`.

```
user=> (/ 0. 0.)
NaN
user=> (/ 1. 0.)
Infinity
```

We have only seen the NaN case recently, but it would not hurt to handle 
both cases, since the dividend and divisor could be 0 independently, especially 
since the metrics are sampled independently.


I will update the code to refactor and to check both.



> UI: NPE seen when aggregating bolt streams stats
> 
>
> Key: STORM-1208
> URL: https://issues.apache.org/jira/browse/STORM-1208
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 0.11.0
>Reporter: Derek Dagit
>Assignee: Derek Dagit
>
> A stack trace is seen on the UI via its thrift connection to nimbus.
> On nimbus, a stack trace similar to the following is seen:
> {noformat}
> 2015-11-09 19:26:48.921 o.a.t.s.TThreadPoolServer [ERROR] Error occurred 
> during processing of message.
> java.lang.NullPointerException
> at 
> backtype.storm.stats$agg_bolt_streams_lat_and_count$iter__2219__2223$fn__2224.invoke(stats.clj:346)
>  ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.6.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.6.0.jar:?]
> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.6.0.jar:?]
> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6078.invoke(protocols.clj:54) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6031$G__6026__6044.invoke(protocols.clj:13) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6289) ~[clojure-1.6.0.jar:?]
> at clojure.core$into.invoke(core.clj:6341) ~[clojure-1.6.0.jar:?]
> at 
> backtype.storm.stats$agg_bolt_streams_lat_and_count.invoke(stats.clj:344) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.storm.stats$agg_pre_merge_comp_page_bolt.invoke(stats.clj:439) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at backtype.storm.stats$fn__2578.invoke(stats.clj:1093) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:241) 
> ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyToHelper(AFn.java:165) ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.6.0.jar:?]
> at clojure.core$apply.invoke(core.clj:628) ~[clojure-1.6.0.jar:?]
> at clojure.core$partial$fn__4230.doInvoke(core.clj:2470) 
> ~[clojure-1.6.0.jar:?]
> at clojure.lang.RestFn.invoke(RestFn.java:421) ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6086.invoke(protocols.clj:143) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6057$G__6052__6066.invoke(protocols.clj:19) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6078.invoke(protocols.clj:54) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6031$G__6026__6044.invoke(protocols.clj:13) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6289) ~[clojure-1.6.0.jar:?]
> at 
> backtype.storm.stats$aggregate_comp_stats_STAR_.invoke(stats.clj:1106) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.AFn.applyToHelper(AFn.java:165) ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.6.0.jar:?]
> at clojure.core$apply.invoke(core.clj:624) ~[clojure-1.6.0.jar:?]
> at backtype.storm.stats$fn__2589.doInvoke(stats.clj:1127) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.RestFn.invoke(RestFn.java:436) ~[clojure-1.6.0.jar:?]
> at clojure.lang.MultiFn.invoke(MultiFn.java:236) 
>

[jira] [Commented] (STORM-1213) Remove sigar binaries from source tree

2015-11-17 Thread P. Taylor Goetz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008912#comment-15008912
 ] 

P. Taylor Goetz commented on STORM-1213:


A compiled binary is by definition not open source, and is very different from 
an image file (which doesn't require compilation). This is why the RAT (Release 
Audit Tool) will flag binaries, but not fail the build -- RAT can't distinguish 
between an image file and a compiled binary.

See http://incubator.apache.org/guides/releasemanagement.html#check-list : 
"Release consists of source code only, no binaries." ... "This package may not 
contain compiled components (such as "jar" files) because compiled components 
are not open source, even if they were built from open source."

also http://www.apache.org/dev/release.html#what-must-every-release-contain : 
"It is also necessary for the PMC to ensure that the source package is 
sufficient to build any binary artifacts associated with the release."

This should be as second nature to us as checking for Apache license headers in 
new source files.

And I don't _want to break windows_. I want to be able to create a release that 
complies with ASF policy, which we can't if we include those binaries. As I 
mentioned in the original description, this is an interim solution (not ideal) 
that allows us to release. I hope we can find a better way.

> Remove sigar binaries from source tree
> --
>
> Key: STORM-1213
> URL: https://issues.apache.org/jira/browse/STORM-1213
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.11.0
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> In {{external/storm-metrics}} sigar native binaries were added to the source 
> tree. Since Apache releases are source-only, these binaries can't be included 
> in a release.
> My initial thought was just to exclude the binaries from the source 
> distribution, but that would mean that distributions built from a source 
> tarball would not match the convenience binaries from a release (the sigar 
> native binaries would not be included).
> The solution I came up with was to leverage the fact that pre-built native 
> binaries are included in the sigar maven distribution 
> ({{sigar-x.x.x-native.jar}}) and use the maven dependency plugin to unpack 
> them into place during the build, rather than check them into git. One 
> benefit is that it will ensure the versions of the sigar jar and the native 
> binaries match. Another is that Maven's checksum/signature checking mechanism 
> will also be applied.
> This isn't an ideal solution since the {{sigar-x.x.x-native.jar}} only 
> includes binaries for linux, OSX, and solaris (notably missing windows DLLs), 
> whereas the non-maven sigar download includes support for a wider range of 
> OSes and architectures.
> I view this as an interim measure until we can find a better way to include 
> the native binaries in the build process, rather than checking them into the 
> source tree -- which would be a blocker for releasing.





[GitHub] storm pull request: [STORM-1157] Adding dynamic profiling for work...

2015-11-17 Thread kishorvpatil
Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/842#discussion_r45083275
  
--- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
@@ -986,6 +1021,127 @@
 (log-message "Setting topology " id " log config " new-log-config)
 (.setLogConfig nimbus id new-log-config)
 (json-response (log-config id) (m "callback")
+
+  (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
+   [:as {:keys [servlet-request]} id host-port timeout & m]
+   (thrift/with-configured-nimbus-connection nimbus
+ (let [user (.getUserName http-creds-handler servlet-request)
+   topology-conf (from-json
+  (.getTopologyConf ^Nimbus$Client nimbus id))]
+   (assert-authorized-user "setWorkerProfiler" (topology-config 
id))
+   (assert-authorized-profiler-action "start"))
+
+ (let [[host, port] (split host-port #":")
+   nodeinfo (NodeInfo. host (set [(Long. port)]))
+   timestamp (+ (System/currentTimeMillis) (* 6 (Long. 
timeout)))
+   request (ProfileRequest. nodeinfo
+ProfileAction/JPROFILE_STOP)]
+   (.set_time_stamp request timestamp)
+   (.setWorkerProfiler nimbus id request)
+   (json-response {"status" "ok"
+   "id" host-port
+   "timeout" timeout
+   "dumplink" (worker-dump-link
+   host
+   port
+   id)}
+  (m "callback")
+
+  (GET "/api/v1/topology/:id/profiling/stop/:host-port"
--- End diff --

As with the rest of the communication between nimbus and supervisor for 
assignments, going through ZK addresses the thread concurrency issues. The 
user's ProfileAction is written to ZK by nimbus (which performs a 2-phase 
commit). The supervisor can read the old value or the new one depending on 
whether its read happens before or after the commit phase. The supervisor 
reads it every 30 seconds. 




[jira] [Commented] (STORM-1157) Dynamic Worker Profiler - jmap, jstack, profiling and restarting worker

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008976#comment-15008976
 ] 

ASF GitHub Bot commented on STORM-1157:
---

Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/842#discussion_r45083275
  
--- Diff: storm-core/src/clj/backtype/storm/ui/core.clj ---
@@ -986,6 +1021,127 @@
 (log-message "Setting topology " id " log config " new-log-config)
 (.setLogConfig nimbus id new-log-config)
 (json-response (log-config id) (m "callback")
+
+  (GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
+   [:as {:keys [servlet-request]} id host-port timeout & m]
+   (thrift/with-configured-nimbus-connection nimbus
+ (let [user (.getUserName http-creds-handler servlet-request)
+   topology-conf (from-json
+  (.getTopologyConf ^Nimbus$Client nimbus id))]
+   (assert-authorized-user "setWorkerProfiler" (topology-config 
id))
+   (assert-authorized-profiler-action "start"))
+
+ (let [[host, port] (split host-port #":")
+   nodeinfo (NodeInfo. host (set [(Long. port)]))
+   timestamp (+ (System/currentTimeMillis) (* 6 (Long. 
timeout)))
+   request (ProfileRequest. nodeinfo
+ProfileAction/JPROFILE_STOP)]
+   (.set_time_stamp request timestamp)
+   (.setWorkerProfiler nimbus id request)
+   (json-response {"status" "ok"
+   "id" host-port
+   "timeout" timeout
+   "dumplink" (worker-dump-link
+   host
+   port
+   id)}
+  (m "callback")
+
+  (GET "/api/v1/topology/:id/profiling/stop/:host-port"
--- End diff --

As with the rest of the communication between nimbus and supervisor for 
assignments, going through ZK addresses the thread concurrency issues. The 
user's ProfileAction is written to ZK by nimbus (which performs a 2-phase 
commit). The supervisor can read the old value or the new one depending on 
whether its read happens before or after the commit phase. The supervisor 
reads it every 30 seconds. 


> Dynamic Worker Profiler - jmap, jstack, profiling and restarting worker
> ---
>
> Key: STORM-1157
> URL: https://issues.apache.org/jira/browse/STORM-1157
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Kishor Patil
>Assignee: Kishor Patil
> Fix For: 0.11.0
>
>
> In multi-tenant mode, storm launches long-running JVMs across cluster without 
> sudo access to user. Self-serving of Java heap-dumps, jstacks and java 
> profiling of these JVMs would improve users' ability to analyze and debug 
> issues when monitoring it actively.





[jira] [Commented] (STORM-1213) Remove sigar binaries from source tree

2015-11-17 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008977#comment-15008977
 ] 

Robert Joseph Evans commented on STORM-1213:


[~ptgoetz],

Sorry if this came off badly.  Looking at the guidelines I see you are 100% 
correct.

> Remove sigar binaries from source tree
> --
>
> Key: STORM-1213
> URL: https://issues.apache.org/jira/browse/STORM-1213
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.11.0
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> In {{external/storm-metrics}} sigar native binaries were added to the source 
> tree. Since Apache releases are source-only, these binaries can't be included 
> in a release.
> My initial thought was just to exclude the binaries from the source 
> distribution, but that would mean that distributions built from a source 
> tarball would not match the convenience binaries from a release (the sigar 
> native binaries would not be included).
> The solution I came up with was to leverage the fact that pre-built native 
> binaries are included in the sigar maven distribution 
> ({{sigar-x.x.x-native.jar}}) and use the maven dependency plugin to unpack 
> them into place during the build, rather than check them into git. One 
> benefit is that it will ensure the versions of the sigar jar and the native 
> binaries match. Another is that Maven's checksum/signature checking mechanism 
> will also be applied.
> This isn't an ideal solution since the {{sigar-x.x.x-native.jar}} only 
> includes binaries for linux, OSX, and solaris (notably missing windows DLLs), 
> whereas the non-maven sigar download includes support for a wider range of 
> OSes and architectures.
> I view this as an interim measure until we can find a better way to include 
> the native binaries in the build process, rather than checking them into the 
> source tree -- which would be a blocker for releasing.





[GitHub] storm pull request: [STORM-126] Add Lifecycle support API for work...

2015-11-17 Thread schonfeld
Github user schonfeld commented on the pull request:

https://github.com/apache/storm/pull/884#issuecomment-157439890
  
Added some tests around `ThriftTopologyUtils`




[GitHub] storm pull request: [STORM-1208] Guard against NPE, and avoid usin...

2015-11-17 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/881#issuecomment-157440029
  
@hustfxj , @kishorvpatil , @zhuoliu , @redsanket,

Added checking for `Infinity` plus some refactoring as per @hustfxj's 
review.




[jira] [Commented] (STORM-1208) UI: NPE seen when aggregating bolt streams stats

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009053#comment-15009053
 ] 

ASF GitHub Bot commented on STORM-1208:
---

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/881#issuecomment-157440029
  
@hustfxj , @kishorvpatil , @zhuoliu , @redsanket,

Added checking for `Infinity` plus some refactoring as per @hustfxj's 
review.


> UI: NPE seen when aggregating bolt streams stats
> 
>
> Key: STORM-1208
> URL: https://issues.apache.org/jira/browse/STORM-1208
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 0.11.0
>Reporter: Derek Dagit
>Assignee: Derek Dagit
>
> A stack trace is seen on the UI via its thrift connection to nimbus.
> On nimbus, a stack trace similar to the following is seen:
> {noformat}
> 2015-11-09 19:26:48.921 o.a.t.s.TThreadPoolServer [ERROR] Error occurred 
> during processing of message.
> java.lang.NullPointerException
> at 
> backtype.storm.stats$agg_bolt_streams_lat_and_count$iter__2219__2223$fn__2224.invoke(stats.clj:346)
>  ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.6.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.6.0.jar:?]
> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.6.0.jar:?]
> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6078.invoke(protocols.clj:54) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6031$G__6026__6044.invoke(protocols.clj:13) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6289) ~[clojure-1.6.0.jar:?]
> at clojure.core$into.invoke(core.clj:6341) ~[clojure-1.6.0.jar:?]
> at 
> backtype.storm.stats$agg_bolt_streams_lat_and_count.invoke(stats.clj:344) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.storm.stats$agg_pre_merge_comp_page_bolt.invoke(stats.clj:439) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at backtype.storm.stats$fn__2578.invoke(stats.clj:1093) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:241) 
> ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyToHelper(AFn.java:165) ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.6.0.jar:?]
> at clojure.core$apply.invoke(core.clj:628) ~[clojure-1.6.0.jar:?]
> at clojure.core$partial$fn__4230.doInvoke(core.clj:2470) 
> ~[clojure-1.6.0.jar:?]
> at clojure.lang.RestFn.invoke(RestFn.java:421) ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6086.invoke(protocols.clj:143) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6057$G__6052__6066.invoke(protocols.clj:19) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core.protocols$fn__6078.invoke(protocols.clj:54) 
> ~[clojure-1.6.0.jar:?]
> at 
> clojure.core.protocols$fn__6031$G__6026__6044.invoke(protocols.clj:13) 
> ~[clojure-1.6.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6289) ~[clojure-1.6.0.jar:?]
> at 
> backtype.storm.stats$aggregate_comp_stats_STAR_.invoke(stats.clj:1106) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.AFn.applyToHelper(AFn.java:165) ~[clojure-1.6.0.jar:?]
> at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.6.0.jar:?]
> at clojure.core$apply.invoke(core.clj:624) ~[clojure-1.6.0.jar:?]
> at backtype.storm.stats$fn__2589.doInvoke(stats.clj:1127) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at clojure.lang.RestFn.invoke(RestFn.java:436) ~[clojure-1.6.0.jar:?]
> at clojure.lang.MultiFn.invoke(MultiFn.java:236) 
> ~[clojure-1.6.0.jar:?]
> at backtype.storm.stats$agg_comp_execs_stats.invoke(stats.clj:1303) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.storm.daemon.nimbus$fn__5893$exec_fn__1502__auto__$reify__5917.getComponentPageInfo(nimbus.clj:1715)
>  ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:3677)
>  ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:3661)
>  ~[storm-core-0.10.1.jar:0.10.1]
> at 
> org.apache.thrift7.ProcessFunction.process(ProcessFunction.java:39) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at org.apache.thrift7.TBaseProcessor.process(TBaseProcessor.java:39) 
> ~[storm-core-0.10.1.jar:0.10.1]
> at 
> backtype.s

[jira] [Commented] (STORM-126) Add Lifecycle support API for worker nodes

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009050#comment-15009050
 ] 

ASF GitHub Bot commented on STORM-126:
--

Github user schonfeld commented on the pull request:

https://github.com/apache/storm/pull/884#issuecomment-157439890
  
Added some tests around `ThriftTopologyUtils`


> Add Lifecycle support API for worker nodes
> --
>
> Key: STORM-126
> URL: https://issues.apache.org/jira/browse/STORM-126
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: James Xu
>Assignee: Michael Schonfeld
>Priority: Minor
> Fix For: 0.11.0
>
>
> https://github.com/nathanmarz/storm/issues/155
> Storm is already used in a variety of environments. It is important that Storm 
> provides some form of "lifecycle" API specified at Topology builder level to 
> be called when worker nodes start and stop.
> It is a very crucial functional piece that is missing from Storm. Many 
> projects have to integrate, for example, with various container-like 
> frameworks like Spring or Google Guice that need to be started and stopped in 
> a controlled fashion before worker nodes begin or finish their work.
> I think something like a WorkerContextListener interface with two methods: 
> onStartup(SomeContextClass) and onShutdown(SomeContextClass) could go a very 
> long way toward allowing users to plug in various third-party libraries 
> easily.
> Then, the TopologyBuilder needs to be modified to accept classes that 
> implement this interface.
> SomeContextClass does not need to be much more than a Map for now. But it is 
> important to have it as it allows propagation of information between those 
> lifecycle context listeners.
> Nathan, it would be interesting to hear your opinion.
> Thanks!
> --
> nathanmarz: I agree, this should be added to Storm. The lifecycle methods 
> should be parameterized with which tasks are running in this worker.
> Additionally, I think lifecycle methods should be added for bolt/spouts in 
> the context of workers. Sometimes there's some code you want to run for a 
> spout/bolt within a worker only one time, regardless of how many tasks for 
> that bolt are within the worker. Then individual tasks should be able to 
> access that "global state" within the worker for that spout/bolt.
> --
> kyrill007: Thank you, Nathan, I think it would be relatively simple to 
> implement and would have big impact. Now we're forced to manage container 
> initializations through lazy static fields. You'd love to see that code. :-)
> --
> nathanmarz: Yup, this should be fairly easy to implement. I encourage you to 
> submit a patch for this.
> --
> kyrill007: Oh, well, my Clojure is unfortunately too weak for this kind of 
> work. But I am working on it... Any pointers as to where in Storm code 
> workers are started and stopped?
> --
> nathanmarz: Here's the function that's called to start a worker: 
> https://github.com/nathanmarz/storm/blob/master/src/clj/backtype/storm/daemon/worker.clj#L315
> And here's the code in the same file that shuts down a worker:
> https://github.com/nathanmarz/storm/blob/master/src/clj/backtype/storm/daemon/worker.clj#L352
> I think the interface for the lifecycle stuff should look something like this:
> interface WorkerHook extends Serializable {
> void start(Map conf, TopologyContext context, List taskIds);
> void shutdown();
> }
> You'll need to add a definition for worker hooks into the topology definition 
> Thrift structure:
> https://github.com/nathanmarz/storm/blob/master/src/storm.thrift#L91
> I think for the first go it's ok to make this a Java-only feature by adding 
> something like "4: list<binary> worker_hooks" to the StormTopology structure 
> (where the "binary" refers to a Java-serialized object).
> Then TopologyBuilder can have a simple "addWorkerHook" method that will 
> serialize the object and add it to the Thrift struct.
> --
> danehammer: I've started working on this. I've followed Nathan's proposed 
> design, but I keep hitting snags with calls to ThriftTopologyUtils, now that 
> there is an optional list on StormTopology.
> I would like to add some unit tests for what I change there, would it make 
> more sense for those to be in Java instead of Clojure? If so, are there any 
> strong preferences on what dependencies I add and how I go about adding Java 
> unit tests to storm-core?
> --
> nathanmarz: No... unit tests should remain in Clojure. You can run Java code 
> in Clojure very easily. Here's a good example of testing Java code in 
> Clojure: 
> https://github.com/nathanmarz/storm/blob/master/storm-core/test/clj/backtype/storm/security/auth/AuthUtils_test.clj
> --
> danehammer: For this design
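
The design quoted above (a serializable hook that TopologyBuilder Java-serializes into the topology struct) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: the class names WorkerHookSketch and RecordingHook and the serialize/deserialize helpers are hypothetical, and the TopologyContext parameter from the quoted interface is left out so the example runs without Storm on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WorkerHookSketch {

    /** Hypothetical hook shape modeled on the interface quoted above. */
    interface WorkerHook extends Serializable {
        // The quoted proposal also passes a TopologyContext; it is left out here
        // so the sketch compiles without Storm on the classpath.
        void start(Map<String, Object> conf, List<Integer> taskIds);

        void shutdown();
    }

    /** Illustrative hook that records the calls it receives. */
    static final class RecordingHook implements WorkerHook {
        private static final long serialVersionUID = 1L;
        final List<String> events = new ArrayList<>();

        @Override
        public void start(Map<String, Object> conf, List<Integer> taskIds) {
            events.add("start tasks=" + taskIds);
        }

        @Override
        public void shutdown() {
            events.add("shutdown");
        }
    }

    /** Java-serializes a hook into the bytes a "binary" list entry would carry. */
    static byte[] serialize(WorkerHook hook) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(hook);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Restores a hook on the worker side before start() is called. */
    static WorkerHook deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (WorkerHook) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch the builder side would call serialize once per registered hook, and each worker would call deserialize and then start with its own task ids, calling shutdown when the worker stops.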

[jira] [Created] (STORM-1214) Trident API Improvements

2015-11-17 Thread P. Taylor Goetz (JIRA)
P. Taylor Goetz created STORM-1214:
--

 Summary: Trident API Improvements
 Key: STORM-1214
 URL: https://issues.apache.org/jira/browse/STORM-1214
 Project: Apache Storm
  Issue Type: Bug
Reporter: P. Taylor Goetz
Assignee: P. Taylor Goetz


There are a few idiosyncrasies in the Trident API that can sometimes trip 
developers up (e.g. when and how to set the parallelism of components). There 
are also a few areas where the API could be made slightly more intuitive (e.g. 
add Java 8 streams-like methods like {{filter()}}, {{map()}}, {{flatMap()}}, 
etc.).

Some of these concerns can be addressed through documentation, and some by 
altering the API. Since we are approaching a 1.0 release, it would be good to 
address any API changes before a major release.

The goal of this JIRA is to identify specific areas of improvement and 
formulate an implementation.
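
For readers unfamiliar with the Java 8 style the description refers to, the sketch below shows {{filter()}}, {{map()}} and {{flatMap()}} on a plain java.util.stream pipeline. It uses only the JDK and is not Trident code; the class and method names are made up for illustration, and how a Trident equivalent would look is exactly what this issue leaves open.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StreamsStyleSketch {

    /** Splits sentences into words, drops short words, and upper-cases the rest. */
    static List<String> longWordsUpperCased(List<String> sentences, int minLength) {
        return sentences.stream()
                // flatMap: one sentence becomes many words
                .flatMap(sentence -> Arrays.stream(sentence.split(" ")))
                // filter: keep only words of at least minLength characters
                .filter(word -> word.length() >= minLength)
                // map: transform each remaining word
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }
}
```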



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-1214) Trident API Improvements

2015-11-17 Thread Suresh Srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009169#comment-15009169
 ] 

Suresh Srinivas commented on STORM-1214:


This is also the appropriate time to add javadoc to these APIs. 

> Trident API Improvements
> 
>
> Key: STORM-1214
> URL: https://issues.apache.org/jira/browse/STORM-1214
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> There are a few idiosyncrasies in the Trident API that can sometimes trip 
> developers up (e.g. when and how to set the parallelism of components). There 
> are also a few areas where the API could be made slightly more intuitive 
> (e.g. add Java 8 streams-like methods like {{filter()}}, {{map()}}, 
> {{flatMap()}}, etc.).
> Some of these concerns can be addressed through documentation, and some by 
> altering the API. Since we are approaching a 1.0 release, it would be good to 
> address any API changes before a major release.
> The goal of this JIRA is to identify specific areas of improvement and 
> formulate an implementation.





[jira] [Updated] (STORM-1214) Trident API Improvements

2015-11-17 Thread P. Taylor Goetz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz updated STORM-1214:
---
Description: 
There are a few idiosyncrasies in the Trident API that can sometimes trip 
developers up (e.g. when and how to set the parallelism of components). There 
are also a few areas where the API could be made slightly more intuitive (e.g. 
add Java 8 streams-like methods like {{filter()}}, {{map()}}, {{flatMap()}}, 
etc.).

Some of these concerns can be addressed through documentation, and some by 
altering the API. Since we are approaching a 1.0 release, it would be good to 
address any API changes before a major release.

The goal of this JIRA is to identify specific areas of improvement and 
formulate an implementation that addresses them.

  was:
There are a few idiosyncrasies in the Trident API that can sometimes trip 
developers up (e.g. when and how to set the parallelism of components). There 
are also a few areas where the API could be made slightly more intuitive (e.g. 
add Java 8 streams-like methods like {{filter()}}, {{map()}}, {{flatMap()}}, 
etc.).

Some of these concerns can be addressed through documentation, and some by 
altering the API. Since we are approaching a 1.0 release, it would be good to 
address any API changes before a major release.

The goal of this JIRA is to identify specific areas of improvement and 
formulate an implementation.


> Trident API Improvements
> 
>
> Key: STORM-1214
> URL: https://issues.apache.org/jira/browse/STORM-1214
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> There are a few idiosyncrasies in the Trident API that can sometimes trip 
> developers up (e.g. when and how to set the parallelism of components). There 
> are also a few areas where the API could be made slightly more intuitive 
> (e.g. add Java 8 streams-like methods like {{filter()}}, {{map()}}, 
> {{flatMap()}}, etc.).
> Some of these concerns can be addressed through documentation, and some by 
> altering the API. Since we are approaching a 1.0 release, it would be good to 
> address any API changes before a major release.
> The goal of this JIRA is to identify specific areas of improvement and 
> formulate an implementation that addresses them.





[jira] [Commented] (STORM-1214) Trident API Improvements

2015-11-17 Thread P. Taylor Goetz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009187#comment-15009187
 ] 

P. Taylor Goetz commented on STORM-1214:


[~sureshms] Yes. I should have called that out explicitly. I assumed javadoc to 
be part of the documentation.

> Trident API Improvements
> 
>
> Key: STORM-1214
> URL: https://issues.apache.org/jira/browse/STORM-1214
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>
> There are a few idiosyncrasies in the Trident API that can sometimes trip 
> developers up (e.g. when and how to set the parallelism of components). There 
> are also a few areas where the API could be made slightly more intuitive 
> (e.g. add Java 8 streams-like methods like {{filter()}}, {{map()}}, 
> {{flatMap()}}, etc.).
> Some of these concerns can be addressed through documentation, and some by 
> altering the API. Since we are approaching a 1.0 release, it would be good to 
> address any API changes before a major release.
> The goal of this JIRA is to identify specific areas of improvement and 
> formulate an implementation that addresses them.





[jira] [Created] (STORM-1215) Use Async Loggers to avoid locking and logging overhead

2015-11-17 Thread Kishor Patil (JIRA)
Kishor Patil created STORM-1215:
---

 Summary: Use Async Loggers to avoid locking and logging overhead
 Key: STORM-1215
 URL: https://issues.apache.org/jira/browse/STORM-1215
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-core
Reporter: Kishor Patil
Assignee: Kishor Patil


The loggers are synchronous with immediateFlush to disk, which slows down some 
of the daemons. In some other cases, nimbus is also slow with submit-lock.
Making the loggers asynchronous, with no need to write to disk on every logger 
event, would reduce the CPU cost of logging.
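
The general idea can be sketched in plain Java: callers enqueue a message and return, while a single background thread does the slow I/O and flushes once at the end instead of on every event. This is a simplified illustration only; the AsyncLogSketch class is hypothetical and is not how Log4j 2 implements its async loggers or appenders.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class AsyncLogSketch implements AutoCloseable {
    // Sentinel compared by identity, so a logged "STOP" string is not mistaken for it.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread writerThread;

    AsyncLogSketch(Writer sink) {
        writerThread = new Thread(() -> drain(sink), "async-log-writer");
        writerThread.setDaemon(true);
        writerThread.start();
    }

    /** Called by application threads: enqueue and return without touching the sink. */
    void log(String message) {
        queue.add(message);
    }

    /** Runs on the background thread: the only place that performs I/O. */
    private void drain(Writer sink) {
        try {
            for (String msg = queue.take(); msg != STOP; msg = queue.take()) {
                sink.write(msg);
                sink.write('\n');
            }
            sink.flush(); // one flush at shutdown instead of one per event
        } catch (IOException e) {
            // A real logger would report this; the sketch simply stops writing.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Signals the writer to finish the queued messages and waits for it. */
    @Override
    public void close() {
        queue.add(STOP);
        try {
            writerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off is the usual one for asynchronous logging: messages still in the queue or in an unflushed buffer can be lost if the process dies abruptly.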

{code}
"pool-7-thread-986" #1025 prio=5 os_prio=0 tid=0x7f0f9628c800 nid=0x1b84 runnable [0x7f0f0fa2a000]
   java.lang.Thread.State: RUNNABLE
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    - locked <0x0003c00ae520> (a java.io.BufferedOutputStream)
    at java.io.PrintStream.write(PrintStream.java:482)
    - locked <0x0003c00ae500> (a java.io.PrintStream)
    at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
    at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
    at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
    - locked <0x0003c00ae640> (a java.io.OutputStreamWriter)
    at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
    at java.io.PrintStream.write(PrintStream.java:527)
    - locked <0x0003c00ae500> (a java.io.PrintStream)
    at java.io.PrintStream.print(PrintStream.java:669)
    at java.io.PrintStream.println(PrintStream.java:806)
    - locked <0x0003c00ae500> (a java.io.PrintStream)
    at org.apache.logging.log4j.status.StatusConsoleListener.log(StatusConsoleListener.java:81)
    at org.apache.logging.log4j.status.StatusLogger.logMessage(StatusLogger.java:218)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
    at org.apache.logging.log4j.spi.AbstractLogger.error(AbstractLogger.java:344)
    at org.apache.logging.log4j.core.appender.DefaultErrorHandler.error(DefaultErrorHandler.java:59)
    at org.apache.logging.log4j.core.appender.AbstractAppender.error(AbstractAppender.java:86)
    at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:116)
    at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:99)
    at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:430)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:409)
    at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:367)
    at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:112)
    at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
    at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
    at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:198)
    at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
    at clojure.tools.logging.impl$fn__28$G__8__39.invoke(impl.clj:16)
    at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
    at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:781)
    at clojure.lang.RestFn.invoke(RestFn.java:410)
{code}





[GitHub] storm pull request: [STORM-1215] Use Asynchronous logger for impro...

2015-11-17 Thread kishorvpatil
GitHub user kishorvpatil opened a pull request:

https://github.com/apache/storm/pull/888

[STORM-1215] Use Asynchronous logger for improving performance.

This changes the loggers for all daemons and workers:
- Turn off immediateFlush.
- Enable async logging.
- Turn off Immediate Fail for log events on the syslog logger to avoid making 
a blocking call without a timeout.
 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kishorvpatil/incubator-storm STORM-1215

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #888


commit 601efa5cad6604bf922f7f1ef843e3f8ae9bf1f2
Author: Kishor Patil 
Date:   2015-11-17T16:14:56Z

All daemons should Async Logging to avoid locking issues

commit 1660da0e36b6f5647bc21f9813f3eca1c3c53afa
Author: Kishor Patil 
Date:   2015-11-17T17:23:38Z

Make worker log appenders async for performance




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1215) Use Async Loggers to avoid locking and logging overhead

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009200#comment-15009200
 ] 

ASF GitHub Bot commented on STORM-1215:
---

GitHub user kishorvpatil opened a pull request:

https://github.com/apache/storm/pull/888

[STORM-1215] Use Asynchronous logger for improving performance.

This changes the loggers for all daemons and workers:
- Turn off immediateFlush.
- Enable async logging.
- Turn off Immediate Fail for log events on the syslog logger to avoid making 
a blocking call without a timeout.
 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kishorvpatil/incubator-storm STORM-1215

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/888.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #888


commit 601efa5cad6604bf922f7f1ef843e3f8ae9bf1f2
Author: Kishor Patil 
Date:   2015-11-17T16:14:56Z

All daemons should Async Logging to avoid locking issues

commit 1660da0e36b6f5647bc21f9813f3eca1c3c53afa
Author: Kishor Patil 
Date:   2015-11-17T17:23:38Z

Make worker log appenders async for performance




> Use Async Loggers to avoid locking and logging overhead
> 
>
> Key: STORM-1215
> URL: https://issues.apache.org/jira/browse/STORM-1215
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Kishor Patil
>Assignee: Kishor Patil
>
> The loggers are synchronous with immediateFlush to disk, which slows down 
> some of the daemons. In some other cases, nimbus is also slow with submit-lock.
> Making the loggers asynchronous, with no need to write to disk on every 
> logger event, would reduce the CPU cost of logging.
> {code}
> "pool-7-thread-986" #1025 prio=5 os_prio=0 tid=0x7f0f9628c800 nid=0x1b84 
> runnable [0x7f0f0fa2a000]
>java.lang.Thread.State: RUNNABLE
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>   at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>   - locked <0x0003c00ae520> (a java.io.BufferedOutputStream)
>   at java.io.PrintStream.write(PrintStream.java:482)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
>   - locked <0x0003c00ae640> (a java.io.OutputStreamWriter)
>   at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
>   at java.io.PrintStream.write(PrintStream.java:527)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at java.io.PrintStream.print(PrintStream.java:669)
>   at java.io.PrintStream.println(PrintStream.java:806)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at 
> org.apache.logging.log4j.status.StatusConsoleListener.log(StatusConsoleListener.java:81)
>   at 
> org.apache.logging.log4j.status.StatusLogger.logMessage(StatusLogger.java:218)
>   at 
> org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
>   at 
> org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
>   at 
> org.apache.logging.log4j.spi.AbstractLogger.error(AbstractLogger.java:344)
>   at 
> org.apache.logging.log4j.core.appender.DefaultErrorHandler.error(DefaultErrorHandler.java:59)
>   at 
> org.apache.logging.log4j.core.appender.AbstractAppender.error(AbstractAppender.java:86)
>   at 
> org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:116)
>   at 
> org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:99)
>   at 
> org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:430)
>   at 
> org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:409)
>   at 
> org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:367)
>   at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:112)
>   at 
> org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
>   at 
> org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
>   at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:198)
>   at clojure.tools.logging$eval1$fn__7.inv

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45099492
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-total-wait-time (atom 0)
-current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
-  (if (:code-distributor nimbus)
-(while (and (> min-replication-count @current-replication-count)
- (or (= -1 max-replication-wait-time)
-   (< @total-wait-time max-replication-wait-time)))
+current-replication-count-jar (if (not (local-mode? conf))
+(atom (get-blob-replication-count 
(master-stormjar-key storm-id) nimbus))
+(atom min-replication-count))
+current-replication-count-code (atom (get-blob-replication-count 
(master-stormcode-key storm-id) nimbus))
+current-replication-count-conf (atom (get-blob-replication-count 
(master-stormconf-key storm-id) nimbus))
+total-wait-time (atom 0)]
+(if (:blob-store nimbus)
+  (while (and
+   (or (> min-replication-count @current-replication-count-jar)
+   (> min-replication-count 
@current-replication-count-code)
+   (> min-replication-count 
@current-replication-count-conf))
+   (or (= -1 max-replication-wait-time)
+   (< @total-wait-time max-replication-wait-time)))
--- End diff --

Can we test for `(neg? max-replication-wait-time)`, as per my previous 
comment, or is there a reason we want to check specifically for `-1`?
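
To make the difference concrete, here is a small Java sketch of the two loop conditions being compared. It is an illustration only: the class and method names are hypothetical, and whether values other than `-1` are ever expected for the wait-time setting is an assumption the thread does not settle.

```java
final class ReplicationWaitSketch {

    /** Strict form: only the exact sentinel -1 means "wait indefinitely". */
    static boolean keepWaitingExact(int maxWaitSecs, int waitedSecs) {
        return maxWaitSecs == -1 || waitedSecs < maxWaitSecs;
    }

    /** Lenient form: any negative value means "wait indefinitely". */
    static boolean keepWaitingNegative(int maxWaitSecs, int waitedSecs) {
        return maxWaitSecs < 0 || waitedSecs < maxWaitSecs;
    }
}
```

With a configured value of `-5`, the strict form stops waiting immediately because no elapsed time is less than `-5`, while the lenient form treats it the same as `-1`.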



[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009205#comment-15009205
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45099492
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-total-wait-time (atom 0)
-current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
-  (if (:code-distributor nimbus)
-(while (and (> min-replication-count @current-replication-count)
- (or (= -1 max-replication-wait-time)
-   (< @total-wait-time max-replication-wait-time)))
+current-replication-count-jar (if (not (local-mode? conf))
+(atom (get-blob-replication-count 
(master-stormjar-key storm-id) nimbus))
+(atom min-replication-count))
+current-replication-count-code (atom (get-blob-replication-count 
(master-stormcode-key storm-id) nimbus))
+current-replication-count-conf (atom (get-blob-replication-count 
(master-stormconf-key storm-id) nimbus))
+total-wait-time (atom 0)]
+(if (:blob-store nimbus)
+  (while (and
+   (or (> min-replication-count @current-replication-count-jar)
+   (> min-replication-count 
@current-replication-count-code)
+   (> min-replication-count 
@current-replication-count-conf))
+   (or (= -1 max-replication-wait-time)
+   (< @total-wait-time max-re

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45100258
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-total-wait-time (atom 0)
-current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
-  (if (:code-distributor nimbus)
-(while (and (> min-replication-count @current-replication-count)
- (or (= -1 max-replication-wait-time)
-   (< @total-wait-time max-replication-wait-time)))
+current-replication-count-jar (if (not (local-mode? conf))
+(atom (get-blob-replication-count 
(master-stormjar-key storm-id) nimbus))
+(atom min-replication-count))
+current-replication-count-code (atom (get-blob-replication-count 
(master-stormcode-key storm-id) nimbus))
+current-replication-count-conf (atom (get-blob-replication-count 
(master-stormconf-key storm-id) nimbus))
+total-wait-time (atom 0)]
+(if (:blob-store nimbus)
+  (while (and
+   (or (> min-replication-count @current-replication-count-jar)
+   (> min-replication-count 
@current-replication-count-code)
+   (> min-replication-count 
@current-replication-count-conf))
+   (or (= -1 max-replication-wait-time)
+   (< @total-wait-time max-replication-wait-time)))
 (sleep-secs 1)
 (log-debug "waiting for desired replication to be achieved.
   min-replication-count = " min-replication-count  " 
max-replication-wait-time = " max-replication-wait-time
- 

[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009218#comment-15009218
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45100258
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
 
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
 max-replication-wait-time (conf 
TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-total-wait-time (atom 0)
-current-replication-count (atom (if (:code-distributor nimbus) 
(.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
-  (if (:code-distributor nimbus)
-(while (and (> min-replication-count @current-replication-count)
- (or (= -1 max-replication-wait-time)
-   (< @total-wait-time max-replication-wait-time)))
+current-replication-count-jar (if (not (local-mode? conf))
+(atom (get-blob-replication-count 
(master-stormjar-key storm-id) nimbus))
+(atom min-replication-count))
+current-replication-count-code (atom (get-blob-replication-count 
(master-stormcode-key storm-id) nimbus))
+current-replication-count-conf (atom (get-blob-replication-count 
(master-stormconf-key storm-id) nimbus))
+total-wait-time (atom 0)]
+(if (:blob-store nimbus)
+  (while (and
+   (or (> min-replication-count @current-replication-count-jar)
+   (> min-replication-count 
@current-replication-count-code)
+   (> min-replication-count 
@current-replication-count-conf))
+   (or (= -1 max-replication-wait-time)
+   (< @total-wait-time max-re

[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45100757
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
--- End diff --

Fix indentation here. The threading macro should be indented two spaces in 
from the `if` on the previous line, and the call to `getBlobReplication` should 
be indented the same amount as `(:blob-store nimbus)`.


Also, here and elsewhere, when we add or change Clojure functions, we need to 
put the argument list on its own line to follow idiomatic Clojure style.  A 
change we merged a while ago fixed many of these instances, and we should 
maintain the same style going forward.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009227#comment-15009227
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45100757
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -390,53 +444,98 @@
   [(.getNodeId slot) (.getPort slot)]
   )))
 
+(defn- get-version-for-key [key nimbus-host-port-info conf]
+  (let [version (KeyVersion. key nimbus-host-port-info)]
+(.getKeyVersion version conf)))
+
+(defn get-key-seq-from-blob-store [blob-store]
+  (let [key-iter (.listKeys blob-store nimbus-subject)]
+(iterator-seq key-iter)))
+
 (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf 
topology]
-  (let [stormroot (master-stormdist-root conf storm-id)]
-   (log-message "nimbus file location:" stormroot)
-   (FileUtils/forceMkdir (File. stormroot))
-   (FileUtils/cleanDirectory (File. stormroot))
-   (setup-jar conf tmp-jar-location stormroot)
-   (FileUtils/writeByteArrayToFile (File. (master-stormcode-path 
stormroot)) (Utils/serialize topology))
-   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path 
stormroot)) (Utils/toCompressedJsonConf storm-conf))
-   (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) 
stormroot storm-id))
-   ))
+  (let [subject (get-subject)
+storm-cluster-state (:storm-cluster-state nimbus)
+blob-store (:blob-store nimbus)
+jar-key (master-stormjar-key storm-id)
+code-key (master-stormcode-key storm-id)
+conf-key (master-stormconf-key storm-id)
+nimbus-host-port-info (:nimbus-host-port-info nimbus)]
+(when tmp-jar-location ;;in local mode there is no jar
+  (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+  (if (instance? LocalFsBlobStore blob-store)
+(.setup-blobstore! storm-cluster-state jar-key 
nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info 
conf
+(.createBlob blob-store conf-key (Utils/toCompressedJsonConf 
storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state conf-key 
nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info 
conf)))
+(.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
+(if (instance? LocalFsBlobStore blob-store)
+  (.setup-blobstore! storm-cluster-state code-key 
nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info 
conf)
+
+(defn- read-storm-topology [storm-id blob-store]
+  (Utils/deserialize
+(.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) 
StormTopology))
+
+(defn- get-blob-replication-count [blob-key nimbus]
+  (if (:blob-store nimbus)
+  (-> (:blob-store nimbus)
+(.getBlobReplication  blob-key nimbus-subject
--- End diff --

Fix indentation here. The threading macro should be indented two spaces in 
from the `if` on the previous line, and the call to `getBlobReplication` should 
be indented the same amount as `(:blob-store nimbus)`.


Also, here and elsewhere, when we add or change Clojure functions, we need to 
put the argument list on its own line to follow idiomatic Clojure style.  A 
change we merged a while ago fixed many of these instances, and we should 
maintain the same style going forward.


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for th

[jira] [Commented] (STORM-1202) Migrate APIs to org.apache.storm, but try to provide backwards compatability as a bridge

2015-11-17 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009229#comment-15009229
 ] 

Robert Joseph Evans commented on STORM-1202:


My plan is not working as I had hoped.  New clients submitting both old and new 
jars work well, but the old client submitting an old jar results in 
serialization errors in the worker.  The reason is that the calculated serial 
version changes between the two.  Even if I manipulate/add a serialVersionUID 
to match, technically these are still going to be incompatible changes, because 
officially the type of some fields/parent classes will have changed.  Perhaps 
this is the best that we can do.
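
To illustrate why the calculated serial version differs, here is a minimal 
sketch that is not taken from the Storm code base.  `OldSpout`, `NewSpout` and 
`PinnedSpout` are hypothetical stand-ins for one class before and after a 
rename; `ObjectStreamClass.lookup(...).getSerialVersionUID()` is the standard 
JDK call that reports the value the serialization runtime uses.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionSketch {
    // Hypothetical stand-ins: identical shape, different class names,
    // mimicking a class that was moved or renamed.
    static class OldSpout implements Serializable {
        int parallelism;
    }

    static class NewSpout implements Serializable {
        int parallelism;
    }

    // Same shape again, but with the version pinned explicitly.
    static class PinnedSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        int parallelism;
    }

    // Returns the serial version the JDK associates with the class.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("old:    " + uidOf(OldSpout.class));
        System.out.println("new:    " + uidOf(NewSpout.class));
        System.out.println("pinned: " + uidOf(PinnedSpout.class));
    }
}
```

The first two values differ because the class name is part of the default 
computation.  Pinning the field makes the number stable, but it does not by 
itself make changed field or parent types compatible, which is the problem 
described above.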

> Migrate APIs to org.apache.storm, but try to provide backwards compatability 
> as a bridge
> 
>
> Key: STORM-1202
> URL: https://issues.apache.org/jira/browse/STORM-1202
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>
> We want to move the storm classpaths to org.apache.storm wherever possible, 
> but we also want to provide backwards compatibility for user facing APIs 
> whenever possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45101369
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -529,10 +628,11 @@
 
 (defn- compute-executors [nimbus storm-id]
   (let [conf (:conf nimbus)
+ blob-store (:blob-store nimbus)
--- End diff --

check white-space




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009231#comment-15009231
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45101369
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -529,10 +628,11 @@
 
 (defn- compute-executors [nimbus storm-id]
   (let [conf (:conf nimbus)
+ blob-store (:blob-store nimbus)
--- End diff --

check white-space


> Dist Cache: Basic Functionality
> ---
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the 
> process of getting it ready to push back to open source shortly.





[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45102787
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -15,14 +15,23 @@
 ;; limitations under the License.
 (ns backtype.storm.daemon.nimbus
   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+  (:import [backtype.storm.generated KeyNotFoundException])
+  (:import [backtype.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol 
TBinaryProtocol$Factory])
   (:import [org.apache.thrift.exception])
   (:import [org.apache.thrift.transport TNonblockingServerTransport 
TNonblockingServerSocket])
   (:import [org.apache.commons.io FileUtils])
+  (:import [javax.security.auth Subject])
+  (:import [backtype.storm.security.auth NimbusPrincipal])
   (:import [java.nio ByteBuffer]
[java.util Collections List HashMap]
[backtype.storm.generated NimbusSummary])
-  (:import [java.io FileNotFoundException File FileOutputStream])
+  (:import [java.util Iterator])
+  (:import [java.nio ByteBuffer]
+   [java.util Collections List HashMap ArrayList])
--- End diff --

We are importing more than once from `java.util`, 
`backtype.storm.generated`, `backtype.storm.blobstore`, and possibly some 
others.  We could consolidate these.  I read that the proper way to do this now 
is to have a single `:import` per file.  Maybe that would clean things up here.




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45102927
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -1206,22 +1328,40 @@
  (.set_reset_log_level_timeout_epoch log-config (coerce/to-long 
timeout))
  (.unset_reset_log_level_timeout_epoch log-config
 
+(defmethod blob-sync-code :distributed [conf nimbus]
+  (if (not (is-leader nimbus :throw-exception false))
+(let [storm-cluster-state (:storm-cluster-state nimbus)
+  nimbus-host-port-info (:nimbus-host-port-info nimbus)
+  blob-store-key-list (into [] (get-key-seq-from-blob-store 
(:blob-store nimbus)))
+  zk-key-list (into [] (.blobstore storm-cluster-state (fn [] 
(blob-sync-code conf nimbus]
+  (log-debug "blob-sync-code " "blob-store-keys " blob-store-key-list 
"zookeeper-keys " zk-key-list)
+  (let [sync-blobs (doto
+  (SyncBlobs. (:blob-store nimbus) conf)
+  (.setNimbusInfo nimbus-host-port-info)
+  (.setBlobStoreKeyList (if (not-nil? 
blob-store-key-list) (ArrayList. blob-store-key-list) (ArrayList.)))
+  (.setZookeeperKeyList (if (not-nil? zk-key-list) 
(ArrayList. zk-key-list) (ArrayList.]
--- End diff --

Do we need to send `ArrayList`s specifically when calling 
`setBlobStoreKeyList` and `setZookeeperKeyList`, or can we say something like 
`(or blob-store-key-list [])`?
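
As a rough illustration of the question, the sketch below uses a hypothetical 
`KeyHolder` class standing in for `SyncBlobs`; only the `List`-typed setter 
mirrors the diff.  A setter declared against `java.util.List` accepts any 
implementation, so copying into an `ArrayList` only matters if the receiver 
later mutates the list.  Clojure vectors implement `java.util.List` as 
read-only collections, which `Collections.unmodifiableList` imitates here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ListSetterSketch {
    // Hypothetical stand-in for a class with a List-typed setter.
    static class KeyHolder {
        private List<String> keys = new ArrayList<>();

        void setKeys(List<String> keys) {
            this.keys = keys;
        }

        // Reading works with any List implementation.
        int count() {
            return keys.size();
        }

        // Mutating only works if the caller handed in a mutable list.
        void addKey(String key) {
            keys.add(key);
        }
    }

    public static void main(String[] args) {
        KeyHolder holder = new KeyHolder();

        // A read-only list, similar in spirit to a Clojure vector.
        holder.setKeys(Collections.unmodifiableList(Arrays.asList("a", "b")));
        System.out.println(holder.count());

        try {
            holder.addKey("c");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only list rejected add");
        }

        // Copying into an ArrayList makes mutation safe.
        holder.setKeys(new ArrayList<>(Arrays.asList("a", "b")));
        holder.addKey("c");
        System.out.println(holder.count());
    }
}
```

Whether the simpler form is safe therefore depends on whether the receiving 
class only reads the lists it is given.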




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009256#comment-15009256
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45102927
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -1206,22 +1328,40 @@
  (.set_reset_log_level_timeout_epoch log-config (coerce/to-long 
timeout))
  (.unset_reset_log_level_timeout_epoch log-config
 
+(defmethod blob-sync-code :distributed [conf nimbus]
+  (if (not (is-leader nimbus :throw-exception false))
+(let [storm-cluster-state (:storm-cluster-state nimbus)
+  nimbus-host-port-info (:nimbus-host-port-info nimbus)
+  blob-store-key-list (into [] (get-key-seq-from-blob-store 
(:blob-store nimbus)))
+  zk-key-list (into [] (.blobstore storm-cluster-state (fn [] 
(blob-sync-code conf nimbus]
+  (log-debug "blob-sync-code " "blob-store-keys " blob-store-key-list 
"zookeeper-keys " zk-key-list)
+  (let [sync-blobs (doto
+  (SyncBlobs. (:blob-store nimbus) conf)
+  (.setNimbusInfo nimbus-host-port-info)
+  (.setBlobStoreKeyList (if (not-nil? 
blob-store-key-list) (ArrayList. blob-store-key-list) (ArrayList.)))
+  (.setZookeeperKeyList (if (not-nil? zk-key-list) 
(ArrayList. zk-key-list) (ArrayList.]
--- End diff --

Do we need to send `ArrayList`s specifically when calling 
`setBlobStoreKeyList` and `setZookeeperKeyList`, or can we say something like 
`(or blob-store-key-list [])`?




[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009254#comment-15009254
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45102787
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -15,14 +15,23 @@
 ;; limitations under the License.
 (ns backtype.storm.daemon.nimbus
   (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+  (:import [backtype.storm.generated KeyNotFoundException])
+  (:import [backtype.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol 
TBinaryProtocol$Factory])
   (:import [org.apache.thrift.exception])
   (:import [org.apache.thrift.transport TNonblockingServerTransport 
TNonblockingServerSocket])
   (:import [org.apache.commons.io FileUtils])
+  (:import [javax.security.auth Subject])
+  (:import [backtype.storm.security.auth NimbusPrincipal])
   (:import [java.nio ByteBuffer]
[java.util Collections List HashMap]
[backtype.storm.generated NimbusSummary])
-  (:import [java.io FileNotFoundException File FileOutputStream])
+  (:import [java.util Iterator])
+  (:import [java.nio ByteBuffer]
+   [java.util Collections List HashMap ArrayList])
--- End diff --

We are importing more than once from `java.util`, 
`backtype.storm.generated`, `backtype.storm.blobstore`, and possibly some 
others.  We could consolidate these.  I read that the proper way to do this now 
is to have a single `:import` per file.  Maybe that would clean things up here.




[jira] [Commented] (STORM-1202) Migrate APIs to org.apache.storm, but try to provide backwards compatability as a bridge

2015-11-17 Thread Robert Joseph Evans (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009281#comment-15009281
 ] 

Robert Joseph Evans commented on STORM-1202:


Yes, this is not going to work, even with an agent.  The best we can do is to 
support a new client submitting an old jar, but not an old client submitting an 
old jar.



[jira] [Commented] (STORM-876) Dist Cache: Basic Functionality

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009310#comment-15009310
 ] 

ASF GitHub Bot commented on STORM-876:
--

Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45106345
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/SyncBlobs.java ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
+ */
+public class SyncBlobs {
--- End diff --

"BlobSynchronizer"?




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45106444
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/SyncBlobs.java ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
+ */
+public class SyncBlobs {
+  private static final Logger LOG = LoggerFactory.getLogger(SyncBlobs.class);
+  private CuratorFramework zkClient;
+  private Map conf;
+  private BlobStore blobStore;
+  private List blobStoreKeyList = new ArrayList();
+  private List zookeeperKeyList = new ArrayList();
+  private NimbusInfo nimbusInfo;
+
+  public SyncBlobs(BlobStore blobStore, Map conf) {
+this.blobStore = blobStore;
+this.conf = conf;
+  }
+
+  public void setNimbusInfo(NimbusInfo nimbusInfo) {
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public void setZookeeperKeyList(List zookeeperKeyList) {
+this.zookeeperKeyList = zookeeperKeyList;
+  }
+
+  public void setBlobStoreKeyList(List blobStoreKeyList) {
+this.blobStoreKeyList = blobStoreKeyList;
+  }
+
+  public NimbusInfo getNimbusInfo() {
+return nimbusInfo;
+  }
+
+  public List getBlobStoreKeyList() {
+List keyList = new ArrayList();
+keyList.addAll(blobStoreKeyList);
+return keyList;
+  }
--- End diff --

Lists might be the wrong data structure here.  They do not have to be 
ordered, but they cannot have duplicates.  Maybe we want Set here?
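
A rough sketch of what the Set-based version could look like. The class name, the `String` key type and the "keys to download" rule (keys known to ZooKeeper but missing from the local blob store) are assumptions for illustration, not taken from the patch.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the key bookkeeping in SyncBlobs.
final class BlobKeySets {
    private Set<String> blobStoreKeys = new HashSet<>();
    private Set<String> zookeeperKeys = new HashSet<>();

    // Copying into a HashSet drops duplicates and ignores ordering.
    void setBlobStoreKeys(Collection<String> keys) {
        this.blobStoreKeys = new HashSet<>(keys);
    }

    void setZookeeperKeys(Collection<String> keys) {
        this.zookeeperKeys = new HashSet<>(keys);
    }

    // Defensive copy, as the List-based getters do today.
    Set<String> getBlobStoreKeys() {
        return new HashSet<>(blobStoreKeys);
    }

    // Assumed rule: download whatever ZooKeeper lists that we do not hold locally.
    Set<String> keysToDownload() {
        Set<String> missing = new HashSet<>(zookeeperKeys);
        missing.removeAll(blobStoreKeys);
        return missing;
    }
}
```

With sets, the comparison between the two key collections becomes a plain set difference, and membership checks no longer scan a list.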


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45106345
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/SyncBlobs.java ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
+ */
+public class SyncBlobs {
--- End diff --

"BlobSynchronizer"?




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45106701
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/SyncBlobs.java ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
+ */
+public class SyncBlobs {
+  private static final Logger LOG = LoggerFactory.getLogger(SyncBlobs.class);
+  private CuratorFramework zkClient;
+  private Map conf;
+  private BlobStore blobStore;
+  private List blobStoreKeyList = new ArrayList();
+  private List zookeeperKeyList = new ArrayList();
+  private NimbusInfo nimbusInfo;
+
+  public SyncBlobs(BlobStore blobStore, Map conf) {
+this.blobStore = blobStore;
+this.conf = conf;
+  }
+
+  public void setNimbusInfo(NimbusInfo nimbusInfo) {
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public void setZookeeperKeyList(List zookeeperKeyList) {
+this.zookeeperKeyList = zookeeperKeyList;
+  }
+
+  public void setBlobStoreKeyList(List blobStoreKeyList) {
+this.blobStoreKeyList = blobStoreKeyList;
+  }
+
+  public NimbusInfo getNimbusInfo() {
+return nimbusInfo;
+  }
--- End diff --

Unused?




[GitHub] storm pull request: [STORM-876] Blobstore API

2015-11-17 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/845#discussion_r45106673
  
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/SyncBlobs.java ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
+ */
+public class SyncBlobs {
+  private static final Logger LOG = LoggerFactory.getLogger(SyncBlobs.class);
+  private CuratorFramework zkClient;
+  private Map conf;
+  private BlobStore blobStore;
+  private List blobStoreKeyList = new ArrayList();
+  private List zookeeperKeyList = new ArrayList();
+  private NimbusInfo nimbusInfo;
+
+  public SyncBlobs(BlobStore blobStore, Map conf) {
+this.blobStore = blobStore;
+this.conf = conf;
+  }
+
+  public void setNimbusInfo(NimbusInfo nimbusInfo) {
+this.nimbusInfo = nimbusInfo;
+  }
+
+  public void setZookeeperKeyList(List zookeeperKeyList) {
+this.zookeeperKeyList = zookeeperKeyList;
+  }
+
+  public void setBlobStoreKeyList(List blobStoreKeyList) {
+this.blobStoreKeyList = blobStoreKeyList;
+  }
+
+  public NimbusInfo getNimbusInfo() {
+return nimbusInfo;
+  }
+
+  public List getBlobStoreKeyList() {
+List keyList = new ArrayList();
+keyList.addAll(blobStoreKeyList);
+return keyList;
+  }
+
+  public List getZookeeperKeyList() {
+List keyList = new ArrayList();
+keyList.addAll(zookeeperKeyList);
+return keyList;
+  }
+
+  public synchronized void syncBlobs() {
+try {
+LOG.debug("Sync blobs - blobstore {} keys {} zookeeperkeys {}", blobStore, getBlobStoreKeyList(), getZookeeperKeyList());
+zkClient = Utils.createZKClient(conf);
+deleteKeyListFromBlobStoreNotOnZookeeper(getBlobStoreKeyList(), getZookeeperKeyList());
+updateKeyListForBlobStore(getBlobStoreKeyList());
+List keyListToDownload = getKeyListToDownload(getBlobStoreKeyList(), getZookeeperKeyList());
+LOG.debug("Key List Blobstore-> Zookeeper-> DownloadList {}-> {}-> {}", getBlobStoreKeyList(), getZookeeperKeyList(), keyListToDownload);
+
+for (String key : keyListToDownload) {
+  List nimbusInfoList = Utils.getNimbodesWithLatestVersionOfBlob(zkClient, key);
+  if(Utils.downloadMissingBlob(conf, blobStore, key, nimbusInfoList)) {
+Utils.createStateInZookeeper(conf, key, nimbusInfo);
+  }
+}
+if (zkClient !=null) {
+  zkClient.close();
+}
+} catch(InterruptedException exp) {
+LOG.error("InterruptedException {}", exp);
+} catch(TTransportException exp) {
+throw new RuntimeException(exp);
+} catch(Exception exp) {
+  // Should we log or throw exception
+throw new RuntimeException(exp);
--- End diff --

Let's just throw RTE here, unless there is something special we can do here 
for this Exception?
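
For illustration, one way the catch blocks could collapse if we just rethrow. This is a sketch with made-up names, not the patch itself. Closing the client in try-with-resources and restoring the interrupt flag are my own additions on top of the comment above.

```java
import java.util.concurrent.Callable;

// Hypothetical helper showing the "just throw RuntimeException" shape.
final class RethrowSketch {
    /**
     * Runs the task, always closes the resource (a null resource is skipped),
     * and surfaces any checked exception as a RuntimeException.
     */
    static void runAndRethrow(AutoCloseable resource, Callable<Void> task) {
        try (AutoCloseable r = resource) {
            task.call();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // keep the interrupt visible to callers
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            throw e;                             // already unchecked, do not wrap twice
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

This also closes the client when the sync fails part way through, which the `if (zkClient != null)` check inside the `try` block does not do.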




[jira] [Commented] (STORM-904) move storm bin commands to java and provide appropriate bindings for windows and linux

2015-11-17 Thread Priyank Shah (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009320#comment-15009320
 ] 

Priyank Shah commented on STORM-904:


I have not worked on this in a while. The Windows implementation is not done 
yet. Besides, the Unix-based implementation will also need to be updated based 
on any additions to the command line. It needs an upmerge and additional review 
as well. I have updated the JIRA to Stop Progress for now. Sorry for not doing 
it sooner. 

> move storm bin commands to java and provide appropriate bindings for windows 
> and linux
> --
>
> Key: STORM-904
> URL: https://issues.apache.org/jira/browse/STORM-904
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Priyank Shah
>
> Currently we have a Python implementation and a .cmd implementation for Windows. 
> It is becoming increasingly difficult to keep both versions up to date. Let's move 
> all the main code for starting daemons etc. to Java and provide wrapper scripts in 
> shell and batch for Linux and Windows respectively. 





[GitHub] storm pull request: STORM-1202: Migrate APIs to org.apache.storm, ...

2015-11-17 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/889

STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form 
of backwards compatability

This is not a typical pull request, because changing the packages is a huge 
task, and keeping it upmerged while this is reviewed and other patches go in 
is also a huge task, and error prone.

Instead, the actual changes to the package/namespace are done by running

 ```./dev-tools/move_packages.sh ./```

You can wipe all of the changes clean again (including any modifications on 
your branch that you have not checked in) by running
```./dev-tools/cleanup.sh ./```

Once the changes are approved/committed, these two scripts should be removed.

The other code is intended to provide a single executable that can shade a 
user jar so instead of using `backtype.storm` and `storm.trident` it uses 
`org.apache.storm` and `org.apache.storm.trident`.

This shading is off by default but can be enabled by adding
```
client.jartransformer.class: "org.apache.storm.hack.StormShadeTransformer"
```
to storm.yaml.  I would expect all of the code related to this to be 
removed when we go to the 2.0 release.
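
To make the renaming concrete, here is a small sketch of the prefix mapping described above. It is only an illustration of the name translation, with made-up class and method names. The actual transformer works on the classes inside the user jar rather than on strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the package mapping, not StormShadeTransformer.
final class PackageRelocator {
    // Old prefix -> new prefix, as listed in the description above.
    private static final Map<String, String> RELOCATIONS = new LinkedHashMap<>();
    static {
        RELOCATIONS.put("backtype.storm", "org.apache.storm");
        RELOCATIONS.put("storm.trident", "org.apache.storm.trident");
    }

    /** Returns the relocated class name, or the input unchanged if no prefix matches. */
    static String relocate(String className) {
        for (Map.Entry<String, String> e : RELOCATIONS.entrySet()) {
            String oldPrefix = e.getKey();
            // Match whole package segments only, so "storm.tridentx.Foo" is left alone.
            if (className.equals(oldPrefix) || className.startsWith(oldPrefix + ".")) {
                return e.getValue() + className.substring(oldPrefix.length());
            }
        }
        return className;
    }
}
```

For example, `relocate("backtype.storm.topology.TopologyBuilder")` gives `org.apache.storm.topology.TopologyBuilder`, while names that are already under `org.apache.storm` pass through untouched.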

I have tested this by running 0.10.0-SNAPSHOT storm-starter topologies 
against a cluster running this patch (with move_packages.sh executed).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-1202

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/889.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #889


commit dab5b0de51a6c83562d18ab8d823f77ff5c4d757
Author: Robert (Bobby) Evans 
Date:   2015-11-13T19:42:13Z

STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form 
of backwards compatability






[jira] [Commented] (STORM-1202) Migrate APIs to org.apache.storm, but try to provide backwards compatability as a bridge

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009380#comment-15009380
 ] 

ASF GitHub Bot commented on STORM-1202:
---

GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/889

> Migrate APIs to org.apache.storm, but try to provide backwards compatability 
> as a bridge
> 
>
> Key: STORM-1202
> URL: https://issues.apache.org/jira/browse/STORM-1202
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>
> We want to move the storm classpaths to org.apache.storm wherever possible, 
> but we also want to provide backwards compatibility for user facing APIs 
> whenever possible.





[GitHub] storm pull request: [STORM-1215] Use Asynchronous logger for impro...

2015-11-17 Thread d2r
Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/888#issuecomment-157487326
  
This also switches to UTF-8 encoding for syslog, for compatibility.  We 
have seen issues with this in the past.

Evidently we cannot configure the context except via a System property, so 
looks OK to me.

+1
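
For anyone unfamiliar with the System property route: Log4j 2 selects the all-asynchronous logger context through the `Log4jContextSelector` property, which has to be in place before Log4j initializes. A minimal sketch follows. The helper class is hypothetical and is not how the patch wires it up.

```java
// Hypothetical helper, shown only to illustrate the property involved.
final class AsyncLoggingProperty {
    static final String SELECTOR_PROPERTY = "Log4jContextSelector";
    static final String ASYNC_SELECTOR =
        "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector";

    /** Programmatic form; must run before the first logger is created. */
    static void enableAsyncLoggers() {
        System.setProperty(SELECTOR_PROPERTY, ASYNC_SELECTOR);
    }

    /** Equivalent JVM flag, e.g. for a daemon's child options. */
    static String jvmFlag() {
        return "-D" + SELECTOR_PROPERTY + "=" + ASYNC_SELECTOR;
    }
}
```

Passing the flag on the JVM command line is usually the safer of the two, since it cannot lose a race against static logger initialization.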




[jira] [Commented] (STORM-1215) Use Async Loggers to avoid locking and logging overhead

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009381#comment-15009381
 ] 

ASF GitHub Bot commented on STORM-1215:
---

Github user d2r commented on the pull request:

https://github.com/apache/storm/pull/888#issuecomment-157487326
  
This also switches to UTF-8 encoding for syslog, for compatibility.  We 
have seen issues with this in the past.

Evidently we cannot configure the context except via a System property, so 
looks OK to me.

+1


> Use Async Loggers to avoid locking  and logging overhead
> 
>
> Key: STORM-1215
> URL: https://issues.apache.org/jira/browse/STORM-1215
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Kishor Patil
>Assignee: Kishor Patil
>
> The loggers are synchronous with immediateFlush to disk, making some of the 
> daemons slow down. In some other cases, nimbus is slow too with submit-lock.
> Making loggers asynchronous, with no need to write to disk on every 
> logger event, would improve CPU resource usage for logging.
> {code}
> "pool-7-thread-986" #1025 prio=5 os_prio=0 tid=0x7f0f9628c800 nid=0x1b84 runnable [0x7f0f0fa2a000]
>    java.lang.Thread.State: RUNNABLE
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>   at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>   - locked <0x0003c00ae520> (a java.io.BufferedOutputStream)
>   at java.io.PrintStream.write(PrintStream.java:482)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
>   - locked <0x0003c00ae640> (a java.io.OutputStreamWriter)
>   at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
>   at java.io.PrintStream.write(PrintStream.java:527)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at java.io.PrintStream.print(PrintStream.java:669)
>   at java.io.PrintStream.println(PrintStream.java:806)
>   - locked <0x0003c00ae500> (a java.io.PrintStream)
>   at org.apache.logging.log4j.status.StatusConsoleListener.log(StatusConsoleListener.java:81)
>   at org.apache.logging.log4j.status.StatusLogger.logMessage(StatusLogger.java:218)
>   at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
>   at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
>   at org.apache.logging.log4j.spi.AbstractLogger.error(AbstractLogger.java:344)
>   at org.apache.logging.log4j.core.appender.DefaultErrorHandler.error(DefaultErrorHandler.java:59)
>   at org.apache.logging.log4j.core.appender.AbstractAppender.error(AbstractAppender.java:86)
>   at org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:116)
>   at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:99)
>   at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:430)
>   at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:409)
>   at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:367)
>   at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:112)
>   at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:727)
>   at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:716)
>   at org.apache.logging.slf4j.Log4jLogger.info(Log4jLogger.java:198)
>   at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
>   at clojure.tools.logging.impl$fn__28$G__8__39.invoke(impl.clj:16)
>   at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
>   at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:781)
>   at clojure.lang.RestFn.invoke(RestFn.java:410)
> {code}





[GitHub] storm pull request: [STORM-1198] Web UI to show resource usages an...

2015-11-17 Thread kishorvpatil
Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/875#discussion_r45111484
  
--- Diff: storm-core/src/ui/public/index.html ---
@@ -143,6 +143,12 @@
 ]
   });
   $('#topology-summary [data-toggle="tooltip"]').tooltip();
+  $.getJSON("/api/v1/cluster/configuration", function(json){
+  var displayResource = json["scheduler.display.resource"];
+  if (!displayResource){
+  $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide();
--- End diff --

Let's avoid referring to fields by number, which could change in the future 
and make maintenance harder.
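
One way to act on this, sketched under assumptions: look the column up by its header text and derive its position at run time, so that adding or reordering columns does not silently hide the wrong one. The helper name `columnSelector` and the header labels below are hypothetical and are not taken from the Storm UI code.

```javascript
// Hypothetical helper (not part of the Storm UI): builds the selector for a
// column identified by its header text instead of a hard-coded position.
function columnSelector(tableId, headers, label) {
  var index = headers.indexOf(label);
  if (index === -1) {
    return null; // no such column; the caller should hide nothing
  }
  var n = index + 1; // :nth-child() positions are 1-based
  return '#' + tableId + ' td:nth-child(' + n + '),' +
         '#' + tableId + ' th:nth-child(' + n + ')';
}

// Example with made-up header labels.
var headers = ['Name', 'Id', 'Status', 'Scheduler Info'];
var selector = columnSelector('topology-summary', headers, 'Scheduler Info');
console.log(selector);
```

In the page itself, the labels could be read from the rendered `th` elements and the result passed to `$(selector).hide()`. Tagging the relevant cells with a dedicated CSS class would be an alternative that avoids positions entirely.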




[GitHub] storm pull request: STORM-1202: Migrate APIs to org.apache.storm, ...

2015-11-17 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/889#issuecomment-157487487
  
Because this is a non-backwards-compatible change, and the mailing lists 
have discussed moving the version numbers over to 1.0.0-SNAPSHOT, I am happy to 
make that change here too, but I wanted to hold off and get feedback from 
others before doing so.




[jira] [Commented] (STORM-1198) Web UI to show resource usages and Total Resources on all supervisors

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009386#comment-15009386
 ] 

ASF GitHub Bot commented on STORM-1198:
---

Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/875#discussion_r45111484
  
--- Diff: storm-core/src/ui/public/index.html ---
@@ -143,6 +143,12 @@
 ]
   });
   $('#topology-summary [data-toggle="tooltip"]').tooltip();
+  $.getJSON("/api/v1/cluster/configuration", function(json){
+  var displayResource = json["scheduler.display.resource"];
+  if (!displayResource){
+  $('#topology-summary td:nth-child(10),#topology-summary th:nth-child(10)').hide();
--- End diff --

Let's avoid referring to fields by number, which could change in the future 
and make maintenance harder.


> Web UI to show resource usages and Total Resources on all supervisors
> -
>
> Key: STORM-1198
> URL: https://issues.apache.org/jira/browse/STORM-1198
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-core
>Reporter: Zhuo Liu
>Assignee: Zhuo Liu
>Priority: Minor
> Attachments: supervisor-resources.png
>
>
> As we have a resource-aware scheduler (STORM-894), we want to be able to 
> display resource capacity (CPU and memory, and network in the future) and 
> scheduled resource usage on each supervisor node.





[jira] [Commented] (STORM-1202) Migrate APIs to org.apache.storm, but try to provide backwards compatibility as a bridge

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009382#comment-15009382
 ] 

ASF GitHub Bot commented on STORM-1202:
---

Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/889#issuecomment-157487487
  
Because this is a non-backwards-compatible change, and the mailing lists 
have discussed moving the version numbers over to 1.0.0-SNAPSHOT, I am happy to 
make that change here too, but I wanted to hold off and get feedback from 
others before doing so.


> Migrate APIs to org.apache.storm, but try to provide backwards compatibility 
> as a bridge
> 
>
> Key: STORM-1202
> URL: https://issues.apache.org/jira/browse/STORM-1202
> Project: Apache Storm
>  Issue Type: New Feature
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>
> We want to move the storm classpaths to org.apache.storm wherever possible, 
> but we also want to provide backwards compatibility for user facing APIs 
> whenever possible.





[jira] [Commented] (STORM-1198) Web UI to show resource usages and Total Resources on all supervisors

2015-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009383#comment-15009383
 ] 

ASF GitHub Bot commented on STORM-1198:
---

Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/875#discussion_r45111413
  
--- Diff: storm-core/src/ui/public/index.html ---
@@ -143,6 +143,12 @@
 ]
   });
   $('#topology-summary [data-toggle="tooltip"]').tooltip();
+  $.getJSON("/api/v1/cluster/configuration", function(json){
--- End diff --

I suggest we avoid making another AJAX call; instead, we can include the 
boolean as part of topology-summary.
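
A minimal sketch of that idea, assuming the summary response itself carried the flag: the page then decides from the one payload it already fetches. The field name `schedulerDisplayResource` and both function names are hypothetical; the actual response shape is not shown in this thread.

```javascript
// Hypothetical: the flag travels inside the summary payload the page already
// requests, so no second request to the configuration endpoint is needed.
function shouldDisplayResource(summary) {
  return Boolean(summary && summary.schedulerDisplayResource);
}

// Calls hideResourceColumns() when the flag is off or missing and reports
// whether the resource columns stay visible. The callback is supplied by the
// caller (for example, a function wrapping a jQuery .hide() call).
function applyResourceVisibility(summary, hideResourceColumns) {
  if (!shouldDisplayResource(summary)) {
    hideResourceColumns();
    return false;
  }
  return true;
}

// Example with made-up payloads: only the first call hides the columns.
var hidden = 0;
applyResourceVisibility({ topologies: [], schedulerDisplayResource: false },
                        function () { hidden += 1; });
applyResourceVisibility({ topologies: [], schedulerDisplayResource: true },
                        function () { hidden += 1; });
console.log(hidden);
```

Treating a missing flag as "do not display" keeps the sketch's behavior in line with the `if (!displayResource)` check in the diff above.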


> Web UI to show resource usages and Total Resources on all supervisors
> -
>
> Key: STORM-1198
> URL: https://issues.apache.org/jira/browse/STORM-1198
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-core
>Reporter: Zhuo Liu
>Assignee: Zhuo Liu
>Priority: Minor
> Attachments: supervisor-resources.png
>
>
> As we have a resource-aware scheduler (STORM-894), we want to be able to 
> display resource capacity (CPU and memory, and network in the future) and 
> scheduled resource usage on each supervisor node.





[GitHub] storm pull request: [STORM-1198] Web UI to show resource usages an...

2015-11-17 Thread kishorvpatil
Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/875#discussion_r45111413
  
--- Diff: storm-core/src/ui/public/index.html ---
@@ -143,6 +143,12 @@
 ]
   });
   $('#topology-summary [data-toggle="tooltip"]').tooltip();
+  $.getJSON("/api/v1/cluster/configuration", function(json){
--- End diff --

I suggest we avoid making another AJAX call; instead, we can include the 
boolean as part of topology-summary.



