[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on the issue:

https://github.com/apache/storm/pull/2442
  
In terms of the configs, we may want to explore splitting Config.java into 
multiple pieces; it has become something of a monolith. Config.java in the 
storm-client module should probably only contain configs that users can set in 
their topology.
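A minimal sketch of what such a split could look like, assuming hypothetical holder classes `TopologyConfigKeys` (user-settable, storm-client) and `DaemonConfigKeys` (daemon-only, storm-server); neither name comes from this thread. The key strings are existing Storm config names used only for illustration. The hypothetical `daemonKeysIn` helper shows one thing the split makes easy: spotting daemon-only settings that ended up in a topology conf.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public final class ConfigSplitSketch {

    /** Hypothetical: keys a topology author may set (would live in storm-client). */
    public static final class TopologyConfigKeys {
        public static final String TOPOLOGY_WORKERS = "topology.workers";
        public static final String TOPOLOGY_DEBUG = "topology.debug";
        private TopologyConfigKeys() {}
    }

    /** Hypothetical: keys only the daemons read (would live in storm-server). */
    public static final class DaemonConfigKeys {
        public static final String NIMBUS_SEEDS = "nimbus.seeds";
        public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
        static final Set<String> ALL = Set.of(NIMBUS_SEEDS, SUPERVISOR_SLOTS_PORTS);
        private DaemonConfigKeys() {}
    }

    /** Returns (sorted) any daemon-only keys present in a topology conf, e.g. to warn the user. */
    static Set<String> daemonKeysIn(Map<String, Object> topoConf) {
        Set<String> found = new TreeSet<>(topoConf.keySet());
        found.retainAll(DaemonConfigKeys.ALL);
        return found;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = Map.of(
            TopologyConfigKeys.TOPOLOGY_WORKERS, 4,
            DaemonConfigKeys.NIMBUS_SEEDS, "nimbus1");
        System.out.println(daemonKeysIn(conf)); // prints [nimbus.seeds]
    }
}
```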


---


[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on the issue:

https://github.com/apache/storm/pull/2442
  
Generally looks good, just some minor comments.  @revans2 you could also 
implement a version of the algorithm that uses heap space in the JVM instead of 
stack space, so you wouldn't need to tinker with the -Xss settings.  That may 
simplify operations.
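A minimal sketch of that suggestion, assuming a heavily simplified model: executors are identified only by component name, each node holds a fixed number of executors, and a hypothetical map lists components that may not share a node. None of these names come from the PR. The search path is kept in an `ArrayDeque` on the heap, so how deep the search can go is limited by heap size rather than by the thread stack size set with `-Xss`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Map;
import java.util.Set;

public final class IterativeBacktracking {

    private static boolean conflicts(Map<String, Set<String>> c, String a, String b) {
        return c.getOrDefault(a, Set.of()).contains(b) || c.getOrDefault(b, Set.of()).contains(a);
    }

    /** Returns assignment[i] = node index for executor i, or null if no valid assignment exists. */
    static int[] solve(String[] execComponents, int numNodes, int capacity,
                       Map<String, Set<String>> incompatible) {
        int n = execComponents.length;
        int[] assignment = new int[n];              // valid for indices < placed.size()
        int[] load = new int[numNodes];             // executors currently on each node
        Deque<Integer> placed = new ArrayDeque<>(); // explicit search stack, allocated on the heap
        int startNode = 0;                          // first node to try for the next executor

        while (placed.size() < n) {
            int exec = placed.size();
            int chosen = -1;
            for (int node = startNode; node < numNodes && chosen < 0; node++) {
                if (load[node] >= capacity) continue;
                boolean ok = true;
                for (int j = 0; j < exec && ok; j++) {
                    ok = assignment[j] != node
                        || !conflicts(incompatible, execComponents[exec], execComponents[j]);
                }
                if (ok) chosen = node;
            }
            if (chosen >= 0) {                      // go one level deeper
                assignment[exec] = chosen;
                load[chosen]++;
                placed.push(chosen);
                startNode = 0;
            } else {                                // dead end: backtrack
                if (placed.isEmpty()) return null;  // search space exhausted
                int lastNode = placed.pop();
                load[lastNode]--;
                startNode = lastNode + 1;           // retry the previous executor on a later node
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        String[] execs = {"spout", "bolt", "bolt"};
        Map<String, Set<String>> incompatible = Map.of("bolt", Set.of("bolt"));
        System.out.println(Arrays.toString(solve(execs, 2, 2, incompatible))); // prints [0, 0, 1]
    }
}
```

Backtracking here means popping the last placement and retrying that executor on the next node; a recursive version would unwind a call frame instead, with every executor in the search adding one frame to the thread stack.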


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-18 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo it should be good now. Can you please take a look? Thanks.


---


[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/2442#discussion_r157680068
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 ---
@@ -129,7 +129,15 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
 protected TreeSet sortObjectResources(
 final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
 final ExistingScheduleFunc existingScheduleFunc) {
+return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
+}
 
+/**
+ * Implementation of the sortObjectResources method so other strategies can reuse it.
+ */
+public static TreeSet sortObjectResourcesImpl(
--- End diff --

I would suggest putting this in some sort of utility class.  It's kind of 
awkward for ConstraintSolverStrategy to call a static method in 
GenericResourceAwareStrategy.
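A minimal sketch of that refactoring, assuming a hypothetical `SchedulingUtils` holder and a simplified `NodeResources` record in place of Storm's real resource types; the sort order (most memory, then most CPU, then id) is illustrative, not the PR's actual ordering. The point is only the shape: the shared static helper lives in a stateless utility class, and both strategies delegate to it rather than one strategy calling into the other.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

public final class SharedSortingSketch {

    /** Hypothetical stand-in for a node's available resources. */
    record NodeResources(String id, double availMemMb, double availCpu) {}

    /** Hypothetical utility class holding shared, stateless scheduling helpers. */
    static final class SchedulingUtils {
        private SchedulingUtils() {} // no instances

        /** Most available memory first, then most CPU, then id as a tie-breaker. */
        static TreeSet<NodeResources> sortByAvailability(Collection<NodeResources> nodes) {
            TreeSet<NodeResources> sorted = new TreeSet<>(
                Comparator.comparingDouble(NodeResources::availMemMb).reversed()
                    .thenComparing(Comparator.comparingDouble(NodeResources::availCpu).reversed())
                    .thenComparing(NodeResources::id));
            sorted.addAll(nodes);
            return sorted;
        }
    }

    // Both strategies delegate to the utility class; neither depends on the other.
    static final class GenericStrategy {
        List<String> order(Collection<NodeResources> nodes) {
            return ids(SchedulingUtils.sortByAvailability(nodes));
        }
    }

    static final class ConstraintStrategy {
        List<String> order(Collection<NodeResources> nodes) {
            return ids(SchedulingUtils.sortByAvailability(nodes));
        }
    }

    static List<String> ids(Collection<NodeResources> nodes) {
        return nodes.stream().map(NodeResources::id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<NodeResources> nodes = List.of(
            new NodeResources("a", 1024, 2), new NodeResources("b", 2048, 1), new NodeResources("c", 1024, 4));
        System.out.println(new ConstraintStrategy().order(nodes)); // prints [b, c, a]
    }
}
```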


---


[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/2442#discussion_r157679506
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ---
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.Stack;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
+private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
+
+protected static class SolverResult {
+private final int statesSearched;
+private final boolean success;
+private final long timeTakenMillis;
+private final int backtracked;
+
+public SolverResult(SearcherState state, boolean success) {
+this.statesSearched = state.getStatesSearched();
+this.success = success;
+timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
+backtracked = state.numBacktrack;
+}
+
+public SchedulingResult asSchedulingResult() {
+if (success) {
+return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched
++ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
+}
+return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+"Cannot find scheduling that satisfies all constraints (" + statesSearched
++ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)");
+}
+}
+
+protected static class SearcherState {
+// Metrics
+// How many states searched so far.
+private int statesSearched = 0;
+// Number of times we had to backtrack.
+private int numBacktrack = 0;
+final long startTimeMillis;
+private final long maxEndTimeMs;
+
+// Current state
+// The current executor we are trying to schedule
+private int execIndex = 0;
+// A map of the worker to the components in the worker to be able to enforce constraints.
+private final Map> workerCompAssignment;
+private final boolean[] okToRemoveFromWorker;
+// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
+private final Map> nodeCompAssignment;
+private final boolean[] okToRemoveFromNode;
+
+// Static State
+// The list of all executors (preferably sorted to make assignmen

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/2442#discussion_r157679086
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ---
@@ -0,0 +1,623 @@

[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/2442#discussion_r157677523
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java 
---
@@ -98,7 +98,7 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse,
   }
   
   _spreadToSchedule = new HashMap<>();
-  List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+  List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS);
--- End diff --

I don't know about changing TOPOLOGY_SPREAD_COMPONENTS to 
TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS.  It seems kind of weird to me that 
the multitenant scheduler would have a config that references RAS.


---


[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...

2017-12-18 Thread erikdw
Github user erikdw commented on the issue:

https://github.com/apache/storm/pull/2468
  
@revans2: can you please review this when you have a chance?  Notably, we 
mostly need this back in the [1.0.x-branch](https://github.com/apache/storm/tree/1.0.x-branch), 
since (as [we discussed](https://github.com/mesos/storm/issues/222#issuecomment-352514556)) 
storm 1.1+ is fundamentally incompatible with storm-mesos at this point.


---


[GitHub] storm pull request #2461: [STORM-2857] Loosen some constraints to support ru...

2017-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2461


---


[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API

2017-12-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2203
  
@ptgoetz OK, got it. Maybe we could reconsider whether separating by stream ID 
is really needed.
@revans2 Could you take a look at this again?


---


[GitHub] storm pull request #2468: [STORM-2690] resurrect invocation of ISupervisor.a...

2017-12-18 Thread erikdw
GitHub user erikdw opened a pull request:

https://github.com/apache/storm/pull/2468

[STORM-2690] resurrect invocation of ISupervisor.assigned() & make Supervisor.launchDaemon() accessible

This commit fixes the storm-mesos integration for the interaction between 
the Storm core's Supervisor daemon and the MesosSupervisor that implements the 
ISupervisor interface.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/erikdw/storm STORM-2690

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2468.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2468


commit 5e3c966b23bc3b6d924a52ea62a32b194978af24
Author: Erik Weathers 
Date:   2017-12-19T04:41:35Z

[STORM-2690] resurrect invocation of ISupervisor.assigned() & make Supervisor.launchDaemon() accessible

This commit fixes the storm-mesos integration for the interaction between 
the Storm core's Supervisor daemon
and the MesosSupervisor that implements the ISupervisor interface.




---


[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2442
  
@revans2 Thanks for the quick fix. I can see you've addressed my comment. 
By the way, I haven't looked at the actual patch yet, so it will take me some 
time to review the code.

@jerrypeng I think you have context on this part of the codebase; could you 
help review the change? Thanks in advance.


---


[GitHub] storm pull request #2457: STORM-2854 Expose IEventLogger to make event loggi...

2017-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2457


---


[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...

2017-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2458


---


[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2442
  
@HeartSaVioR I rebased it and made the changes.  My updates are in the last 
commit f5e9532


---


[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2442
  
@revans2 No problem. Please take your time.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157610099
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 }
 }
 
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by this topology, either by this or another spout instance.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied.
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+try {
+return committedOffset != null && JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
--- End diff --

It returns false if an exception is thrown while deserializing an 
OffsetAndMetadata that was committed by a previous version of Storm.
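A minimal sketch of that defensive check, assuming commit metadata shaped like `{"topologyId":"..."}`. The regex-based `parseTopologyId` is a hypothetical stand-in for the Jackson `JSON_MAPPER.readValue(...)` call in the diff, kept only so the example needs no dependencies. Any metadata that is missing, or that cannot be parsed (for example because an older spout version wrote it in a different format), is treated as "not committed by this topology".

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class CommitOwnershipCheck {
    // Hypothetical stand-in for JSON deserialization: extracts the "topologyId" field.
    private static final Pattern TOPOLOGY_ID = Pattern.compile("\"topologyId\"\\s*:\\s*\"([^\"]*)\"");

    static String parseTopologyId(String metadata) {
        Matcher m = TOPOLOGY_ID.matcher(metadata);
        if (!m.find()) {
            throw new IllegalArgumentException("unrecognized metadata: " + metadata);
        }
        return m.group(1);
    }

    /** True only if the metadata parses and names this topology; any failure means "not ours". */
    static boolean isCommittedByThisTopology(String committedMetadata, String thisTopologyId) {
        if (committedMetadata == null) {
            return false; // nothing committed yet
        }
        try {
            return thisTopologyId.equals(parseTopologyId(committedMetadata));
        } catch (IllegalArgumentException e) {
            // e.g. metadata written by an older spout version in a different format
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isCommittedByThisTopology("{\"topologyId\":\"topo-1\"}", "topo-1")); // true
        System.out.println(isCommittedByThisTopology("{thread=spout-1}", "topo-1"));            // false
    }
}
```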


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157543286
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 }
 }
 
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied.
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+try {
+final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class);
+return info.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " +
--- End diff --

Sure, but we're still deserializing the metadata on every emit. I think we 
can very likely get away with deserializing once per commit, or even once per 
spout activation, since we don't need to serialize + deserialize if we only 
update the metadata when committing.
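A sketch of the caching idea, assuming partitions are identified by plain strings (standing in for `TopicPartition`) and that the expensive deserialize-and-compare step is passed in as a function; the class and method names are hypothetical, not part of the PR. The check runs at most once per partition until `reset()` is called (for example on spout activation), and a commit made by this spout records the answer directly, with no deserialization.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical per-partition cache of "was the last commit made by this topology?". */
public final class CommitOwnershipCache {
    private final Map<String, Boolean> committedByThisTopology = new HashMap<>();
    private final Function<String, Boolean> checkCommittedMetadata; // expensive deserialize + compare
    private int checks = 0;

    public CommitOwnershipCache(Function<String, Boolean> checkCommittedMetadata) {
        this.checkCommittedMetadata = checkCommittedMetadata;
    }

    /** Runs the expensive check at most once per partition until reset() is called. */
    public boolean isCommittedByThisTopology(String partition, String committedMetadata) {
        return committedByThisTopology.computeIfAbsent(partition, p -> {
            checks++;
            return checkCommittedMetadata.apply(committedMetadata);
        });
    }

    /** After this spout commits a partition, the answer is known without deserializing. */
    public void onCommit(String partition) {
        committedByThisTopology.put(partition, true);
    }

    /** E.g. on spout (re)activation, when committed offsets may have changed underneath us. */
    public void reset() {
        committedByThisTopology.clear();
    }

    public int checksPerformed() {
        return checks;
    }

    public static void main(String[] args) {
        CommitOwnershipCache cache = new CommitOwnershipCache(meta -> meta.contains("topo-1"));
        cache.isCommittedByThisTopology("t-0", "{\"topologyId\":\"topo-1\"}");
        cache.isCommittedByThisTopology("t-0", "{\"topologyId\":\"topo-1\"}");
        System.out.println(cache.checksPerformed()); // prints 1
    }
}
```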


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157537266
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 ---
@@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
 
 OffsetAndMetadata nextCommitOffsetAndMetadata = null;
 if (found) {
-nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
-nextCommitMsg.getMetadata(Thread.currentThread()));
+try {
+nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
+JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context)));
--- End diff --

Will do.


---


[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...

2017-12-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157537115
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
 }
 }
 
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied.
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+try {
+final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class);
+return info.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " +
--- End diff --

The log messages only get printed when the last commit was made by a 
topology running an older version of Storm, and only until this topology makes 
its first commit for each topic-partition. After that, these logs no longer 
occur. I will look into whether it makes sense to cache this info.


---


[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy

2017-12-18 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2442
  
@HeartSaVioR sorry, I have been out for the past 2 weeks.  I will update the 
configs accordingly.


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-18 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2465
  
@hmcl There were a few more comments, just making sure they weren't missed.

https://github.com/apache/storm/pull/2465#discussion_r157350384

https://github.com/apache/storm/pull/2465#discussion_r157375639

https://github.com/apache/storm/pull/2465#discussion_r157375809


---