[
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324397#comment-15324397
]
ASF GitHub Bot commented on STORM-1277:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1445#discussion_r66607521
--- Diff: storm-core/src/jvm/org/apache/storm/executor/BaseExecutor.java ---
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import java.net.UnknownHostException;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+public abstract class BaseExecutor implements Callable, EventHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseExecutor.class);
+
+ protected final ExecutorData executorData;
+ protected final Map stormConf;
+ protected final String componentId;
+ protected final WorkerTopologyContext workerTopologyContext;
+ protected final IReportError reportError;
+ protected final Callable<Boolean> sampler;
+ protected final Random rand;
+ protected final DisruptorQueue transferQueue;
+ protected final DisruptorQueue receiveQueue;
+ protected final Map<Integer, Task> idToTask;
+ protected final Map<String, String> credentials;
+ protected final Boolean isDebug;
+ protected final Boolean isEventLoggers;
+ protected String hostname;
+
+ /**
+  * Builds the executor base by caching the collaborators exposed by {@code executorData}
+  * together with the supplied task and credential maps.
+  *
+  * <p>The local hostname is resolved from the topology configuration. If resolution fails,
+  * the failure is logged and {@code hostname} falls back to the empty string, so
+  * construction never fails for that reason.
+  *
+  * @param executorData supplies the storm configuration, component id, worker topology
+  *                     context, error reporter, sampler and the transfer/receive queues
+  * @param idToTask     map from integer id to {@link Task}; stored as given (not copied)
+  * @param credentials  credentials map; stored as given (not copied)
+  */
+ public BaseExecutor(ExecutorData executorData, Map<Integer, Task> idToTask, Map<String, String> credentials) {
+     this.executorData = executorData;
+     this.stormConf = executorData.getStormConf();
+     this.componentId = executorData.getComponentId();
+     this.workerTopologyContext = executorData.getWorkerTopologyContext();
+     this.reportError = executorData.getReportError();
+     this.sampler = executorData.getSampler();
+     this.rand = new Random(Utils.secureRandomLong());
+     this.transferQueue = executorData.getBatchTransferWorkerQueue();
+     this.receiveQueue = executorData.getReceiveQueue();
+     this.idToTask = idToTask;
+     this.credentials = credentials;
+     // Both flags are derived from stormConf, which must therefore be assigned above.
+     this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+     this.isEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+     try {
+         this.hostname = Utils.hostname(stormConf);
+     } catch (UnknownHostException e) {
+         // Best effort: keep the empty-hostname fallback, but leave a trace so the
+         // misconfiguration can be diagnosed instead of disappearing silently.
+         LOG.warn("Unable to resolve local hostname; falling back to an empty hostname", e);
+         this.hostname = "";
+     }
+ }
+
+ /**
+  * {@link EventHandler} callback: dispatches every tuple carried by one received event.
+  *
+  * <p>The event is treated as a list of {@link AddressedTuple}s. Each tuple is passed to
+  * {@code tupleActionFn} for its destination task. When the destination equals
+  * {@link AddressedTuple#BROADCAST_DEST}, it is passed once for every id returned by
+  * {@code executorData.getTaskIds()}.
+  *
+  * @param event      the received batch; must be a {@code List<AddressedTuple>}
+  *                   (any {@code List} implementation is accepted)
+  * @param seq        not used by this implementation
+  * @param endOfBatch not used by this implementation
+  * @throws ClassCastException if {@code event} is not a {@code List}
+  * @throws Exception whatever {@code tupleActionFn} propagates
+  */
+ @Override
+ public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
+     // Cast to the List interface rather than ArrayList so other List implementations
+     // do not trigger a ClassCastException; the element type is unchecked by nature.
+     @SuppressWarnings("unchecked")
+     List<AddressedTuple> addressedTuples = (List<AddressedTuple>) event;
+     for (AddressedTuple addressedTuple : addressedTuples) {
+         TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+         int taskId = addressedTuple.getDest();
+         if (isDebug) {
+             LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+         }
+         if (taskId != AddressedTuple.BROADCAST_DEST) {
+             tupleActionFn(taskId, tuple);
+         } else {
+             // Broadcast: deliver the same tuple to every task of this executor.
+             for (Integer t : executorData.getTaskIds()) {
+                 tupleActionFn(t, tuple);
+             }
+         }
+     }
+ }
+
+ public void metricsTick(Task taskData, TupleImpl tuple) {
+ try {
+ Integer interval = tuple.getInteger(0);
+ int taskId = taskData.getTaskId();
+ Map<Integer, Map<String, IMetric>> taskToMetricToRegistry =
executorData.getIntervalToTaskToMetricToRegistry().get(interval);
+ Map<String, IMetric> nameToRegistry = null;
+ if (taskToMetricToRegistry != null) {
--- End diff --
This null check is not present in the Clojure code.
> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
> Key: STORM-1277
> URL: https://issues.apache.org/jira/browse/STORM-1277
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Cody
> Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
> kind of. Tasks and executors are combined in jstorm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)