[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370072#comment-15370072
 ] 

ASF GitHub Bot commented on STORM-1277:
---------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1445#discussion_r70194309
  
    --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -0,0 +1,567 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.executor;
    +
    +import clojure.lang.IFn;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.lmax.disruptor.EventHandler;
    +import com.lmax.disruptor.dsl.ProducerType;
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.GrouperFactory;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.daemon.Task;
    +import org.apache.storm.executor.bolt.BoltExecutor;
    +import org.apache.storm.executor.error.IReportError;
    +import org.apache.storm.executor.error.ReportError;
    +import org.apache.storm.executor.error.ReportErrorAndDie;
    +import org.apache.storm.executor.spout.SpoutExecutor;
    +import org.apache.storm.generated.Bolt;
    +import org.apache.storm.generated.DebugOptions;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.SpoutSpec;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.metric.api.IMetricsConsumer;
    +import org.apache.storm.stats.BoltExecutorStats;
    +import org.apache.storm.stats.CommonStats;
    +import org.apache.storm.stats.SpoutExecutorStats;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.TupleImpl;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.DisruptorBackpressureCallback;
    +import org.apache.storm.utils.DisruptorQueue;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.WorkerBackpressureThread;
    +import org.json.simple.JSONValue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.*;
    +import java.util.concurrent.Callable;
    +
    +public abstract class Executor implements Callable, EventHandler {
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
    +
    +    protected final Map workerData;
    +    protected final WorkerTopologyContext workerTopologyContext;
    +    protected final List<Long> executorId;
    +    protected final List<Integer> taskIds;
    +    protected final String componentId;
    +    protected final AtomicBoolean openOrPrepareWasCalled;
    +    protected final Map stormConf;
    +    protected final Map conf;
    +    protected final String stormId;
    +    protected final HashMap sharedExecutorData;
    +    protected final AtomicBoolean stormActive;
    +    protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
    +    protected final Runnable suicideFn;
    +    protected final IStormClusterState stormClusterState;
    +    protected final Map<Integer, String> taskToComponent;
    +    protected CommonStats stats;
    +    protected final Map<Integer, Map<Integer, Map<String, IMetric>>> 
intervalToTaskToMetricToRegistry;
    +    protected final Map<String, Map<String, 
LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
    +    protected final ReportErrorAndDie reportErrorDie;
    +    protected final Callable<Boolean> sampler;
    +    protected final AtomicBoolean backpressure;
    +    protected ExecutorTransfer executorTransfer;
    +    protected final String type;
    +    protected final AtomicBoolean throttleOn;
    +    protected IFn transferFn;
    +
    +    protected final IReportError reportError;
    +    protected final Random rand;
    +    protected final DisruptorQueue transferQueue;
    +    protected final DisruptorQueue receiveQueue;
    +    protected Map<Integer, Task> idToTask;
    +    protected final Map<String, String> credentials;
    +    protected final Boolean isDebug;
    +    protected final Boolean isEventLoggers;
    +    protected String hostname;
    +
    +    protected Executor(Map workerData, List<Long> executorId, Map<String, 
String> credentials) {
    +        this.workerData = workerData;
    +        this.executorId = executorId;
    +        this.workerTopologyContext = 
StormCommon.makeWorkerContext(workerData);
    +        this.taskIds = StormCommon.executorIdToTasks(executorId);
    +        this.componentId = 
workerTopologyContext.getComponentId(taskIds.get(0));
    +        this.openOrPrepareWasCalled = new AtomicBoolean(false);
    +        this.stormConf = normalizedComponentConf((Map) 
workerData.get("storm-conf"), workerTopologyContext, componentId);
    +        this.receiveQueue = (DisruptorQueue) (((Map) 
workerData.get("executor-receive-queue-map")).get(executorId));
    +        this.stormId = (String) workerData.get("storm-id");
    +        this.conf = (Map) workerData.get("conf");
    +        this.sharedExecutorData = new HashMap();
    +        this.stormActive = (AtomicBoolean) 
workerData.get("storm-active-atom");
    +        this.stormComponentDebug = (AtomicReference<Map<String, 
DebugOptions>>) workerData.get("storm-component->debug-atom");
    +
    +        this.transferQueue = mkExecutorBatchQueue(stormConf, executorId);
    +        this.transferFn = (IFn) workerData.get("transfer-fn");
    +        this.executorTransfer = new 
ExecutorTransfer(workerTopologyContext, transferQueue, stormConf, transferFn);
    +
    +        this.suicideFn = (Runnable) workerData.get("suicide-fn");
    +        try {
    +            this.stormClusterState = 
ClusterUtils.mkStormClusterState(workerData.get("state-store"), 
Utils.getWorkerACL(stormConf),
    +                    new ClusterStateContext(DaemonType.SUPERVISOR));
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +        StormTopology topology = workerTopologyContext.getRawTopology();
    +        Map<String, SpoutSpec> spouts = topology.get_spouts();
    +        Map<String, Bolt> bolts = topology.get_bolts();
    +        if (spouts.containsKey(componentId)) {
    +            this.type = StatsUtil.SPOUT;
    +            this.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(stormConf));
    +        } else if (bolts.containsKey(componentId)) {
    +            this.type = StatsUtil.BOLT;
    +            this.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(stormConf));
    +        } else {
    +            throw new RuntimeException("Could not find " + componentId + " 
in " + topology);
    +        }
    +
    +        this.intervalToTaskToMetricToRegistry = new HashMap<>();
    +        this.taskToComponent = (Map<Integer, String>) 
workerData.get("task->component");
    +        this.streamToComponentToGrouper = 
outboundComponents(workerTopologyContext, componentId, stormConf);
    +        this.reportError = new ReportError(stormConf, stormClusterState, 
stormId, componentId, workerTopologyContext);
    +        this.reportErrorDie = new ReportErrorAndDie(reportError, 
suicideFn);
    +        this.sampler = ConfigUtils.mkStatsSampler(stormConf);
    +        this.backpressure = new AtomicBoolean(false);
    +        this.throttleOn = (AtomicBoolean) workerData.get("throttle-on");
    +        this.isDebug = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.rand = new Random(Utils.secureRandomLong());
    +        this.credentials = credentials;
    +        this.isEventLoggers = StormCommon.hasEventLoggers(stormConf);
    +
    +        try {
    +            this.hostname = Utils.hostname(stormConf);
    +        } catch (UnknownHostException ignored) {
    +            this.hostname = "";
    +        }
    +    }
    +
    +    public static Executor mkExecutor(Map workerData, List<Long> 
executorId, Map<String, String> credentials) {
    +        Executor executor;
    +
    +        Map<String, Object> convertedWorkerData = 
Utils.convertMap(workerData);
    +        WorkerTopologyContext workerTopologyContext = 
StormCommon.makeWorkerContext(convertedWorkerData);
    +        List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
    +        String componentId = 
workerTopologyContext.getComponentId(taskIds.get(0));
    +
    +        String type = getExecutorType(workerTopologyContext, componentId);
    +        if (StatsUtil.SPOUT.equals(type)) {
    +            executor = new SpoutExecutor(convertedWorkerData, executorId, 
credentials);
    +            executor.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
    +        } else {
    +            executor = new BoltExecutor(convertedWorkerData, executorId, 
credentials);
    +            executor.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()));
    +        }
    +
    +        Map<Integer, Task> idToTask = new HashMap<>();
    +        for (Integer taskId : taskIds) {
    +            try {
    +                Task task = new Task(executor, taskId);
    +                executor.sendUnanchored(task, 
StormCommon.SYSTEM_STREAM_ID, new Values("startup"), 
executor.getExecutorTransfer());
    +                idToTask.put(taskId, task);
    +            } catch (IOException ex) {
    +                throw Utils.wrapInRuntime(ex);
    +            }
    +        }
    +        executor.init(idToTask);
    +
    +        return executor;
    +    }
    +
    +    private static String getExecutorType(WorkerTopologyContext 
workerTopologyContext, String componentId) {
    +        StormTopology topology = workerTopologyContext.getRawTopology();
    +        Map<String, SpoutSpec> spouts = topology.get_spouts();
    +        Map<String, Bolt> bolts = topology.get_bolts();
    +        if (spouts.containsKey(componentId)) {
    +            return StatsUtil.SPOUT;
    +        } else if (bolts.containsKey(componentId)) {
    +            return StatsUtil.BOLT;
    +        } else {
    +            throw new RuntimeException("Could not find " + componentId + " 
in " + topology);
    +        }
    +    }
    +
    +    /**
    +     * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
    +     */
    +    public ExecutorShutdown execute() throws Exception {
    +        LOG.info("Loading executor tasks " + componentId + ":" + 
executorId);
    +
    +        registerBackpressure();
    +        Utils.SmartThread systemThreads =
    +                Utils.asyncLoop(executorTransfer, 
executorTransfer.getName(), reportErrorDie);
    +
    +        String handlerName = componentId + "-executor" + executorId;
    +        Utils.SmartThread handlers = Utils.asyncLoop(this, false, 
reportErrorDie, Thread.NORM_PRIORITY, false, true, handlerName);
    +        setupTicks(StatsUtil.SPOUT.equals(type));
    +        LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
    +        return new ExecutorShutdown(this, 
Lists.newArrayList(systemThreads, handlers), idToTask);
    +    }
    +
    +    public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
    +
    +    public abstract void init(Map<Integer, Task> idToTask);
    +
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
    +        ArrayList<AddressedTuple> addressedTuples = 
(ArrayList<AddressedTuple>) event;
    +        for (AddressedTuple addressedTuple : addressedTuples) {
    +            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    +            int taskId = addressedTuple.getDest();
    +            if (isDebug) {
    +                LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
    +            }
    +            if (taskId != AddressedTuple.BROADCAST_DEST) {
    +                tupleActionFn(taskId, tuple);
    +            } else {
    +                for (Integer t : taskIds) {
    +                    tupleActionFn(t, tuple);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void metricsTick(Task taskData, TupleImpl tuple) {
    +        try {
    +            Integer interval = tuple.getInteger(0);
    +            int taskId = taskData.getTaskId();
    +            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = 
intervalToTaskToMetricToRegistry.get(interval);
    +            Map<String, IMetric> nameToRegistry = null;
    +            if (taskToMetricToRegistry != null) {
    +                nameToRegistry = taskToMetricToRegistry.get(taskId);
    +            }
    +            if (nameToRegistry != null) {
    +                IMetricsConsumer.TaskInfo taskInfo = new 
IMetricsConsumer.TaskInfo(hostname, workerTopologyContext.getThisWorkerPort(),
    +                        componentId, taskId, Time.currentTimeSecs(), 
interval);
    +                List<IMetricsConsumer.DataPoint> dataPoints = new 
ArrayList<>();
    +                for (Map.Entry<String, IMetric> entry : 
nameToRegistry.entrySet()) {
    +                    IMetric metric = entry.getValue();
    +                    Object value = metric.getValueAndReset();
    +                    if (value != null) {
    +                        IMetricsConsumer.DataPoint dataPoint = new 
IMetricsConsumer.DataPoint(entry.getKey(), value);
    +                        dataPoints.add(dataPoint);
    +                    }
    +                }
    +                if (!dataPoints.isEmpty()) {
    +                    sendUnanchored(taskData, Constants.METRICS_STREAM_ID, 
new Values(taskInfo, dataPoints), executorTransfer);
    +                }
    +            }
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    protected void setupMetrics() {
    +        for (final Integer interval : 
intervalToTaskToMetricToRegistry.keySet()) {
    +            StormTimer timerTask = (StormTimer) 
workerData.get("user-timer");
    +            timerTask.scheduleRecurring(interval, interval, new Runnable() 
{
    +                @Override
    +                public void run() {
    +                    TupleImpl tuple =
    +                            new TupleImpl(workerTopologyContext, new 
Values(interval), (int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
    +                    List<AddressedTuple> metricsTickTuple = 
Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
    +                    receiveQueue.publish(metricsTickTuple);
    +                }
    +            });
    +        }
    +    }
    +
    +    public void sendUnanchored(Task task, String stream, List<Object> 
values, ExecutorTransfer transfer) {
    +        Tuple tuple = task.getTuple(stream, values);
    +        List<Integer> tasks = task.getOutgoingTasks(stream, values);
    +        if (tasks.size() == 0) {
    --- End diff --
    
    we don't need this check right. For loop won't enter if tasks are empty. We 
should avoid having multiple returns.


> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
>                 Key: STORM-1277
>                 URL: https://issues.apache.org/jira/browse/STORM-1277
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Cody
>              Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
>  kind of.  Tasks and executors are combined in jstorm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to