[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-10-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1710


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-29 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r81219539
  
--- Diff: storm-core/src/jvm/org/apache/storm/Config.java ---
@@ -926,10 +926,10 @@
 public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
 
 /**
- * The host that Pacemaker is running on.
+ * The list of servers that Pacemaker is running on.
  */
-@isString
-public static final String PACEMAKER_HOST = "pacemaker.host";
+@isStringList
+public static final String PACEMAKER_SERVERS = "pacemaker.servers";
--- End diff --

@HeartSaVioR Yep, the 1.x-branch PR will leave pacemaker.host in, but have 
it deprecated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80820718
  
--- Diff: storm-core/src/jvm/org/apache/storm/Config.java ---
@@ -926,10 +926,10 @@
 public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
 
 /**
- * The host that Pacemaker is running on.
+ * The list of servers that Pacemaker is running on.
  */
-@isString
-public static final String PACEMAKER_HOST = "pacemaker.host";
+@isStringList
+public static final String PACEMAKER_SERVERS = "pacemaker.servers";
--- End diff --

I'm in favor of backward compatibility change for this, like nimbus.host 
and nimbus.seeds. For 2.0 we don't need to keep nimbus.host and pacemaker.host 
but 1.x still need to have it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80801551
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---
@@ -159,10 +162,6 @@ public void set_worker_hb(String path, byte[] data, 
List acls) {
 ret = details;
 }
 }
-if(ret == null) {
-throw new HBExecutionException("Failed to get a 
response.");
-}
-LOG.debug("Successful get_worker_hb");
 return ret;
--- End diff --

I think that is best, if all of them return something invalid or we get 
nothing back at all then we throw.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80798282
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---
@@ -159,10 +162,6 @@ public void set_worker_hb(String path, byte[] data, 
List acls) {
 ret = details;
 }
 }
-if(ret == null) {
-throw new HBExecutionException("Failed to get a 
response.");
-}
-LOG.debug("Successful get_worker_hb");
 return ret;
--- End diff --

Yes, I wasn't quite sure how we wanted to handle this, NULL responses are 
valid, and right now the code treats mismatched responses as if they were NULL. 
We can distinguish between them if we want the behavior to differ whether the 
pacemakers returned NULL details or returned an invalid message. Throwing on 
null is not what we wanted, though, since it will cause Nimbus to crash when 
nimbus tries to read a topology's heartbeats before it has started sending them.

Maybe we want to add some logic so that if ALL pacemakers return an invalid 
response, then we throw?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80791012
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---
@@ -159,10 +162,6 @@ public void set_worker_hb(String path, byte[] data, 
List acls) {
 ret = details;
 }
 }
