[GitHub] storm issue #2365: [STORM-2773]If a drpcserver node in cluster is down,drpc ...

2017-10-26 Thread liu-zhaokun
Github user liu-zhaokun commented on the issue:

https://github.com/apache/storm/pull/2365
  
@revans2 
Do you mean there is no need to reconnect when I create a DRPCInvocationsClient 
instance?


---


[GitHub] storm pull request #2381: STORM-2784: storm-kafka-client KafkaTupleListener ...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2381


---


[GitHub] storm pull request #2363: STORM-2759: Let users indicate if a blob should re...

2017-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2363


---


[GitHub] storm issue #2363: STORM-2759: Let users indicate if a blob should restart a...

2017-10-26 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2363
  
OK. Thanks for addressing this quickly. +1 again.


---


[GitHub] storm issue #2386: [STORM-1927] upgrade to jetty 9

2017-10-26 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2386
  
Thanks @revans2  for pointing it out. I made stupid mistakes.  I am still 
fixing it.  Please don't merge yet.


---


[GitHub] storm issue #2379: STORM-2782 - refactor partial key grouping to make it mor...

2017-10-26 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2379
  
@kevpeek Yes, the JVM can be very dumb when it comes to arrays vs Lists.

That sounds great. If we can get within a few percentage points, I would be 
happy to merge this in.


---


[GitHub] storm issue #2379: STORM-2782 - refactor partial key grouping to make it mor...

2017-10-26 Thread kevpeek
Github user kevpeek commented on the issue:

https://github.com/apache/storm/pull/2379
  
heh. It crossed my mind to change those to arrays, but I figured the jvm 
would render the difference negligible. A quick test proves that I was very 
wrong. I'll tweak that tomorrow and run some more legitimate performance 
numbers, but it looks like that change alone closes the gap to just a few 
percent.


---


[GitHub] storm issue #2365: [STORM-2773]If a drpcserver node in cluster is down,drpc ...

2017-10-26 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2365
  
@liu-zhaokun You are correct. I had forgotten that ThriftClient first 
establishes the connection, then initializes the _protocol member, and only 
then is the Client itself created, which reads from the protocol.

The solution is that we need a way to not call reconnect from the 
constructor of ThriftClient.  We already have that in a few cases for 
testing/local mode:

https://github.com/liu-zhaokun/storm/blob/d92f1a9c8d7442d4959fec57813fc5de42b179a9/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java#L73-L75

But I think we want another parameter to the constructor to say "don't do 
this", so we can try to connect ourselves and, if that fails, ignore the 
failure and go on.
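
The constructor change described above could look roughly like the sketch below. It is not Storm's ThriftClient: `LazyConnectClient`, the `connectOnCreate` parameter, `tryConnect`, and the use of a `Runnable` as a stand-in transport are all assumptions made so the example runs without a network. A real Thrift transport would throw a checked transport exception rather than a `RuntimeException`.

```java
/** Hypothetical stand-in for a Thrift-style client; not Storm's ThriftClient. */
class LazyConnectClient {
    // Assumed transport hook: run() opens the connection or throws on failure.
    private final Runnable connector;
    private boolean connected = false;

    /** Eager behaviour: connect in the constructor and fail if the server is down. */
    LazyConnectClient(Runnable connector) {
        this(connector, true);
    }

    /** Assumed extra parameter: pass false to skip the connect attempt here. */
    LazyConnectClient(Runnable connector, boolean connectOnCreate) {
        this.connector = connector;
        if (connectOnCreate) {
            reconnect();
        }
    }

    /** Drops the current state and opens the connection again; may throw. */
    void reconnect() {
        connected = false;
        connector.run();
        connected = true;
    }

    /** Caller-side attempt: a failure is swallowed so a down server does not abort setup. */
    boolean tryConnect() {
        try {
            reconnect();
        } catch (RuntimeException e) {
            // Ignored on purpose; the caller can retry later.
        }
        return connected;
    }

    boolean isConnected() {
        return connected;
    }
}
```

With this shape, a caller that must tolerate a dead server constructs the client with `connectOnCreate` set to false and calls `tryConnect()` itself, while existing callers keep the eager behaviour.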



---


[GitHub] storm issue #2379: STORM-2782 - refactor partial key grouping to make it mor...

2017-10-26 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2379
  
@kevpeek I agree that it might not be worth merging it right now.  But 
there is room for improvement.

The first thing I would do is to pre-allocate the possible return values as 
lists.  We should avoid as much memory allocation as possible within 
chooseTasks.

assignmentCreator.createAssignment is also returning a newly created 
object.  We could at least turn that into an int array, so there is only one 
object allocated instead of 3.

You might also consider refactoring createAssignment to take a reference to 
targetSelector so you don't need to create a class that is an intermediary 
between the two.  Just have targetSelector take two ints and return one of 
them, no object allocation at all.

If you are still having performance issues after that I can probably help 
with some profiling.  
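
A minimal sketch of the allocation-free shape suggested above, under stated assumptions: `PreallocatedChooser`, its `load` counters, and the two-hash signature of `chooseTasks` are hypothetical and do not reflect the code in the PR. The point is only that every possible return value is built once in the constructor and that the selector works on two ints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical two-choice grouping that allocates nothing per tuple. */
class PreallocatedChooser {
    // One immutable single-element list per target task, built once up front.
    private final List<List<Integer>> choices;
    // Assumed load counters, indexed the same way as choices.
    private final long[] load;

    /** Assumes targetTasks is non-empty. */
    PreallocatedChooser(List<Integer> targetTasks) {
        choices = new ArrayList<>(targetTasks.size());
        for (Integer task : targetTasks) {
            choices.add(Collections.singletonList(task));
        }
        load = new long[targetTasks.size()];
    }

    /** Takes two candidate indexes and returns the less loaded one; primitives only. */
    private int selectTarget(int first, int second) {
        return load[first] <= load[second] ? first : second;
    }

    /** Returns one of the pre-built lists; no objects are created on this path. */
    List<Integer> chooseTasks(int firstHash, int secondHash) {
        int n = choices.size();
        int chosen = selectTarget(Math.floorMod(firstHash, n), Math.floorMod(secondHash, n));
        load[chosen]++;
        return choices.get(chosen);
    }
}
```

Because the returned lists are shared, callers must treat them as read-only, which `Collections.singletonList` enforces.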


---


[GitHub] storm issue #2363: STORM-2759: Let users indicate if a blob should restart a...

2017-10-26 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2363
  
@HeartSaVioR I addressed your review comments.  I didn't change the name of 
shouldLogLeader, but I added javadocs to make it clear what it does.


---


[GitHub] storm issue #2386: [STORM-1927] upgrade to jetty 9

2017-10-26 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2386
  
I am getting the UI crashing on startup, but with no stacktrace in the 
logs.  Could you look into it?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236780
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -664,6 +722,9 @@ private double calculateSharedOffHeapMemory(String 
nodeId, SchedulerAssignmentIm
 
 private double calculateSharedOffHeapMemory(
 String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails 
extra) {
+if (assignment == null) {
--- End diff --

When is the assignment null???  Why would we try to calculate the shared 
off heap memory for no assignment???


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237164
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -168,6 +172,15 @@ private void initResourceList() {
 // topology.getbolt (AKA sys tasks most specifically __acker tasks)
 for (ExecutorDetails exec : getExecutors()) {
 if (!resourceList.containsKey(exec)) {
+// TODO: Update this debug statement
--- End diff --

Either update it or remove the TODO


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234270
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -240,6 +240,12 @@
 public static final String TOPOLOGY_TASKS = "topology.tasks";
 
 /**
+ * A map of resources used by each component e.g {"cpu" : 200.0. 
"onheap.memory.mb": 256.0, "gpu" : 0.5 }
+ */
+@isMapEntryType(keyType = String.class, valueType = Double.class)
--- End diff --

It is really hard to enforce a double in the conf.  This is because neither 
JSON nor YAML knows about this; both put the parsed number in the smallest 
type that matches it, i.e. 1 becomes an integer, not a double.  1.0 might be a 
double, but it might also be a float, I don't know.  If the number is really 
big it could be a long.
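
One way around the problem described above is to accept any `Number` and widen it when the value is read, rather than validating the stored type. The sketch below is illustrative only: `ResourceNumbers`, `toDouble`, and `normalize` are hypothetical names, not Storm APIs, and treating a missing value as 0.0 is an assumption of this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that tolerates whatever numeric type the parser produced. */
class ResourceNumbers {
    /** Widens any Number to double; returns defaultValue for null; rejects other types. */
    static double toDouble(Object value, double defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        throw new IllegalArgumentException(
            "Expected a number but got " + value.getClass().getName());
    }

    /** Copies a parsed resource map into one whose values are all doubles. */
    static Map<String, Double> normalize(Map<String, ?> raw) {
        Map<String, Double> normalized = new HashMap<>();
        for (Map.Entry<String, ?> entry : raw.entrySet()) {
            normalized.put(entry.getKey(), toDouble(entry.getValue(), 0.0));
        }
        return normalized;
    }
}
```

With this approach the config validation only needs to check that each value is a number, and the Integer, Long, Float, and Double cases all end up as the same type.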


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234853
  
--- Diff: storm-client/src/jvm/org/apache/storm/Constants.java ---
@@ -56,5 +58,20 @@
 public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
 public static final String COMPONENT_TO_DEBUG_ATOM = 
"storm-component->debug-atom";
 public static final Object LOAD_MAPPING = "load-mapping";
+
+public static final String COMMON_CPU_RESOURCE_NAME = 
"cpu.pcore.percent";
+public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = 
"onheap.memory.mb";
+public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = 
"offheap.memory.mb";
+public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = 
"memory.mb";
--- End diff --

Can we document these in some place? For example the configs show some 
examples, but they are not exhaustive, and the rebalance command is really bad 
at this so a user has no idea what to put in as the key.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147233694
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java 
---
@@ -430,37 +427,37 @@ public static void main(String[] args) throws 
Exception {
 return topologyResources;
 }
 
-static void checkIntialization(Map topologyResources, 
String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+/**
+ * Checks if the topology's resource requirements are initialized.
+ * @param topologyResources map of resouces requirements
+ * @param componentId component for which initialization is being 
conducted
+ * @param topologyConf topology configuration
+ * @throws Exception on any error
+ */
+public static void checkInitialization(Map 
topologyResources, String componentId, Map topologyConf) {
+StringBuilder msgBuilder = new StringBuilder();
 
-static void checkInitMem(Map topologyResources, String 
com,
- Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB))
 {
-Double onHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-if (onHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
onHeap);
-}
+for (String resourceName : topologyResources.keySet()) {
+msgBuilder.append(checkInitResource(topologyResources, 
topologyConf, resourceName));
 }
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))
 {
-Double offHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-if (offHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
offHeap);
-}
+
+if (msgBuilder.length() > 0) {
+String resourceDefaults = msgBuilder.toString();
+LOG.debug(
+"Unable to extract resource requirement for Component 
{} \n Resources : {}",
+componentId, resourceDefaults);
 }
 }
 
-static void checkInitCpu(Map topologyResources, String 
com,
- Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-Double cpu = 
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
 null);
-if (cpu != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
-}
+private static String checkInitResource(Map 
topologyResources, Map topologyConf, String resourceName) {
+StringBuilder msgBuilder = new StringBuilder();
+if (topologyResources.containsKey(resourceName)) {
+Double resourceValue = (Double) 
topologyConf.getOrDefault(resourceName, null);
--- End diff --

I think you missed something in the translation.  I think it should be more 
like

```
if (topologyResources.containsKey(resourceName)) {
Double resourceValue = 
ObjectReader.getDouble(topologyConf.get(resourceName), null);
if (resourceValue != null) {
topologyResources.put(resourceName, resourceValue);
}
...
```


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235680
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java 
---
@@ -79,8 +81,18 @@ public T setMemoryLoad(Number onHeap, Number offHeap) {
 @Override
 public T setCPULoad(Number amount) {
 if(amount != null) {
+addResource(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 
amount);
 return 
addConfiguration(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
 }
 return (T) this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public T addResources(Map resources) {
+if(resources != null) {
--- End diff --

nit: you need a space after the if.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237442
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -189,13 +201,20 @@ private static void checkInitCpu(Map 
topologyResources, String c
 null);
 
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
 }
-LOG.debug("Topology Resources {}", topologyResources);
+
+// If resource is also present in resources map will 
overwrite the above
+if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+topologyResources.putAll((Map) 
jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP));
+}
+LOG.info("Topology Resources {}", topologyResources);
 }
 } catch (ParseException e) {
 LOG.error("Failed to parse component resources is:" + 
e.toString(), e);
 return null;
 }
-return topologyResources;
+LOG.info("Topology Resources {}", 
normalizedResourceMap(topologyResources));
+LOG.info("Topology Resources {}", 
normalizedResourceMap(topologyResources));
--- End diff --

Why do we need to log this twice?  or at all?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236218
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
--- End diff --

I think this is debug logging.  Can we please remove it?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235417
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
@@ -17,12 +17,10 @@
  */
 package org.apache.storm.scheduler;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
+import java.util.*;
--- End diff --

I think .* is against the coding standard.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236953
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -993,6 +1054,31 @@ public WorkerResources getWorkerResources(WorkerSlot 
ws) {
 }
 
 @Override
+public Map getAllScheduledResourcesForNode(String 
nodeId) {
+Map totalScheduledResources = new HashMap<>();
+for (SchedulerAssignmentImpl assignment : assignments.values()) {
+for (Entry entry :
+assignment.getScheduledResources().entrySet()) {
+if (nodeId.equals(entry.getKey().getNodeId())) {
+WorkerResources resources = entry.getValue();
+for (Map.Entry resourceEntry : 
resources.get_resources().entrySet()) {
--- End diff --

Could you please add in the types for the Map.Entry?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237298
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -18,22 +18,24 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
--- End diff --

Again, let's avoid .* imports.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235287
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/generated/WorkerResources.java ---
@@ -1,21 +1,4 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
  * Autogenerated by Thrift Compiler (0.9.3)
--- End diff --

What happened to the license header?  Please run the shell script to 
generate the thrift code instead of doing it on your own.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236417
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == 
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == 
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+continue;
 }
-//Not enough CPU no need to try any more
-return false;
-}
-
-//Lets see if we can make the Memory one fast too, at least in the 
failure case.
-//The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Double resourceNeeded = 
ObjectReader.getDouble(resourceNeededEntry.getValue());
+Double resourceAvailable = ObjectReader.getDouble(
+resourcesAvailable.getOrDefault(resourceName, null), 
0.0);
+if (resourceNeeded > resourceAvailable) {
+if (true) {
--- End diff --

I think this is a debug logging change.  Do you want to change it back?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234354
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -1272,6 +1278,12 @@
 public static final String SUPERVISOR_CPU_CAPACITY = 
"supervisor.cpu.capacity";
 
 /**
+ * A map of resources the Supervisor has e.g {"cpu" : 200.0. 
"memory.capacity.mb": 256.0, "gpu" : 0.5 }
+ */
+@isMapEntryType(keyType = String.class, valueType = Double.class)
--- End diff --

Same comment here.  Trying to force the type to be a double is going to be 
difficult with how we use the configs.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236577
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == 
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == 
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+continue;
 }
-//Not enough CPU no need to try any more
-return false;
-}
-
-//Lets see if we can make the Memory one fast too, at least in the 
failure case.
-//The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Double resourceNeeded = 
ObjectReader.getDouble(resourceNeededEntry.getValue());
+Double resourceAvailable = ObjectReader.getDouble(
+resourcesAvailable.getOrDefault(resourceName, null), 
0.0);
+if (resourceNeeded > resourceAvailable) {
+if (true) {
+LOG.info("Could not schedule {}:{} on {} not enough {} 
{} > {}",
+td.getName(),
+exec,
+ws,
+resourceName,
+resourceNeeded,
+resourceAvailable);
+}
+//Not enough resources - stop trying
+return false;
 }
-//Not enough minimum MEM no need to try any more
-return false;
 }
 
 double currentTotal = 0.0;
 double afterTotal = 0.0;
 double afterOnHeap = 0.0;
+
 Set wouldBeAssigned = new HashSet<>();
 wouldBeAssigned.add(exec);
 SchedulerAssignmentImpl assignment = assignments.get(td.getId());
+
 if (assignment != null) {
 Collection currentlyAssigned = 
assignment.getSlotToExecutors().get(ws);
 if (currentlyAssigned != null) {
 wouldBeAssigned.addAll(currentlyAssigned);
 WorkerResources wrCurrent = calculateWorkerResources(td, 
currentlyAssigned);
 currentTotal = wrCurrent.get_mem_off_heap() + 
wrCurrent.get_mem_on_heap();
 }
-WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
-afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
-afterOnHeap = wrAfter.get_mem_on_heap();
-
-currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment);
-afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment, exec);
 }
+WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
+afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
+afterOnHeap = wrAfter.get_mem_on_heap();
+
+currentTotal += calculateSharedOffHeapMemory(ws.get

[GitHub] storm pull request #2363: STORM-2759: Let users indicate if a blob should re...

2017-10-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2363#discussion_r147159038
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java ---
@@ -86,7 +86,22 @@ public static NimbusClient 
getConfiguredClient(Map conf) {
 public static NimbusClient getConfiguredClient(Map 
conf, Integer timeout) {
 return getConfiguredClientAs(conf, null, timeout);
 }
-
+
+private static String oldLeader = "";
+
+private static synchronized boolean shouldLogLeader(String leader) {
+if (LOG.isDebugEnabled()) {
+//If debug logging is turned on we shoudl just log the leader 
all the time
--- End diff --

nit: shoudl -> should


---


[GitHub] storm pull request #2363: STORM-2759: Let users indicate if a blob should re...

2017-10-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2363#discussion_r145321214
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java ---
@@ -86,7 +86,18 @@ public static NimbusClient 
getConfiguredClient(Map conf) {
 public static NimbusClient getConfiguredClient(Map 
conf, Integer timeout) {
 return getConfiguredClientAs(conf, null, timeout);
 }
-
+
+private static String oldLeader = "";
+
+private static synchronized boolean shouldLogLeader(String leader) {
--- End diff --

Just my 2 cents: I know the class field is used only by this class method, 
but the method name doesn't give the impression that it has a side effect. 
It may be better to rename it to reflect its purpose, like 
`updateLeaderNimbus` or so.
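
To illustrate the naming concern, here is a hedged sketch of a method whose name announces that it mutates state. `LeaderTracker` is a hypothetical holder (the PR keeps the field and method as static members of NimbusClient), and the debug-logging branch visible in the diff is left out because its full body is not shown.

```java
import java.util.Objects;

/** Hypothetical holder for the last leader seen; not the code from the PR. */
class LeaderTracker {
    private String oldLeader = "";

    /**
     * Records the given leader and reports whether it differs from the
     * previously recorded one. The "update" prefix signals the side effect.
     */
    synchronized boolean updateLeaderNimbus(String leader) {
        if (Objects.equals(oldLeader, leader)) {
            return false;
        }
        oldLeader = leader;
        return true;
    }
}
```

A caller would log the leader only when the method returns true, so repeated lookups of an unchanged leader stay quiet.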


---


[GitHub] storm pull request #2363: STORM-2759: Let users indicate if a blob should re...

2017-10-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2363#discussion_r145322043
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
 ---
@@ -69,11 +69,22 @@ public static void rmrAsUser(Map conf, 
String id, String path) t
  * @param blobInfo
  * @return
  */
-public static Boolean shouldUncompressBlob(Map 
blobInfo) {
+public static boolean shouldUncompressBlob(Map 
blobInfo) {
 return ObjectReader.getBoolean(blobInfo.get("uncompress"), false);
 }
 
 /**
+ * Given the blob information returns the value of the workerRestart 
field, handling it either being a string or a boolean value, or
+ * if it's not specified then returns false
+ *
+ * @param blobInfo
+ * @return
+ */
+public static boolean needsCallback(Map blobInfo) {
--- End diff --

A more detailed name would be better, like `blobNeedsWorkerRestart` or 
`isBlobRequiredWorkerRestart`, or so on. From the method name I can't tell 
which action it is about to take. Callback is too general.
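
A sketch of how the renamed method might read, assuming the flag lives under the `workerRestart` key mentioned in the javadoc and may arrive as either a Boolean or a String. `BlobFlags` and `toBoolean` are hypothetical; the PR itself delegates to `ObjectReader.getBoolean`, whose exact behaviour is not reproduced here.

```java
import java.util.Map;

/** Hypothetical helper; names are illustrative, not Storm APIs. */
class BlobFlags {
    /** Accepts a Boolean or a String such as "true"; null yields defaultValue. */
    static boolean toBoolean(Object value, boolean defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        throw new IllegalArgumentException(
            "Expected a boolean or string but got " + value.getClass().getName());
    }

    /** The name states which action the flag controls. */
    static boolean blobNeedsWorkerRestart(Map<String, Object> blobInfo) {
        return toBoolean(blobInfo.get("workerRestart"), false);
    }
}
```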


---


[GitHub] storm issue #2365: [STORM-2773]If a drpcserver node in cluster is down,drpc ...

2017-10-26 Thread liu-zhaokun
Github user liu-zhaokun commented on the issue:

https://github.com/apache/storm/pull/2365
  
@revans2 
I tried to fix this bug following your advice, but I found that the original 
bug occurs at line 41 
[https://github.com/liu-zhaokun/storm/blob/d92f1a9c8d7442d4959fec57813fc5de42b179a9/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java#L41](https://github.com/liu-zhaokun/storm/blob/d92f1a9c8d7442d4959fec57813fc5de42b179a9/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java#L41),
 where DRPCInvocationsClient wants to establish a connection with the Thrift 
server. I am not sure whether we should fix this bug at line 41. Could you 
give me some suggestions? Thanks.


---