[GitHub] storm pull request #2831: STORM-3224: Fix FLUX YAML Viewer icon location/pos...

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2831


---


[GitHub] storm pull request #2840: STORM-3147: Add metrics based on ClusterSummary

2018-09-17 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/2840

STORM-3147: Add metrics based on ClusterSummary

Rebase and update of https://github.com/apache/storm/pull/2764.

@zd-project Please take a look when you have a chance.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm STORM-3147

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2840


commit 02428594b53801572f657f4d0bff85b2213b3723
Author: Zhengdai Hu 
Date:   2018-07-13T19:22:20Z

STORM-3147: Port ClusterSummary to StormMetricsRegistry

commit 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e
Author: Stig Rohde Døssing 
Date:   2018-09-17T20:21:12Z

STORM-3147: Fix minor nits, rebase to use non-static metrics registry




---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2836


---


[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2805


---


[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...

2018-09-17 Thread kishorvpatil
Github user kishorvpatil commented on the issue:

https://github.com/apache/storm/pull/2831
  
https://user-images.githubusercontent.com/6090397/45646222-afa9d500-ba90-11e8-9651-56b153e7ce42.png

@revans2 I fixed the position and removed the duplicate space entries showing 
the FLUX image icon.


---


[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...

2018-09-17 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2836
  
Looks good, thanks. +1


---


[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...

2018-09-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2836
  
@srdo I think I addressed all of your review comments.


---


[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...

2018-09-17 Thread kishorvpatil
Github user kishorvpatil commented on the issue:

https://github.com/apache/storm/pull/2831
  
OMG, my bad.. looks like the span tag is added twice. Let me fix it.. 


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218177587
  
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
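 <artifactId>maven-checkstyle-plugin</artifactId>
 
 
-<maxAllowedViolations>780</maxAllowedViolations>
+<maxAllowedViolations>853</maxAllowedViolations>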
--- End diff --

No, it's fine. 


---


[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...

2018-09-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2831
  
Sorry to pull my +1 back.  On both Firefox and Chrome it looks off to me.  
Could you take a look at this again?


![ui](https://user-images.githubusercontent.com/3441321/45642002-a151be00-ba7c-11e8-9101-161114487d54.png)



---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218174414
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+private static class ExecutorCache {
+private Boolean isTimedOut;
+private Integer nimbusTime;
+private Integer executorReportedTime;
+
+public ExecutorCache(Map<String, Object> newBeat) {
+if (newBeat != null) {
+executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+} else {
+executorReportedTime = 0;
+}
+
+nimbusTime = Time.currentTimeSecs();
+isTimedOut = false;
+}
+
+public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+this.isTimedOut = isTimedOut;
+this.nimbusTime = nimbusTime;
+this.executorReportedTime = executorReportedTime;
+}
+
+public synchronized Boolean isTimedOut() {
+return isTimedOut;
+}
+
+public synchronized Integer getNimbusTime() {
+return nimbusTime;
+}
+
+public synchronized Integer getExecutorReportedTime() {
+return executorReportedTime;
+}
+
+public synchronized void updateTimeout(Integer timeout) {
+isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+}
+
+public synchronized void updateFromHb(Integer timeout, Map<String, Object> newBeat) {
+if (newBeat != null) {
+Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+if (!newReportedTime.equals(executorReportedTime)) {
+nimbusTime = Time.currentTimeSecs();
+}
+executorReportedTime = newReportedTime;
+}
+updateTimeout(timeout);
+}
+}
+
+//Topology Id -> executor ids -> component -> stats(...)
+private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+/**
+ * Create an empty cache.
+ */
+public HeartbeatCache() {
+this.cache = new ConcurrentHashMap<>();
+}
+
+/**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+@VisibleForTesting
+public void addEmptyTopoForTests(String topoId) {
+cache.put(topoId, new ConcurrentHashMap<>());
+}
+
+/**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+  

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218174140
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) 
throws AuthorizationException,
 return true;
 }
 
-private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
+static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
--- End diff --

Yes, I'll make them private again. They became package-private because, while 
refactoring, I kept Assoc and Dissoc until the very end, when I fixed the threading.
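
For readers unfamiliar with the pattern, here is a minimal sketch of what an `Assoc`/`Dissoc` pair implementing `UnaryOperator` can look like. The diff only shows that `Assoc` implements `UnaryOperator`; the type parameters, the copy-on-write behavior, and the use with `AtomicReference` below are assumptions made for illustration and are not taken from Nimbus.java.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch: returns a copy of the map with one key added.
final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
    private final K key;
    private final V value;

    Assoc(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public Map<K, V> apply(Map<K, V> current) {
        // Copy first so readers holding the old map never see a partial update.
        Map<K, V> copy = new HashMap<>(current);
        copy.put(key, value);
        return copy;
    }
}

// Hypothetical sketch: returns a copy of the map with one key removed.
final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
    private final K key;

    Dissoc(K key) {
        this.key = key;
    }

    @Override
    public Map<K, V> apply(Map<K, V> current) {
        Map<K, V> copy = new HashMap<>(current);
        copy.remove(key);
        return copy;
    }
}

final class AssocDemo {
    public static void main(String[] args) {
        AtomicReference<Map<String, Integer>> ref = new AtomicReference<>(new HashMap<>());
        // updateAndGet retries the operator until the swap succeeds atomically.
        ref.updateAndGet(new Assoc<String, Integer>("topo-1", 1));
        ref.updateAndGet(new Dissoc<String, Integer>("topo-1"));
        System.out.println(ref.get().isEmpty());
    }
}
```

Operators like these are only needed while state lives in an atomically swapped map. Once the cache itself is thread safe, they can stay private or go away entirely.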


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218173878
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218173746
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218173536
  
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
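 <artifactId>maven-checkstyle-plugin</artifactId>
 
 
-<maxAllowedViolations>780</maxAllowedViolations>
+<maxAllowedViolations>853</maxAllowedViolations>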
--- End diff --

Yes, it is all the issues that came with StatsUtil, which is why the other 
one went in a good direction.  I can try to clean them up more if you want.


---


[GitHub] storm pull request #2839: STORM-3228 allow refernce counting of differing Po...

2018-09-17 Thread agresch
GitHub user agresch opened a pull request:

https://github.com/apache/storm/pull/2839

STORM-3228 allow refernce counting of differing PortAndAssignment obj…

…ects to work properly

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/agresch/storm agresch_storm-3228

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2839.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2839


commit 5c47b2d7c1f3628c892e16b4d0f2dc78beddbb28
Author: Aaron Gresch 
Date:   2018-09-17T17:58:31Z

STORM-3228 allow refernce counting of differing PortAndAssignment objects 
to work properly




---


[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2805
  
The test failure looks unrelated; it's getting a permission error when writing 
to the local Maven repo.


---


[GitHub] storm pull request #2832: STORM-3205 Optimization in TuplImpl

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2832


---


[GitHub] storm pull request #2838: STORM-3227: Only push credentials if going to expe...

2018-09-17 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/2838

STORM-3227: Only push credentials if going to expected user



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-3227

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2838


commit 7607778bb5387a98f2e0bf5cb0c70e69c967d137
Author: Robert (Bobby) Evans 
Date:   2018-09-17T16:29:29Z

STORM-3227: Only push credentials if going to expected user




---


[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2805#discussion_r218150464
  
--- Diff: storm-server/src/main/java/org/apache/storm/LocalDRPC.java ---
@@ -38,9 +39,9 @@
 private final DRPC drpc;
 private final String serviceId;
 
-public LocalDRPC() {
+public LocalDRPC(StormMetricsRegistry metricsRegistry) {
--- End diff --

Added a no-args constructor that creates a new registry.
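
A minimal sketch of that constructor pattern, using hypothetical stand-in classes (`MetricsRegistry`, `LocalService`) rather than Storm's real `StormMetricsRegistry` and `LocalDRPC`: the no-args constructor delegates to the injecting one, so existing callers keep working while tests and daemons can share a registry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a non-static metrics registry.
final class MetricsRegistry {
    private final Map<String, Long> counters = new HashMap<>();

    void increment(String name) {
        counters.merge(name, 1L, Long::sum);
    }

    long count(String name) {
        return counters.getOrDefault(name, 0L);
    }
}

// Hypothetical stand-in for a service that needs a registry.
final class LocalService {
    private final MetricsRegistry metricsRegistry;

    // Convenience constructor: creates a private registry for callers that do not care.
    LocalService() {
        this(new MetricsRegistry());
    }

    // Injecting constructor: lets the caller supply a shared registry.
    LocalService(MetricsRegistry metricsRegistry) {
        this.metricsRegistry = metricsRegistry;
    }

    void handleRequest() {
        metricsRegistry.increment("requests");
    }

    MetricsRegistry metricsRegistry() {
        return metricsRegistry;
    }
}
```

With a shared instance, `new LocalService(shared)` records into `shared`, whereas `new LocalService()` keeps its metrics isolated.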


---


[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2805#discussion_r218148492
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -219,11 +201,11 @@ public void kill() throws IOException {
 shutdownTimer = shutdownDuration.time();
 }
 try {
-Set<Long> pids = getAllPids();
+Set<Long> pids = getAllPids();
 
-for (Long pid : pids) {
-kill(pid);
-}
+for (Long pid : pids) {
+kill(pid);
+}
--- End diff --

Will reformat


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218142320
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218121407
  
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
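 <artifactId>maven-checkstyle-plugin</artifactId>
 
 
-<maxAllowedViolations>780</maxAllowedViolations>
+<maxAllowedViolations>853</maxAllowedViolations>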
--- End diff --

This number seems to be moving in the wrong direction :)


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r217911368
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+private static class ExecutorCache {
+private Boolean isTimedOut;
+private Integer nimbusTime;
--- End diff --

Please rename to include the unit of time.
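
As an illustration of the rename being asked for, here is a hedged sketch of the same bookkeeping with the unit carried in each name. The class name `ExecutorHeartbeat`, the `Secs` suffixes, and the injected clock are assumptions for this example and are not the names that ended up in the PR. The timeout logic mirrors the quoted `updateFromHb` and `updateTimeout` methods.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch: heartbeat bookkeeping with the time unit in every name.
final class ExecutorHeartbeat {
    private final IntSupplier clockSecs;   // injected clock, for testability
    private int nimbusTimeSecs;            // when Nimbus last saw the reported time change
    private int executorReportedTimeSecs;  // time the executor put in its heartbeat
    private boolean timedOut;

    ExecutorHeartbeat(IntSupplier clockSecs, int executorReportedTimeSecs) {
        this.clockSecs = clockSecs;
        this.executorReportedTimeSecs = executorReportedTimeSecs;
        this.nimbusTimeSecs = clockSecs.getAsInt();
        this.timedOut = false;
    }

    synchronized void updateFromHeartbeat(int timeoutSecs, int newReportedTimeSecs) {
        // Only a changed reported time counts as a sign of life.
        if (newReportedTimeSecs != executorReportedTimeSecs) {
            nimbusTimeSecs = clockSecs.getAsInt();
        }
        executorReportedTimeSecs = newReportedTimeSecs;
        timedOut = (clockSecs.getAsInt() - nimbusTimeSecs) >= timeoutSecs;
    }

    synchronized boolean isTimedOut() {
        return timedOut;
    }

    synchronized int getNimbusTimeSecs() {
        return nimbusTimeSecs;
    }
}
```

With the unit in the name, a call such as `updateFromHeartbeat(30, reported)` cannot be misread as taking milliseconds.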


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r217911059
  
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java 
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+public static final String SPOUT = "spout";
+public static final String BOLT = "bolt";
+static final String EXECUTOR_STATS = "executor-stats";
+static final String UPTIME = "uptime";
+public static final String TIME_SECS = "time-secs";
+public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+/**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+public static List<Integer> convertExecutor(List<Long> executor) {
+return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+}
+
+/**
+ * Make and map of executors to empty stats.
--- End diff --

Nit: Couple of places saying "and" where it should say "a"/"an"


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218140315
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import 
org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+private static class ExecutorCache {
+private Boolean isTimedOut;
+private Integer nimbusTime;
+private Integer executorReportedTime;
+
+public ExecutorCache(Map<String, Object> newBeat) {
+if (newBeat != null) {
+executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+} else {
+executorReportedTime = 0;
+}
+
+nimbusTime = Time.currentTimeSecs();
+isTimedOut = false;
+}
+
+public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+this.isTimedOut = isTimedOut;
+this.nimbusTime = nimbusTime;
+this.executorReportedTime = executorReportedTime;
+}
+
+public synchronized Boolean isTimedOut() {
+return isTimedOut;
+}
+
+public synchronized Integer getNimbusTime() {
+return nimbusTime;
+}
+
+public synchronized Integer getExecutorReportedTime() {
+return executorReportedTime;
+}
+
+public synchronized void updateTimeout(Integer timeout) {
+isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+}
+
+public synchronized void updateFromHb(Integer timeout, Map<String, Object> newBeat) {
+if (newBeat != null) {
+Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+if (!newReportedTime.equals(executorReportedTime)) {
+nimbusTime = Time.currentTimeSecs();
+}
+executorReportedTime = newReportedTime;
+}
+updateTimeout(timeout);
+}
+}
+
+//Topology Id -> executor ids -> component -> stats(...)
+private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+/**
+ * Create an empty cache.
+ */
+public HeartbeatCache() {
+this.cache = new ConcurrentHashMap<>();
+}
+
+/**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+@VisibleForTesting
+public void addEmptyTopoForTests(String topoId) {
+cache.put(topoId, new ConcurrentHashMap<>());
+}
+
+/**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ 

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218124022
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218140867
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) 
throws AuthorizationException,
 return true;
 }
 
-private static final class Assoc implements UnaryOperator> {
+static final class Assoc implements UnaryOperator> {
--- End diff --

Assoc and Dissoc don't seem to be used in new places, so is making them 
package private still necessary?


---


[GitHub] storm pull request #2833: STORM-3225: Use MediaType for check

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2833


---


[GitHub] storm pull request #2837: Remove powered-by.md, it lives in the storm-site r...

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2837


---


[GitHub] storm pull request #2834: STORM-3226: Update error message to be more clear

2018-09-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2834


---


[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2805
  
@kishorvpatil Yes, but the merge conflicts are minor. I wanted to get this 
reviewed so that @srdo can address any review comments while fixing the merge 
conflicts. That way we can get this in and have a 2.0.0 RC out ASAP.


---


[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread kishorvpatil
Github user kishorvpatil commented on the issue:

https://github.com/apache/storm/pull/2805
  
@revans2 Looks like we have a few conflicts on this PR?


---


[GitHub] storm pull request #2834: STORM-3226: Update error message to be more clear

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2834#discussion_r218116277
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java 
---
@@ -135,7 +135,7 @@ public Supervisor(Map conf, IContext 
sharedContext, ISupervisor
 (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf);
 if (authorizationHandler == null && 
conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) {
 throw new IllegalStateException("It looks like authorization 
is turned on for nimbus but not for the "
-+ "supervisor");
++ "supervisor ( " + DaemonConfig.SUPERVISOR_AUTHORIZER 
+ " is not set)");
--- End diff --

Will do on checkin.


---


[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2805
  
Also the merge conflicts are really minor.


---


[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2805#discussion_r218107287
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -219,11 +201,11 @@ public void kill() throws IOException {
 shutdownTimer = shutdownDuration.time();
 }
 try {
-Set<Long> pids = getAllPids();
+Set<Long> pids = getAllPids();
 
-for (Long pid : pids) {
-kill(pid);
-}
+for (Long pid : pids) {
+kill(pid);
+}
--- End diff --

It looks like the indentation is off around here. It could be spaces vs. tabs 
or something; I'm not really sure.


---


[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2805#discussion_r218106022
  
--- Diff: storm-server/src/main/java/org/apache/storm/LocalDRPC.java ---
@@ -38,9 +39,9 @@
 private final DRPC drpc;
 private final String serviceId;
 
-public LocalDRPC() {
+public LocalDRPC(StormMetricsRegistry metricsRegistry) {
--- End diff --

This feels problematic to me.

At a minimum we need to update ./docs/Local-mode.md to document the new way to 
create it, but I would really prefer to have at least the option of a default 
constructor. This is local mode; the only reason we would need a single metrics 
registry is if we intended to gather metrics from it afterwards as part of a 
test, which we don't really do right now.
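
A minimal sketch of the "keep a default constructor" option, assuming 
hypothetical stand-in classes (`SketchMetricsRegistry` and `SketchLocalDrpc` 
are made up for illustration and are not the real `StormMetricsRegistry` or 
`LocalDRPC`). The no-arg constructor creates its own registry, while the 
overload still lets a test pass in a shared one:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a metrics registry; not the real Storm class.
final class SketchMetricsRegistry {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Increment the named counter, creating it on first use.
    void increment(String name) {
        counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
    }

    // Read the named counter; unknown counters read as zero.
    long count(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }
}

// Hypothetical stand-in for a local-mode service with both constructor forms.
final class SketchLocalDrpc {
    private final SketchMetricsRegistry metricsRegistry;

    // Default constructor: existing local-mode callers need no changes.
    SketchLocalDrpc() {
        this(new SketchMetricsRegistry());
    }

    // Overload for callers (for example tests) that want to share a registry.
    SketchLocalDrpc(SketchMetricsRegistry metricsRegistry) {
        this.metricsRegistry = metricsRegistry;
    }

    // Pretend to run a DRPC function and record that it was called.
    void execute(String function) {
        metricsRegistry.increment("drpc:" + function);
    }

    SketchMetricsRegistry getMetricsRegistry() {
        return metricsRegistry;
    }
}
```

With this shape, `new SketchLocalDrpc()` keeps working as before, and a test 
that does want to inspect metrics can construct the registry first and pass it 
in.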


---


[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218088968
  
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java 
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+public static final String SPOUT = "spout";
+public static final String BOLT = "bolt";
+static final String EXECUTOR_STATS = "executor-stats";
+static final String UPTIME = "uptime";
+public static final String TIME_SECS = "time-secs";
+public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+/**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+public static List<Integer> convertExecutor(List<Long> executor) {
+return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+}
+
+/**
+ * Make and map of executors to empty stats.
+ * @param executors the executors as keys of the map.
+ * @return and empty map of executors to stats.
+ */
+public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+for (Object executor : executors) {
+List startEnd = (List) executor;
+ret.put(convertExecutor(startEnd), null);
+}
+return ret;
+}
+
+/**
+ * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps.
+ */
+public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+ret.put(convertExecutor(entry.getKey()), entry.getValue());
+}
+return ret;
+}
+
+/**
+ * Create a new worker heartbeat for zookeeper.
+ * @param topoId the topology id
+ * @param executorStats the stats for the executors
+ * @param uptime the uptime for the worker.
+ * @return the heartbeat map.
+ */
+public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+Map<String, Object> ret = new HashMap<>();
+ret.put("storm-id", topoId);
+ret.put(EXECUTOR_STATS, executorStats);
+ret.put(UPTIME, uptime);
+ret.put(TIME_SECS, Time.currentTimeSecs());
+
+return ret;
+}
+
+private static Number getByKeyOr0(Map m, String k) {
+if (m == null) {
+return 0;
+}
+
+Number n = (Number) m.get(k);
+if (n == null) {
+return 0;
+}
+return n;
+}
+
+/**
+ * Get a sub-map by a given key.
+ * @param map the original map
+ * @param key the key to get it from.
+ * @return the map stored under key.
+ */
+public static  Map getMapByKey(Map map, String key) {
+if (map == null) {
+return null;
+}
+return (Map) map.get(key);
+}
+
+public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map heartbeat) {
+ClusterWorkerHeartbeat ret = new 

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

2018-09-17 Thread kishorvpatil
Github user kishorvpatil commented on a diff in the pull request:

https://github.com/apache/storm/pull/2836#discussion_r218086663
  
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java 
---

Re: Regarding releasing Apache Storm 2.0.0

2018-09-17 Thread Bobby Evans
I think we are really close on this and I would love to see us get an RC
out ASAP.

We are still missing some things that Stig called out.

https://github.com/apache/storm/pull/2719 has a build issue; not sure if we
need to make an alternative patch or not.
https://github.com/apache/storm/pull/2800 has a newer alternative patch,
https://github.com/apache/storm/pull/2836; please take a look.
https://github.com/apache/storm/pull/2805 has some merge conflicts
currently, but everyone, please take a chance to review it.

Thanks,

Bobby

On Fri, Sep 14, 2018 at 2:57 AM Jungtaek Lim  wrote:

> I looked up the names of the client artifacts in some streaming
> frameworks. Please see below:
>
> Spark: spark-core
> Kafka: kafka-clients
> Flink: flink-clients
> Heron: heron-api
>
> Given this divergence, I don't see why "storm-core" is the only name
> that avoids confusion. Actually, if my understanding is right, we need
> end users to include "storm-server" when running a local cluster, so
> "storm-core" vs "storm-server" would cause real confusion. I guess we
> already discussed the naming, and given that we don't rename it, we
> are OK with the renamed artifacts.
>
> On Fri, Sep 14, 2018 at 4:07 PM, Roshan Naik wrote:
>
> >  Happy to see consensus in moving fwd with 2.0 soon.
> > I will try to get a minor patch (STORM-3205) within 24 hours ... as it
> > seems like it has potential to deliver a decent perf boost and energy
> > savings.
> > One thing I am hoping we can address before releasing Storm 2 is... to
> > fix the naming of the storm-client.jar.  It's such a core jar really, it
> > should have been really called storm-core or something like that... but
> > unfortunately we already have another jar with that name.  Retaining the
> > 'client' name for this new jar would be confusing and give wrong
> > impressions to users and any new devs IMO.
> > -roshan
> >
> > On Thursday, September 13, 2018, 2:12:40 PM PDT, Govind Menon wrote:
> >
> >  STORM-3217 and STORM-3221 have been fixed - +1 from me for 2.0 RC.
> >
> > On Wed, Sep 12, 2018 at 10:01 AM Govind Menon  wrote:
> >
> > > Hi all,
> > >
> > > > There are some regressions that I introduced as part of STORM-1311
> > > > which I'm working on as part of
> > > > https://issues.apache.org/jira/browse/STORM-3217
> > > > and https://issues.apache.org/jira/browse/STORM-3221. These should be
> > > > fixed before a 2.x release.
> > >
> > > I have code working on the Yahoo internal branch and should have PRs up
> > > for them in community soon.
> > >
> > > I apologize for slowing things up.
> > >
> > > Thanks,
> > > Govind.
> > >
> > > > On Tue, Sep 11, 2018 at 3:31 PM Arun Mahadevan wrote:
> > >
> > >> +1 for releasing 2.0.
> > >>
> > > >> Maybe the RC can be cut once critical patches are merged.
> > >>
> > > >> On Tue, 11 Sep 2018 at 10:28, Stig Rohde Døssing <stigdoess...@gmail.com> wrote:
> > >>
> > >> > +1 to cut an RC.
> > >> >
> > >> > Here are a couple of PRs that could maybe go in
> > >> >
> > >> > https://github.com/apache/storm/pull/2719
> > > >> > https://github.com/apache/storm/pull/2800 (this one requires some
> > > >> > changes, but we should be able to fix it pretty quickly)
> > > >> > also would like to get https://github.com/apache/storm/pull/2805
> > > >> > reviewed, it might change some public methods.
> > > >> >
> > > >> > Other than that, we should try to remove as much deprecated code as
> > > >> > we can before release
> > >> >
> > >> > https://issues.apache.org/jira/browse/STORM-2947
> > >> >
> > > >> > On Mon, Sep 10, 2018 at 21:59, Alexandre Vermeerbergen <avermeerber...@gmail.com> wrote:
> > > >> >
> > > >> > > +1 for a Storm 2.0 as soon as possible, let's jump into the
> > > >> > > future :)
> > > >> > > On Mon, Sep 10, 2018 at 21:50, Kishorkumar Patil wrote:
> > >> > > >
> > > >> > > > All issues reported under epic
> > > >> > > > https://issues.apache.org/jira/browse/STORM-2714 are
> > > >> > > > resolved/closed. I don't see any open issues/blockers at this
> > > >> > > > point for going ahead with a 2.x release.
> > >> > > > I am +1 to 2.0 release.
> > >> > > >
> > >> > > > Regards,
> > >> > > > -Kishor
> > >> > > >
> > > >> > > > On Mon, Sep 10, 2018 at 2:24 PM, P. Taylor Goetz <ptgo...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > I agree, and looking through the JIRAs against 2.0, I would say a
> > > >> > > > > majority of the ones marked critical are not critical.
> > > >> > > > >
> > > >> > > > > I’m +1 on moving forward with a 2.0 release, but will give others
> > > >> > > > > time to respond with any JIRAs they think should be included.
> > >> > > > >
> > > >> > > > > > p.s. I don't want to create branch-2.x or branch-2.0.x until
> > > >> > > > > > absolutely necessary, I don't see any major features with pull
> > > >> > > > > > requests up but if you
> > >> > > > >