-if(ret == null) {
-throw new HBExecutionException("Failed to get a 
response.");
-}
-LOG.debug("Successful get_worker_hb");
 return ret;
--- End diff --

Do we need the ret == null check before this?  Is it possible to get more 
then one response and all of them are bogus?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80718914
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.storm.Config;
+import org.apache.storm.generated.HBMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PacemakerClientPool {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(PacemakerClientPool.class);
+
+private ConcurrentHashMap clientForServer = 
new ConcurrentHashMap<>();
+private ConcurrentLinkedQueue servers;
+private Map config;
+
+public PacemakerClientPool(Map config) {
+this.config = config;
+List serverList = 
(List)config.get(Config.PACEMAKER_SERVERS);
+if(serverList == null) {
+serverList = new ArrayList();
--- End diff --

and in the following few lines too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80718833
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.storm.Config;
+import org.apache.storm.generated.HBMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PacemakerClientPool {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(PacemakerClientPool.class);
+
+private ConcurrentHashMap clientForServer = 
new ConcurrentHashMap<>();
+private ConcurrentLinkedQueue servers;
+private Map config;
+
+public PacemakerClientPool(Map config) {
+this.config = config;
+List serverList = 
(List)config.get(Config.PACEMAKER_SERVERS);
+if(serverList == null) {
+serverList = new ArrayList();
--- End diff --

nit: I don't think `` is needed just `<>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80718589
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java ---
@@ -69,7 +69,7 @@ else if(evm instanceof HBMessage) {
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
event) {
-LOG.error("Connection to pacemaker failed", event.getCause());
+LOG.error("Connection to pacemaker failed. Trying to reconnect 
{}", event.getCause().getMessage());
--- End diff --

If we know which exceptions we want to ignore, perhaps just IOExceptions, 
then we can explicitly blacklist them, and even log them under warn, instead of 
error. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80719082
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.storm.Config;
+import org.apache.storm.generated.HBMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PacemakerClientPool {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(PacemakerClientPool.class);
+
+private ConcurrentHashMap clientForServer = 
new ConcurrentHashMap<>();
+private ConcurrentLinkedQueue servers;
+private Map config;
+
+public PacemakerClientPool(Map config) {
+this.config = config;
+List serverList = 
(List)config.get(Config.PACEMAKER_SERVERS);
+if(serverList == null) {
+serverList = new ArrayList();
+}
+else {
+serverList = new ArrayList(serverList);
+}
+Collections.shuffle(serverList);
+if(serverList != null) {
+servers = new ConcurrentLinkedQueue(serverList);
+}
+else {
+servers = new ConcurrentLinkedQueue();
+}
+}
+
+public HBMessage send(HBMessage m) throws PacemakerConnectionException 
{
+try {
+return getWriteClient().send(m);
+} catch (Exception e) {
+rotateClients();
+throw e;
+}
+}
+
+public List sendAll(HBMessage m) throws 
PacemakerConnectionException {
+List responses = new ArrayList();
+LOG.info("Using servers: {}", servers);
--- End diff --

Perhaps this should be debug (especially if we are worried about filling 
the logs)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-27 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80717469
  
--- Diff: storm-core/src/jvm/org/apache/storm/Config.java ---
@@ -926,10 +926,10 @@
 public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
 
 /**
- * The host that Pacemaker is running on.
+ * The list of servers that Pacemaker is running on.
  */
-@isString
-public static final String PACEMAKER_HOST = "pacemaker.host";
+@isStringList
+public static final String PACEMAKER_SERVERS = "pacemaker.servers";
--- End diff --

This is a breaking change that would be good to go back to 1.x as well.  I 
am OK with this, because I don't think anyone is really using pacemaker, but I 
would like to hear others opinions on it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-26 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80508806
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java ---
@@ -69,7 +69,7 @@ else if(evm instanceof HBMessage) {
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
event) {
-LOG.error("Connection to pacemaker failed", event.getCause());
+LOG.error("Connection to pacemaker failed. Trying to reconnect 
{}", event.getCause().getMessage());
--- End diff --

I don't think we want the stack. It ends up absolutely filling Nimbus logs 
with relatively useless messages when a Pacemaker node goes down. I'm open to 
ideas, though, because a trace for an NPE or something would be nice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80324279
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java ---
@@ -20,13 +20,22 @@
 import org.apache.storm.cluster.PaceMakerStateStorage;
 import org.apache.storm.generated.*;
 import org.apache.storm.pacemaker.PacemakerClient;
+import org.apache.storm.pacemaker.PacemakerClientPool;
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class PaceMakerStateStorageFactoryTest {
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-private PacemakerClient clientProxy;
+public class PaceMakerStateStorageFactoryTest {
+private static Logger LOG = 
LoggerFactory.getLogger(PaceMakerStateStorageFactoryTest.class);
--- End diff --

We don't seem to be logging anything in this test. Do we need the Logger?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80314321
  
--- Diff: conf/defaults.yaml ---
@@ -276,6 +276,7 @@ resource.aware.scheduler.priority.strategy: 
"org.apache.storm.scheduler.resource
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
 
 pacemaker.host: "localhost"
+pacemaker.servers: []
--- End diff --

Let's remove `pacemaker.host` if it is no longer used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80323247
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java ---
@@ -69,7 +69,7 @@ else if(evm instanceof HBMessage) {
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
event) {
-LOG.error("Connection to pacemaker failed", event.getCause());
+LOG.error("Connection to pacemaker failed. Trying to reconnect 
{}", event.getCause().getMessage());
--- End diff --

We don't want to log the stack here, only the message?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80321800
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---
@@ -160,13 +179,23 @@ public void set_worker_hb(String path, byte[] data, 
List acls) {
 int retry = maxRetries;
 while (true) {
 try {
+HashSet retSet = new HashSet<>();
+int latest_time_secs = 0;
--- End diff --

`latest_time_sec` not used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80319044
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.storm.Config;
+import org.apache.storm.generated.HBMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PacemakerClientPool {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(PacemakerClientPool.class);
--- End diff --

check indentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80319682
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.storm.Config;
+import org.apache.storm.generated.HBMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PacemakerClientPool {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(PacemakerClientPool.class);
+
+private ConcurrentHashMap clientForServer = 
new ConcurrentHashMap<>();
+private ConcurrentLinkedQueue servers;
+private Map config;
+
+public PacemakerClientPool(Map config) {
+this.config = config;
+List serverList = 
(List)config.get(Config.PACEMAKER_SERVERS);
+if(serverList == null) {
+serverList = new ArrayList();
+}
+else {
+serverList = new ArrayList(serverList);
+}
+Collections.shuffle(serverList);
+if(serverList != null) {
+servers = new ConcurrentLinkedQueue(serverList);
+}
+else {
+servers = new ConcurrentLinkedQueue();
+}
+}
+
+public HBMessage send(HBMessage m) throws PacemakerConnectionException 
{
+try {
+return getWriteClient().send(m);
+} catch (Exception e) {
+rotateClients();
+throw e;
+}
+}
+
+public List sendAll(HBMessage m) throws 
PacemakerConnectionException {
+List responses = new ArrayList();
+LOG.info("Using servers: {}", servers);
+for(String s : servers) {
+try {
+HBMessage response = getClientForServer(s).send(m);
+responses.add(response);
+} catch (PacemakerConnectionException e) {
+LOG.error("Failed to send message to Pacemaker " + s);
+}
+}
+if(responses.size() == 0) {
+throw new PacemakerConnectionException("Failed to connect to 
any Pacemaker.");
+}
+return responses;
+}
+
+public void close() {
+for(PacemakerClient client : clientForServer.values()) {
+client.shutdown();
+client.close();
+}
+}
+
+private void rotateClients() {
+PacemakerClient c = getWriteClient();
+String server = servers.peek();
+// Servers should be rotated **BEFORE** the old client is removed 
from clientForServer
+// or a race with getWriteClient() could cause it to be put back 
in the map.
+servers.add(servers.remove());
+clientForServer.remove(server);   
+c.shutdown();
+c.close();
+}
+
+private PacemakerClient getWriteClient() {
+return getClientForServer(servers.peek());
+}
+
+private PacemakerClient getClientForServer(String server) {
+PacemakerClient client = clientForServer.get(server);
+if(client == null) {
+client = new PacemakerClient(config, server);
+clientForServer.put(server, client);
+}
+return client;
+}
+
--- End diff --

minor: extra line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at i

[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1710#discussion_r80321976
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java 
---
@@ -18,48 +18,22 @@
 package org.apache.storm.cluster;
 
 import org.apache.storm.pacemaker.PacemakerClient;
+import org.apache.storm.pacemaker.PacemakerClientPool;
--- End diff --

Remove PacemakerClient import, as it's not used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread knusbaum
Github user knusbaum closed the pull request at:

https://github.com/apache/storm/pull/1710


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread knusbaum
GitHub user knusbaum reopened a pull request:

https://github.com/apache/storm/pull/1710

STORM-1546: Adding Read and Write Aggregations for Pacemaker to make it HA 
compatible



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm Pacemaker-HA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1710


commit c71314fbd2101ff081bee7aef3ecc434c3541044
Author: Kyle Nusbaum 
Date:   2016-09-06T21:04:07Z

Inspecting travis's maven cache

commit efa546862f7717ee1db222895ea14e6432666ab3
Author: Kyle Nusbaum 
Date:   2016-09-08T20:07:49Z

Working on Pacemaker HA

commit f46cfc15fa2069cabaa742ccfd2b5689fe5a3777
Author: Kyle Nusbaum 
Date:   2016-09-19T19:22:46Z

cleanup.

commit 0c002928e49f63070915e7a41f97f70b40336f23
Author: Kyle Nusbaum 
Date:   2016-09-19T19:47:04Z

Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/storm

commit 6cd9661bde2ce4c93c5628aae2565e167b89aeb3
Author: Kyle Nusbaum 
Date:   2016-09-23T06:48:33Z

Fix ups.

commit 6a8be9f8c19bacb646c119c162c6cdba69cc66ff
Author: Kyle Nusbaum 
Date:   2016-09-23T07:03:46Z

Fix ups.

commit 523317b06abe87d7e745885c32c8a9c5dfbbeb0d
Author: Kyle Nusbaum 
Date:   2016-09-23T07:29:59Z

Cleaning up tests.

commit 410ef3a816fc6a961771abfbcac2d83512192040
Author: Kyle Nusbaum 
Date:   2016-09-23T07:31:54Z

Cleaning up.

commit 158228b7ad434287e7d1ec92aca9c5c08f144869
Author: Kyle Nusbaum 
Date:   2016-09-23T16:03:32Z

Kick travis

commit 6a657f16301306b5b0d8d4ab91f916c6e7757ffe
Author: Kyle Nusbaum 
Date:   2016-09-23T16:28:39Z

Fixing travis, adding license to new code.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1710: STORM-1546: Adding Read and Write Aggregations for...

2016-09-23 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1710

STORM-1546: Adding Read and Write Aggregations for Pacemaker to make it HA 
compatible



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm Pacemaker-HA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1710


commit c71314fbd2101ff081bee7aef3ecc434c3541044
Author: Kyle Nusbaum 
Date:   2016-09-06T21:04:07Z

Inspecting travis's maven cache

commit efa546862f7717ee1db222895ea14e6432666ab3
Author: Kyle Nusbaum 
Date:   2016-09-08T20:07:49Z

Working on Pacemaker HA

commit f46cfc15fa2069cabaa742ccfd2b5689fe5a3777
Author: Kyle Nusbaum 
Date:   2016-09-19T19:22:46Z

cleanup.

commit 0c002928e49f63070915e7a41f97f70b40336f23
Author: Kyle Nusbaum 
Date:   2016-09-19T19:47:04Z

Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/storm

commit 6cd9661bde2ce4c93c5628aae2565e167b89aeb3
Author: Kyle Nusbaum 
Date:   2016-09-23T06:48:33Z

Fix ups.

commit 6a8be9f8c19bacb646c119c162c6cdba69cc66ff
Author: Kyle Nusbaum 
Date:   2016-09-23T07:03:46Z

Fix ups.

commit 523317b06abe87d7e745885c32c8a9c5dfbbeb0d
Author: Kyle Nusbaum 
Date:   2016-09-23T07:29:59Z

Cleaning up tests.

commit 410ef3a816fc6a961771abfbcac2d83512192040
Author: Kyle Nusbaum 
Date:   2016-09-23T07:31:54Z

Cleaning up.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---