[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 In terms of the configs, we may want to explore splitting the configs into multiple pieces. Config.java has become something of a monolith. Config.java in the storm-client module should probably only contain configs that users can set in their topology. ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on the issue: https://github.com/apache/storm/pull/2442 Generally looks good, just some minor comments. @revans2 you could also implement a version of the algorithm that uses heap space in the JVM instead of stack space, so you wouldn't need to tinker with the -Xss settings. That may simplify operations. ---
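The heap-versus-stack suggestion above can be illustrated with a minimal sketch. It is not code from the pull request: the class name, the `solve` method, and the simplified constraint model (a matrix of executor pairs that must not share a worker) are all assumptions made for illustration. The point is only that a backtracking search can keep its pending choices in a heap-allocated `ArrayDeque` rather than in recursive call frames, so search depth is bounded by heap size rather than by `-Xss`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical sketch of iterative backtracking; not taken from the pull request. */
public class HeapBacktrackingSketch {

    /**
     * Assigns each executor to a worker so that no two conflicting executors share a worker.
     * conflicts[i][j] == true means executors i and j must not be placed together.
     * Returns assignment[executor] = worker, or null when no valid assignment exists.
     */
    public static int[] solve(int numExecs, int numWorkers, boolean[][] conflicts) {
        int[] assignment = new int[numExecs];
        Arrays.fill(assignment, -1);
        if (numExecs == 0) {
            return assignment;
        }
        // One entry per search depth: the next worker index to try for that executor.
        // This deque lives on the heap and replaces the recursive call stack.
        Deque<Integer> resumePoints = new ArrayDeque<>();
        resumePoints.push(0);
        while (!resumePoints.isEmpty()) {
            int exec = resumePoints.size() - 1;
            int worker = resumePoints.pop();
            assignment[exec] = -1; // undo any earlier attempt for this executor
            while (worker < numWorkers && !allowed(exec, worker, assignment, conflicts)) {
                worker++;
            }
            if (worker == numWorkers) {
                // No worker left for this executor: backtrack to the previous one,
                // whose resume point is now on top of the deque.
                continue;
            }
            assignment[exec] = worker;
            if (exec == numExecs - 1) {
                return assignment; // every executor is placed
            }
            resumePoints.push(worker + 1); // where to resume if we come back to this executor
            resumePoints.push(0);          // descend to the next executor
        }
        return null; // search space exhausted
    }

    /** Checks the candidate worker against all executors placed earlier in the search. */
    private static boolean allowed(int exec, int worker, int[] assignment, boolean[][] conflicts) {
        for (int other = 0; other < exec; other++) {
            if (assignment[other] == worker && conflicts[exec][other]) {
                return false;
            }
        }
        return true;
    }
}
```

The trade-off is that the undo logic, which recursion provides implicitly on return, has to be written out by hand (here, resetting `assignment[exec]` when a frame is revisited).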
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/2465 @srdo it should be good now. Can you please take a look. Thanks. ---
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157680068 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java --- @@ -129,7 +129,15 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { protected TreeSet sortObjectResources( final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, final ExistingScheduleFunc existingScheduleFunc) { +return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc); +} +/** + * Implelemtation of the sortObjectResources method so other strategies can reuse it. + */ +public static TreeSet sortObjectResourcesImpl( --- End diff -- I would suggest putting this in some sort of utility class. It's kind of awkward for the ConstraintSolverStrategy to call a static method in the GenericResourceAwareStrategy. ---
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157679506 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java --- @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.RAS_Node; +import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { +private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + +protected static class SolverResult { +private final int statesSearched; +private final boolean success; +private final long timeTakenMillis; +private final int backtracked; + +public SolverResult(SearcherState state, boolean success) { +this.statesSearched = state.getStatesSearched(); +this.success = success; +timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; +backtracked = state.numBacktrack; +} + +public SchedulingResult asSchedulingResult() { +if (success) { +return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " 
+ backtracked + " times)"); +} +return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, +"Cannot find scheduling that satisfies all constraints (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); +} +} + +protected static class SearcherState { +// Metrics +// How many states searched so far. +private int statesSearched = 0; +// Number of times we had to backtrack. +private int numBacktrack = 0; +final long startTimeMillis; +private final long maxEndTimeMs; + +// Current state +// The current executor we are trying to schedule +private int execIndex = 0; +// A map of the worker to the components in the worker to be able to enforce constraints. +private final Map> workerCompAssignment; +private final boolean[] okToRemoveFromWorker; +// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints +private final Map> nodeCompAssignment; +private final boolean[] okToRemoveFromNode; + +// Static State +// The list of all executors (preferably sorted to make assignmen
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157679086 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java --- @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.RAS_Node; +import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { +private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + +protected static class SolverResult { +private final int statesSearched; +private final boolean success; +private final long timeTakenMillis; +private final int backtracked; + +public SolverResult(SearcherState state, boolean success) { +this.statesSearched = state.getStatesSearched(); +this.success = success; +timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; +backtracked = state.numBacktrack; +} + +public SchedulingResult asSchedulingResult() { +if (success) { +return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " 
+ backtracked + " times)"); +} +return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, +"Cannot find scheduling that satisfies all constraints (" + statesSearched ++ " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); +} +} + +protected static class SearcherState { +// Metrics +// How many states searched so far. +private int statesSearched = 0; +// Number of times we had to backtrack. +private int numBacktrack = 0; +final long startTimeMillis; +private final long maxEndTimeMs; + +// Current state +// The current executor we are trying to schedule +private int execIndex = 0; +// A map of the worker to the components in the worker to be able to enforce constraints. +private final Map> workerCompAssignment; +private final boolean[] okToRemoveFromWorker; +// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints +private final Map> nodeCompAssignment; +private final boolean[] okToRemoveFromNode; + +// Static State +// The list of all executors (preferably sorted to make assignmen
[GitHub] storm pull request #2442: STORM-2837: ConstraintSolverStrategy
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r157677523 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java --- @@ -98,7 +98,7 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, } _spreadToSchedule = new HashMap<>(); - List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); + List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS); --- End diff -- I don't know about changing TOPOLOGY_SPREAD_COMPONENTS to TOPOLOGY_RAS_CONSTRAINT_SPREAD_COMPONENTS. It seems kind of weird to me that the multitenant scheduler would have a config that references RAS. ---
[GitHub] storm issue #2468: [STORM-2690] resurrect invocation of ISupervisor.assigned...
Github user erikdw commented on the issue: https://github.com/apache/storm/pull/2468 @revans2 : can you please review this when you have a chance? Notably we mostly need this back in the [1.0.x-branch](https://github.com/apache/storm/tree/1.0.x-branch), since (as [we discussed](https://github.com/mesos/storm/issues/222#issuecomment-352514556)) storm 1.1+ is fundamentally incompatible with storm-mesos at this point. ---
[GitHub] storm pull request #2461: [STORM-2857] Loosen some constraints to support ru...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2461 ---
[GitHub] storm issue #2203: STORM-2153: New Metrics Reporting API
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2203 @ptgoetz OK got it. Maybe we could reconsider if separation of stream ID is not really needed. @revans2 Could you take a look at this again? ---
[GitHub] storm pull request #2468: [STORM-2690] resurrect invocation of ISupervisor.a...
GitHub user erikdw opened a pull request: https://github.com/apache/storm/pull/2468 [STORM-2690] resurrect invocation of ISupervisor.assigned() & make Supervisor.launchDaemon() accessible This commit fixes the storm-mesos integration for the interaction between the Storm core's Supervisor daemon and the MesosSupervisor that implements the ISupervisor interface. You can merge this pull request into a Git repository by running: $ git pull https://github.com/erikdw/storm STORM-2690 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2468.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2468 commit 5e3c966b23bc3b6d924a52ea62a32b194978af24 Author: Erik Weathers Date: 2017-12-19T04:41:35Z [STORM-2690] resurrect invocation of ISupervisor.assigned() & make Supervisor.launchDaemon() accessible This commit fixes the storm-mesos integration for the interaction between the Storm core's Supervisor daemon and the MesosSupervisor that implements the ISupervisor interface. ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2442 @revans2 Thanks for the quick fix. I can see you've addressed my comment. Btw, I haven't looked at the actual patch yet, so it would take me some time to review the code. @jerrypeng I think you have the context of the codebase, so could you help review the change? Thanks in advance. ---
[GitHub] storm pull request #2457: STORM-2854 Expose IEventLogger to make event loggi...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2457 ---
[GitHub] storm pull request #2458: (1.x) STORM-2854 Expose IEventLogger to make event...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2458 ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2442 @HeartSaVioR I rebased it and made the changes. My updates are in the last commit f5e9532 ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2442 @revans2 No problem. Please take your time. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157610099 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { } } +/** + * Checks If {@link OffsetAndMetadata} was committed by this topology, either by this or another spout instance. + * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied + * + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ +private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) { +try { +return committedOffset != null && JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class) --- End diff -- It returns false if an exception is thrown while attempting to deserialize an OffsetAndMetadata that was committed using a previous version of Storm. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157543286 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { } } +/** + * Checks If {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology. + * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied + * + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ +private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) { +try { +final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class); +return info.getTopologyId().equals(context.getStormId()); +} catch (IOException e) { +LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " + --- End diff -- Sure, but we're still deserializing the metadata on every emit. I think we can very likely get away with deserializing once per commit, or even once per spout activation, since we don't need to serialize + deserialize if we only update the metadata when committing. ---
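The idea of deserializing once per commit or activation, rather than on every emit, might look roughly like the sketch below. Everything in it is hypothetical: the class, its method names, and the use of plain strings for partitions and metadata are stand-ins chosen so the example needs only the JDK. The `Predicate` stands in for "deserialize the committed metadata and compare its topology id", and a failure to parse is treated as "not ours", mirroring the catch-and-return-false behavior in the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of caching the commit-ownership check per partition. */
public class CommitOwnershipCacheSketch {
    // Stand-in for "deserialize metadata and compare topology ids"; may throw on old formats.
    private final Predicate<String> metadataBelongsToThisTopology;
    // Cached answer per partition, so metadata is parsed at most once until invalidated.
    private final Map<String, Boolean> ownedByPartition = new HashMap<>();
    private int parseCount = 0;

    public CommitOwnershipCacheSketch(Predicate<String> metadataBelongsToThisTopology) {
        this.metadataBelongsToThisTopology = metadataBelongsToThisTopology;
    }

    /** Returns the cached answer, parsing the metadata only on the first call per partition. */
    public boolean isCommittedByThisTopology(String partition, String committedMetadata) {
        return ownedByPartition.computeIfAbsent(partition, p -> parse(committedMetadata));
    }

    /** Once this topology commits for a partition, the answer is known without parsing. */
    public void onCommit(String partition) {
        ownedByPartition.put(partition, true);
    }

    /** Forget cached answers, for example when the spout is (re)activated. */
    public void onActivate() {
        ownedByPartition.clear();
    }

    public int getParseCount() {
        return parseCount;
    }

    private boolean parse(String committedMetadata) {
        parseCount++;
        if (committedMetadata == null) {
            return false; // nothing was committed
        }
        try {
            return metadataBelongsToThisTopology.test(committedMetadata);
        } catch (RuntimeException e) {
            // Metadata in an unknown (for example older) format: treat as not ours.
            return false;
        }
    }
}
```

Whether caching is safe in the real spout depends on details this sketch ignores, such as partition reassignment between spout instances, so it should be read as an outline of the idea rather than a drop-in change.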
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157537266 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java --- @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { -nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, -nextCommitMsg.getMetadata(Thread.currentThread())); +try { +nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, +JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context))); --- End diff -- Will do. ---
[GitHub] storm pull request #2465: STORM-2844: KafkaSpout Throws IllegalStateExceptio...
Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157537115 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { } } +/** + * Checks If {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology. + * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied + * + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ +private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) { +try { +final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class); +return info.getTopologyId().equals(context.getStormId()); +} catch (IOException e) { +LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " + --- End diff -- The log messages only get printed when the commit was made by a topology running an older version of Storm, and only until this topology makes its first commit for each topic-partition. After that first commit, the log messages will no longer occur. I will look into whether it makes sense to cache this info. ---
[GitHub] storm issue #2442: STORM-2837: ConstraintSolverStrategy
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2442 @HeartSaVioR sorry I have been out for the past 2 weeks. I will update the configs accordingly. ---
[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2465 @hmcl There were a few more comments, just making sure they weren't missed. https://github.com/apache/storm/pull/2465#discussion_r157350384 https://github.com/apache/storm/pull/2465#discussion_r157375639 https://github.com/apache/storm/pull/2465#discussion_r157375809 ---