[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370072#comment-15370072 ]
ASF GitHub Bot commented on STORM-1277: --------------------------------------- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70194309 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import 
org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + + protected final Map workerData; + protected final WorkerTopologyContext workerTopologyContext; + protected final List<Long> executorId; + protected final List<Integer> taskIds; + protected final String componentId; + protected final AtomicBoolean openOrPrepareWasCalled; + protected final Map stormConf; + protected final Map conf; + protected final String stormId; + protected final HashMap sharedExecutorData; + protected final AtomicBoolean stormActive; + protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; + protected final Runnable suicideFn; + protected final IStormClusterState stormClusterState; + protected final Map<Integer, String> taskToComponent; + protected CommonStats stats; + protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry; + protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper; + protected final ReportErrorAndDie reportErrorDie; + protected final Callable<Boolean> sampler; + protected final AtomicBoolean backpressure; + protected ExecutorTransfer executorTransfer; + protected final String type; + protected final AtomicBoolean throttleOn; + protected IFn transferFn; + + protected final IReportError reportError; + protected final Random rand; + protected final DisruptorQueue transferQueue; + protected final DisruptorQueue receiveQueue; + protected Map<Integer, 
Task> idToTask; + protected final Map<String, String> credentials; + protected final Boolean isDebug; + protected final Boolean isEventLoggers; + protected String hostname; + + protected Executor(Map workerData, List<Long> executorId, Map<String, String> credentials) { + this.workerData = workerData; + this.executorId = executorId; + this.workerTopologyContext = StormCommon.makeWorkerContext(workerData); + this.taskIds = StormCommon.executorIdToTasks(executorId); + this.componentId = workerTopologyContext.getComponentId(taskIds.get(0)); + this.openOrPrepareWasCalled = new AtomicBoolean(false); + this.stormConf = normalizedComponentConf((Map) workerData.get("storm-conf"), workerTopologyContext, componentId); + this.receiveQueue = (DisruptorQueue) (((Map) workerData.get("executor-receive-queue-map")).get(executorId)); + this.stormId = (String) workerData.get("storm-id"); + this.conf = (Map) workerData.get("conf"); + this.sharedExecutorData = new HashMap(); + this.stormActive = (AtomicBoolean) workerData.get("storm-active-atom"); + this.stormComponentDebug = (AtomicReference<Map<String, DebugOptions>>) workerData.get("storm-component->debug-atom"); + + this.transferQueue = mkExecutorBatchQueue(stormConf, executorId); + this.transferFn = (IFn) workerData.get("transfer-fn"); + this.executorTransfer = new ExecutorTransfer(workerTopologyContext, transferQueue, stormConf, transferFn); + + this.suicideFn = (Runnable) workerData.get("suicide-fn"); + try { + this.stormClusterState = ClusterUtils.mkStormClusterState(workerData.get("state-store"), Utils.getWorkerACL(stormConf), + new ClusterStateContext(DaemonType.SUPERVISOR)); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + + StormTopology topology = workerTopologyContext.getRawTopology(); + Map<String, SpoutSpec> spouts = topology.get_spouts(); + Map<String, Bolt> bolts = topology.get_bolts(); + if (spouts.containsKey(componentId)) { + this.type = StatsUtil.SPOUT; + this.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(stormConf)); + } else if (bolts.containsKey(componentId)) { + this.type = StatsUtil.BOLT; + this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(stormConf)); + } else { + throw new RuntimeException("Could not find " + componentId + " in " + topology); + } + + this.intervalToTaskToMetricToRegistry = new HashMap<>(); + this.taskToComponent = (Map<Integer, String>) workerData.get("task->component"); + this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, stormConf); + this.reportError = new ReportError(stormConf, stormClusterState, stormId, componentId, workerTopologyContext); + this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn); + this.sampler = ConfigUtils.mkStatsSampler(stormConf); + this.backpressure = new AtomicBoolean(false); + this.throttleOn = (AtomicBoolean) workerData.get("throttle-on"); + this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false); + this.rand = new Random(Utils.secureRandomLong()); + this.credentials = credentials; + this.isEventLoggers = StormCommon.hasEventLoggers(stormConf); + + try { + this.hostname = Utils.hostname(stormConf); + } catch (UnknownHostException ignored) { + this.hostname = ""; + } + } + + public static Executor mkExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) { + Executor executor; + + Map<String, Object> convertedWorkerData = Utils.convertMap(workerData); + WorkerTopologyContext workerTopologyContext = StormCommon.makeWorkerContext(convertedWorkerData); + List<Integer> taskIds = StormCommon.executorIdToTasks(executorId); + String componentId = workerTopologyContext.getComponentId(taskIds.get(0)); + + String type = getExecutorType(workerTopologyContext, componentId); + if (StatsUtil.SPOUT.equals(type)) { + executor = new SpoutExecutor(convertedWorkerData, executorId, credentials); + executor.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf())); + } else { + executor = new BoltExecutor(convertedWorkerData, executorId, credentials); + executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf())); + } + + Map<Integer, Task> idToTask = new HashMap<>(); + for (Integer taskId : taskIds) { + try { + Task task = new Task(executor, taskId); + executor.sendUnanchored(task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer()); + idToTask.put(taskId, task); + } catch (IOException ex) { + throw Utils.wrapInRuntime(ex); + } + } + executor.init(idToTask); + + return executor; + } + + private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) { + StormTopology topology = workerTopologyContext.getRawTopology(); + Map<String, SpoutSpec> spouts = topology.get_spouts(); + Map<String, Bolt> bolts = topology.get_bolts(); + if (spouts.containsKey(componentId)) { + return StatsUtil.SPOUT; + } else if (bolts.containsKey(componentId)) { + return StatsUtil.BOLT; + } else { + throw new RuntimeException("Could not find " + componentId + " in " + topology); + } + } + + /** + * separated from mkExecutor in order to replace executor transfer in executor data for testing + */ + public ExecutorShutdown execute() throws Exception { + LOG.info("Loading executor tasks " + componentId + ":" + executorId); + + registerBackpressure(); + Utils.SmartThread systemThreads = + Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie); + + String handlerName = componentId + "-executor" + executorId; + Utils.SmartThread handlers = Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, false, true, handlerName); + setupTicks(StatsUtil.SPOUT.equals(type)); + LOG.info("Finished loading executor " + componentId + ":" + executorId); + return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask); + } + + public abstract void 
tupleActionFn(int taskId, TupleImpl tuple) throws Exception; + + public abstract void init(Map<Integer, Task> idToTask); + + @SuppressWarnings("unchecked") + @Override + public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { + ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event; + for (AddressedTuple addressedTuple : addressedTuples) { + TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); + int taskId = addressedTuple.getDest(); + if (isDebug) { + LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); + } + if (taskId != AddressedTuple.BROADCAST_DEST) { + tupleActionFn(taskId, tuple); + } else { + for (Integer t : taskIds) { + tupleActionFn(t, tuple); + } + } + } + } + + public void metricsTick(Task taskData, TupleImpl tuple) { + try { + Integer interval = tuple.getInteger(0); + int taskId = taskData.getTaskId(); + Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval); + Map<String, IMetric> nameToRegistry = null; + if (taskToMetricToRegistry != null) { + nameToRegistry = taskToMetricToRegistry.get(taskId); + } + if (nameToRegistry != null) { + IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(hostname, workerTopologyContext.getThisWorkerPort(), + componentId, taskId, Time.currentTimeSecs(), interval); + List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>(); + for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) { + IMetric metric = entry.getValue(); + Object value = metric.getValueAndReset(); + if (value != null) { + IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value); + dataPoints.add(dataPoint); + } + } + if (!dataPoints.isEmpty()) { + sendUnanchored(taskData, Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer); + } + } + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + protected void setupMetrics() { + for 
(final Integer interval : intervalToTaskToMetricToRegistry.keySet()) { + StormTimer timerTask = (StormTimer) workerData.get("user-timer"); + timerTask.scheduleRecurring(interval, interval, new Runnable() { + @Override + public void run() { + TupleImpl tuple = + new TupleImpl(workerTopologyContext, new Values(interval), (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID); + List<AddressedTuple> metricsTickTuple = Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); + receiveQueue.publish(metricsTickTuple); + } + }); + } + } + + public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) { + Tuple tuple = task.getTuple(stream, values); + List<Integer> tasks = task.getOutgoingTasks(stream, values); + if (tasks.size() == 0) { --- End diff -- We don't need this check, right? The for loop won't be entered if `tasks` is empty. We should avoid having multiple returns. > port backtype.storm.daemon.executor to java > ------------------------------------------- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > Kind of: tasks and executors are combined in JStorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)