[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87187329

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    +import org.apache.storm.daemon.DaemonCommon;
    +import org.apache.storm.daemon.Shutdownable;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.ExecutorShutdown;
    +import org.apache.storm.executor.IRunningExecutor;
    +import org.apache.storm.executor.LocalExecutor;
    +import org.apache.storm.generated.*;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.security.auth.AuthUtils;
    +import org.apache.storm.security.auth.IAutoCredentials;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.*;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
    +
    +import javax.security.auth.Subject;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.*;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +public class Worker implements Shutdownable, DaemonCommon {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    +    private final Map conf;
    +    private final IContext context;
    +    private final String topologyId;
    +    private final String assignmentId;
    +    private final int port;
    +    private final String workerId;
    +    private final LogConfigManager logConfigManager;
    +
    +
    +    private WorkerState workerState;
    +    private AtomicReference executorsAtom;
    +    private Thread transferThread;
    +    private WorkerBackpressureThread backpressureThread;
    +
    +    private AtomicReference credentialsAtom;
    +    private Subject subject;
    +    private Collection autoCreds;
    +
    +
    +    /**
    +     * TODO: should worker even take the topologyId as input? this should be
    +     * deducable from cluster state (by searching through assignments)
    +     * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency
    +     *
    +     * @param conf - Storm configuration
    +     * @param context -
    +     * @param topologyId - topology id
    +     * @param assignmentId - assignement id
    +     * @param port - port on which the worker runs
    +     * @param workerId - worker id
    +     */
    +
    +    public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
    +        this.conf = conf;
    +        this.context = context;
    +        this.topologyId = topologyId;
    +        this.assignmentId = assignmentId;
    +        this.port = port;
    +        this.workerId = workerId;
    +        this.logConfigManager = new LogConfigManager();
    +    }
    +
    +    public void start() throws Exception {
    +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
    +            conf);
    +        // because in local mode, its not a separate
    +        // process. supervisor will register it in this case
    +        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87191222

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87336388

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -0,0 +1,690 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Sets;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.cluster.IStateStorage;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.cluster.VersionedData;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.daemon.supervisor.AdvancedFSOps;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.DebugOptions;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.InvalidTopologyException;
    +import org.apache.storm.generated.NodeInfo;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormBase;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.StreamInfo;
    +import org.apache.storm.generated.TopologyStatus;
    +import org.apache.storm.grouping.Load;
    +import org.apache.storm.grouping.LoadMapping;
    +import org.apache.storm.hooks.BaseWorkerHook;
    +import org.apache.storm.messaging.ConnectionWithStatus;
    +import org.apache.storm.messaging.DeserializingConnectionCallback;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.messaging.TransportFactory;
    +import org.apache.storm.serialization.KryoTupleSerializer;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.DisruptorQueue;
    +import org.apache.storm.utils.ThriftTopologyUtils;
    +import org.apache.storm.utils.TransferDrainer;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +import java.util.function.Function;
    +import java.util.function.UnaryOperator;
    +import java.util.stream.Collectors;
    +
    +public class WorkerState {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
    +
    +    final Map conf;
    +    final IContext mqContext;
    +
    +    public Map getConf() {
    +        return conf;
    +    }
    +
    +    public IConnection getReceiver() {
    +        return receiver;
    +    }
    +
    +    public String getTopologyId() {
    +        return topologyId;
    +    }
    +
    +    public int getPort() {
    +        return port;
    +    }
    +
    +    public String getWorkerId() {
    +        return workerId;
    +    }
    +
    +    public IStateStorage getStateStorage() {
    +        return stateStorage;
    +    }
    +
    +    public AtomicBoolean getIsTopologyActive() {
    +        return isTopologyActive;
    +    }
    +
    +    public AtomicReference
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87337480

    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -123,6 +123,7 @@
     import java.util.TreeMap;
     import java.util.UUID;
     import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.AtomicReference;
    --- End diff --

    Nit: new import without actual usage.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87191290

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87160199

    --- Diff: storm-core/src/jvm/org/apache/storm/Constants.java ---
    @@ -20,12 +20,15 @@
     import org.apache.storm.coordination.CoordinatedBolt;
     import clojure.lang.RT;
    --- End diff --

    Nitpick: We could remove RT now.
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87189395

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87163440

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import com.google.common.base.Preconditions;
    +import com.lmax.disruptor.EventHandler;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.lang.ObjectUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.*;
    --- End diff --

    Minor or maybe nitpick: it would be better to expand here and below three.
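
To illustrate the kind of change this comment asks for, here is a minimal sketch of explicit single-type imports replacing a wildcard. It is not taken from the pull request: the class `ExplicitImportsSketch` and its method `holderOf` are hypothetical, and `java.util.*` merely stands in for the wildcard imports in the quoted diff.

```java
// Explicit single-type imports, in place of a wildcard such as `import java.util.*;`.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ExplicitImportsSketch {

    // Hypothetical helper: wraps the given ids in a list held by an AtomicReference.
    static AtomicReference<List<String>> holderOf(String... ids) {
        List<String> list = new ArrayList<>();
        for (String id : ids) {
            list.add(id);
        }
        return new AtomicReference<>(list);
    }

    public static void main(String[] args) {
        AtomicReference<List<String>> holder = holderOf("executor-1", "executor-2");
        System.out.println(holder.get().size()); // prints 2
    }
}
```

With explicit imports, a reader can tell from the import list alone which types a file depends on, and an unused import stands out in review.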
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87161787

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java ---
    @@ -0,0 +1,157 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.logging.log4j.Level;
    +import org.apache.logging.log4j.LogManager;
    +import org.apache.logging.log4j.core.LoggerContext;
    +import org.apache.logging.log4j.core.config.Configuration;
    +import org.apache.logging.log4j.core.config.LoggerConfig;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +public class LogConfigManager {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class);
    +
    +    private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
    +    private final Map<String, Level> originalLogLevels;
    +
    +    public LogConfigManager() {
    +        this(new AtomicReference<>(new TreeMap<>()));
    +    }
    +
    +    public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) {
    +        this.latestLogConfig = latestLogConfig;
    +        this.originalLogLevels = getLoggerLevels();
    +        LOG.info("Started with log levels: {}", originalLogLevels);
    +    }
    +
    +    public void processLogConfigChange(LogConfig logConfig) {
    +        if (null != logConfig) {
    +            LOG.debug("Processing received log config: {}", logConfig);
    +            TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level());
    +            LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
    +            Map<String, LogLevel> newLogConfigs = new HashMap<>();
    +            for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) {
    +                String msgLoggerName = entry.getKey();
    +                msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? LogManager.ROOT_LOGGER_NAME : msgLoggerName;
    +                LogLevel loggerLevel = entry.getValue();
    +                // the new-timeouts map now contains logger => timeout
    +                if (loggerLevel.is_set_reset_log_level_timeout_epoch()) {
    +                    LogLevel copy = new LogLevel(loggerLevel);
    +                    if (originalLogLevels.containsKey(msgLoggerName)) {
    +                        copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name());
    +                    } else {
    +                        copy.set_reset_log_level(Level.INFO.name());
    +                    }
    +
    +                    //copy.unset_reset_log_level();
    --- End diff --

    Nitpick: remove comment here.
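
The reset-level fallback in the quoted `processLogConfigChange` can be looked at in isolation. The following is a minimal sketch under stated assumptions, not the Storm implementation: `ResetLevelSketch`, `normalizeLoggerName` and `resolveResetLevel` are hypothetical names, plain strings stand in for log4j `Level` values, and the local `ROOT_LOGGER_NAME` constant stands in for log4j2's `LogManager.ROOT_LOGGER_NAME` (the empty string).

```java
import java.util.HashMap;
import java.util.Map;

public class ResetLevelSketch {

    // Stand-in for log4j2's LogManager.ROOT_LOGGER_NAME, which is the empty string.
    static final String ROOT_LOGGER_NAME = "";

    // "ROOT" in any letter case refers to the root logger; other names pass through.
    static String normalizeLoggerName(String name) {
        return "ROOT".equalsIgnoreCase(name) ? ROOT_LOGGER_NAME : name;
    }

    // Hypothetical helper: the level to restore once a timed override expires.
    // Uses the level recorded at startup if there is one, otherwise INFO.
    static String resolveResetLevel(Map<String, String> originalLevels, String loggerName) {
        String key = normalizeLoggerName(loggerName);
        return originalLevels.containsKey(key) ? originalLevels.get(key) : "INFO";
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put(ROOT_LOGGER_NAME, "WARN");
        original.put("org.apache.storm", "DEBUG");

        System.out.println(resolveResetLevel(original, "root"));             // prints WARN
        System.out.println(resolveResetLevel(original, "org.apache.storm")); // prints DEBUG
        System.out.println(resolveResetLevel(original, "com.example.other")); // prints INFO
    }
}
```

Seen this way, the commented-out `copy.unset_reset_log_level()` call in the diff is dead code: both branches always set a reset level, so nothing is left to unset.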
[GitHub] storm pull request #1756: STORM-1278: Port org.apache.storm.daemon.worker to...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1756#discussion_r87190721

    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -0,0 +1,426 @@
[GitHub] storm issue #1772: [STORM-2196] A typo in RAS_Node::consumeCPU
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1772 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #1771: STORM-2197: NimbusClient connections leak due to leakage ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1771 +1
[GitHub] storm issue #1770: STORM 2197: NimbusClient connections leak due to ThriftCl...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1770 +1 Nice finding.
[GitHub] storm pull request #1772: [STORM-2196] A typo in RAS_Node::consumeCPU
GitHub user wangli1426 opened a pull request: https://github.com/apache/storm/pull/1772 [STORM-2196] A typo in RAS_Node::consumeCPU Fixed a typo in RAS_Node::consumeCPU() You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangli1426/storm patch-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1772.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1772 commit 2bae6789a47b2cb848c844a8a3e552df8e65f97d Author: Li Wang Date: 2016-11-10T04:48:26Z [STORM-2196] A typo in RAS_Node::consumeCPU Fixed a typo in RAS_Node::consumeCPU()
[GitHub] storm pull request #1771: STORM-2197: NimbusClient connections leak due to l...
GitHub user satishd opened a pull request: https://github.com/apache/storm/pull/1771 STORM-2197: NimbusClient connections leak due to leakage in ThriftClient. - This is #1770 for master. You can merge this pull request into a Git repository by running: $ git pull https://github.com/satishd/storm STORM-2197-master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1771.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1771 commit c34607c7aafc3da6b6f85ccd1732b503dcb5ef02 Author: Satish Duggana Date: 2016-11-10T04:05:31Z STORM-2197: NimbusClient connectins leak due to leakage in ThriftClient.
[GitHub] storm pull request #1770: STORM 2197: NimbusClient connections leak due to T...
GitHub user satishd opened a pull request: https://github.com/apache/storm/pull/1770 STORM 2197: NimbusClient connections leak due to ThriftClient connection's leakage in case of errors. - NimbusClient connections leak due to ThriftClient connection's leakage in case of errors. You can merge this pull request into a Git repository by running: $ git pull https://github.com/satishd/storm STORM-2197 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1770.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1770 commit aab7b21f551f78d67aacf07d8bcc1982cb2ebd90 Author: Satish Duggana Date: 2016-11-10T04:05:31Z STORM-2197: NimbusClient connectins leak due to leakage in ThriftClient.
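The kind of leak this pull request describes can be illustrated with a minimal sketch. It is not the actual patch: `LeakSafeClient`, `FakeTransport`, and `handshake` are hypothetical stand-ins, and the only assumption is that the client opens a connection and then performs further setup that may fail. If that setup throws inside the constructor, the caller never receives a reference to close, so the constructor has to close the connection itself before rethrowing.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-ins; none of these names are taken from Storm's ThriftClient.
class LeakSafeClient implements Closeable {

    /** Minimal fake transport that only tracks whether it is still open. */
    static class FakeTransport implements Closeable {
        boolean open = true;

        @Override
        public void close() {
            open = false;
        }
    }

    private final FakeTransport transport;

    LeakSafeClient(FakeTransport transport, boolean failHandshake) throws IOException {
        this.transport = transport;
        try {
            handshake(failHandshake);
        } catch (IOException | RuntimeException e) {
            // Setup failed after the transport was opened: close it before
            // rethrowing, because the caller never gets an object to close.
            transport.close();
            throw e;
        }
    }

    /** Simulated post-connect setup step that can fail. */
    private void handshake(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated handshake failure");
        }
    }

    @Override
    public void close() {
        transport.close();
    }

    /** Returns true if the transport is still open after a failed construction. */
    static boolean leaksOnFailure() {
        FakeTransport t = new FakeTransport();
        try {
            new LeakSafeClient(t, true);
        } catch (IOException expected) {
            // Construction is expected to fail in this scenario.
        }
        return t.open;
    }
}
```

Without the `catch` block in the constructor, `leaksOnFailure()` would return true, which is the error-path leak the PR title refers to.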
[GitHub] storm issue #1751: [STORM-2172][SQL] Support Avro as input / output format
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1751 @vesense Could you provide a working example so that we can test it manually and finally document it? I'm +1 after checking the manual test.
[GitHub] storm issue #1751: [STORM-2172][SQL] Support Avro as input / output format
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1751 @vesense OK great. We could even document this so that other Storm SQL data source contributors don't use 'input' / 'output' as a prefix.
Re: [DISCUSS] breaking changes in 2.x
If we want users to upgrade to a new version, rolling upgrade is a major decision factor. As a community, we need to look at API updates or breaking changes much more diligently. I agree, to an extent, that we shouldn't limit ourselves with rolling upgrade. But we announced rolling upgrade in 0.10, then did not support it in 1.x, and now will not in 2.x. From the user's point of view, Storm is not rolling-upgradable, even though we shipped a release stating that rolling upgrade is supported and then took that off in a follow-up release. Are these API changes critical and worth breaking rolling upgrade? -Harsha On Mon, Nov 7, 2016 at 9:16 AM Kyle Nusbaum wrote: I worry that making it a priority to have rolling upgrades between major versions significantly restricts the kinds of changes that we can make, including some kinds of changes that a major version increment is supposed to mark. I'm not really in support of trying to do that. If we can't make changes that break compatibility now, when can we make them? Can changes like that ever be made? I don't know that it's good to limit ourselves like that. Trying to accommodate rolling upgrades is a good idea, but I'm not sure about rejecting code changes across major versions to support them. 2.x represents a large shift in the project, and I expect once the translation, etc. is done, things will calm down and APIs will become more stable, allowing more of our future releases to be rolling even across major versions. I'd rather get these kinds of changes out of the way now in the 2.x release than cart along the vestiges of 1.x from now on. What do others think about this? -- Kyle On Monday, November 7, 2016, 3:10:08 AM CST, Bobby Evans wrote: Let's distinguish between wire compatibility changes and API compatibility changes, along with impact to workers vs impact to clients.
For 3), splitting the classpath up for each daemon, wire compatibility is not impacted, but we are potentially removing a bunch of APIs from the worker and client classpath. Most of these were shaded and users should not be impacted by them being removed, but a few like servlet-api-2.5.jar are likely to be removed. So yes, the impact here would likely be very small. However, on the client side, if a topology wants to include LocalCluster (like we do in a lot of examples) the topology jar will get a lot bigger. LocalCluster needs access to nimbus, supervisor, and drpc server. These would not be on the worker classpath any more and would then need to be packaged into the topology jar to make LocalCluster work. In production I would expect LocalCluster to be used by tests and not be included like we do in a lot of examples. This is more of a shift in how we expect users to interact with the LocalCluster. For 1), NodeInfo.port, depending on how we do it, could break wire compatibility and API compatibility (which is what I would prefer). We could play some games to maintain compatibility, but for me it is enough of a pain that I am not sure it is worth it. However, this is not likely to impact workers because they don't use these APIs directly. It might impact clients, but only if they have custom code to profile their topologies. If they use the built-in CLI/UI, they would not be impacted. For 2), nocamel would break API compatibility, but not wire compatibility. This is not likely to impact workers because, as with 1), workers don't really interact with nimbus directly, so it would not be a problem. Old clients running with older versions of storm would continue to work, but any custom client code (think what gets run by storm jar) would need to be recompiled/modified to be able to run against a storm-2.x client. - Bobby
[GitHub] storm issue #1766: [STORM-2192] Add a new IAutoCredentials plugin to support...
Github user knusbaum commented on the issue: https://github.com/apache/storm/pull/1766 +1
Re: [DISCUSS] breaking changes in 2.x
I agree with Kyle. I think trying to cling to being able to do rolling upgrades across major versions would be too limiting in terms of adding new features. If we can keep RU capability within major version lines, that seems like a fair tradeoff. -Taylor > On Nov 7, 2016, at 12:16 PM, Kyle Nusbaum wrote: > > I worry that making it a priority to have rolling upgrades between major > versions significantly restricts the kinds of changes that we can make, > including some kinds of changes that a major version increment is supposed to > mark. I'm not really in support of trying to do that. > If we can't make changes that break compatibility now, when can we make them? > Can changes like that ever be made? I don't know that it's good to limit > ourselves like that. > Trying to accommodate rolling upgrades is a good idea, but I'm not sure about > rejecting code changes across major versions to support them. 2.x represents > a large shift in the project, and I expect once the translation, etc. is > done, things will calm down and APIs will become more stable, allowing more > of our future releases to be rolling even across major versions. I'd rather > get these kinds of changes out of the way now in the 2.x release than cart > along the vestiges of 1.x from now on. > What do others think about this? > -- Kyle > > On Monday, November 7, 2016, 3:10:08 AM CST, Bobby Evans > wrote: Let's distinguish between wire > compatibility changes and API compatibility changes, along with impact to > workers vs impact to clients. > For 3) splitting the classpath up for each daemon wire compatibility is not > impacted, but we are potentially removing a bunch of APIs from the worker and > client classpath. Most of these were shaded and users should not be > impacted by them being removed, but a few like servlet-api-2.5.jar are likely > to be removed. So yes the impact here would likely be very small. 
> However on the client side if a topology wants to include LocalCluster (like > we do in a lot of examples) the topology jar will get a lot bigger. > LocalCluster needs access to nimbus, supervisor, and drpc server. These > would not be on the worker classpath any more and would then need to be > packaged into the topology jar to make LocalCluster work. In production I > would expect LocalCluster to be used by tests and not be included like we do > in a lot of examples. This is more of a shift for how we expect users to > interact with the LocalCluster. > For 1) NodeInfo.port depending on how we do it, it could break wire > compatibility and API compatibility (which is what I would prefer). We could > play some games to maintain compatibility, but for me it is enough of a pain > that I am not sure it is worth it. However this is not likely to impact > workers because they don't use these APIs directly. It might impact clients > but only if they have custom code to profile their topologies. If they use > the built-in CLI/UI it would not be impacted. > For 2) nocamel would break API compatibility, but not wire compatibility. > This is not likely to impact workers because like with 1 workers don't really > interact with nimbus directly so it would not be a problem. Old clients > running with older versions of storm would continue to work, but any custom > client code (think what gets run by storm jar) would need to be > recompiled/modified to be able to run against a storm-2.x client. > > - Bobby
[GitHub] storm pull request #1769: STORM-2195: Clean up some of worker-launcher code
GitHub user knusbaum opened a pull request: https://github.com/apache/storm/pull/1769 STORM-2195: Clean up some of worker-launcher code You can merge this pull request into a Git repository by running: $ git pull https://github.com/knusbaum/incubator-storm worker-launcher-comm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1769.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1769 commit eaa415a10a2d51aa87651c1adf7f68f2f0ac75de Author: Kyle Nusbaum Date: 2016-11-02T20:44:48Z Cleaning up some of worker-launcher
[GitHub] storm pull request #1768: STORM-2194: Report error and die, not report error...
GitHub user chawco opened a pull request: https://github.com/apache/storm/pull/1768 STORM-2194: Report error and die, not report error or die This appears to have just ported an existing bug from executor.clj -- this is what I believe the expected behaviour is/was. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chawco/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1768.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1768 commit f3a64f08cf056f638e703f420d025c1a81af6fcc Author: Craig Hawco Date: 2016-11-09T20:07:03Z STORM-2194: Report error and die, not report error or die
[GitHub] storm pull request #1767: STORM-2194: Report error and die, not report error...
GitHub user chawco opened a pull request: https://github.com/apache/storm/pull/1767 STORM-2194: Report error and die, not report error or die We should still kill our executor after encountering an unhandled exception. This change logs what happens as before, but also ensures we always call suicide-fn to ensure our worker exits. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chawco/storm STORM-2194 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1767.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1767 commit 952fe22f0ec2dc1193f0d951d16f8ee0eb58e817 Author: Craig Hawco Date: 2016-11-09T20:02:04Z STORM-2194: Report error and die, not report error or die
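The "report and die" behaviour described in this pull request can be sketched roughly as follows. This is an illustration, not the patch itself: `ReportAndDie`, `onUnhandled`, and the `reportError` / `suicide` parameters are hypothetical names standing in for the executor's error reporter and suicide-fn. The point is only that the suicide step sits in a `finally` block, so it runs whether or not reporting succeeds.

```java
import java.util.function.Consumer;

// Hypothetical sketch; the parameter names mirror the PR description,
// not the signatures used in Storm's executor code.
class ReportAndDie {

    /**
     * Reports the error, then always invokes the suicide callback,
     * even if the reporting step itself throws.
     */
    static void onUnhandled(Throwable error,
                            Consumer<Throwable> reportError,
                            Runnable suicide) {
        try {
            reportError.accept(error);
        } finally {
            // "Report and die": this runs on both the success and the
            // failure path of reportError.
            suicide.run();
        }
    }
}
```

With an "or" structure (report in one branch, die in another) a successfully reported error would leave the executor running; the `finally` form rules that out.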
[GitHub] storm issue #1751: [STORM-2172][SQL] Support Avro as input / output format
Github user vesense commented on the issue: https://github.com/apache/storm/pull/1751 @HeartSaVioR You can take a look at the code after I addressed that using `input.avro.schema` and `output.avro.schema` for input and output Avro Schemas.
[GitHub] storm issue #1751: [STORM-2172][SQL] Support Avro as input / output format
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1751 @vesense If this comes from TBLPROPERTIES, now TBLPROPERTIES is shared between data sources and input / output format. How do we differentiate them? Would adding a prefix be fine? Or would we want to manage reserved keys?
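One way to picture the prefix option raised above is the sketch below. It is only an illustration under assumptions: `TblPropertiesSplit` and `withPrefix` are hypothetical helpers, not part of Storm SQL, and the only keys taken from this thread are `input.avro.schema` and `output.avro.schema`. Keys under a given prefix are routed to the format, and everything else is left for the data source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Storm SQL.
class TblPropertiesSplit {

    /**
     * Returns the entries whose key starts with the given prefix,
     * with the prefix stripped from each returned key.
     */
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }
}
```

Under this scheme `withPrefix(props, "input.")` would hand the input format a map keyed by `avro.schema`, which is also why data source authors would need to avoid `input` / `output` as prefixes for their own keys.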