[
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401640#comment-15401640
]
ASF GitHub Bot commented on STORM-1277:
---------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/1445#discussion_r72933087
--- Diff:
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SpoutExecutor.class);
+
+ private final ISpoutWaitStrategy spoutWaitStrategy;
+ private Integer maxSpoutPending;
+ private final AtomicBoolean lastActive;
+ private List<ISpout> spouts;
+ private List<SpoutOutputCollector> outputCollectors;
+ private final MutableLong emittedCount;
+ private final MutableLong emptyEmitStreak;
+ private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+ private final boolean hasAckers;
+ private RotatingMap<Long, TupleInfo> pending;
+ private final boolean backPressureEnabled;
+
+ public SpoutExecutor(final Map workerData, final List<Long>
executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.spoutWaitStrategy = Utils.newInstance((String)
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+ this.spoutWaitStrategy.prepare(stormConf);
+
+ this.backPressureEnabled =
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+ this.lastActive = new AtomicBoolean(false);
+ this.hasAckers = StormCommon.hasAckers(stormConf);
+ this.emittedCount = new MutableLong(0);
+ this.emptyEmitStreak = new MutableLong(0);
+ this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+ }
+
+ @Override
+ public void init(final Map<Integer, Task> idToTask) {
+ LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+ this.idToTask = idToTask;
+ this.maxSpoutPending =
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) *
idToTask.size();
+ this.spouts = new ArrayList<>();
+ for (Task task : idToTask.values()) {
+ this.spouts.add((ISpout) task.getTaskObject());
+ }
+ this.pending = new RotatingMap<>(2, new
RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+ @Override
+ public void expire(Long key, TupleInfo tupleInfo) {
+ Long timeDelta = null;
+ if (tupleInfo.getTimestamp() != 0) {
+ timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+ }
+ failSpoutMsg(SpoutExecutor.this,
idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT");
+ }
+ });
+
+ this.spoutThrottlingMetrics.registerAll(stormConf,
idToTask.values().iterator().next().getUserContext());
+ this.outputCollectors = new ArrayList<>();
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ ISpout spoutObject = (ISpout) taskData.getTaskObject();
+ SpoutOutputCollectorImpl spoutOutputCollector = new
SpoutOutputCollectorImpl(
+ spoutObject, this, taskData, entry.getKey(),
emittedCount,
+ hasAckers, rand, isEventLoggers, isDebug, pending);
+ SpoutOutputCollector outputCollector = new
SpoutOutputCollector(spoutOutputCollector);
+ this.outputCollectors.add(outputCollector);
+
+ taskData.getBuiltInMetrics().registerAll(stormConf,
taskData.getUserContext());
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue",
transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf,
taskData.getUserContext());
+
+ if (spoutObject instanceof ICredentialsListener) {
+ ((ICredentialsListener)
spoutObject).setCredentials(credentials);
+ }
+ spoutObject.open(stormConf, taskData.getUserContext(),
outputCollector);
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Opened spout {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Object call() throws Exception {
+ while (!stormActive.get()) {
--- End diff --
AFAIK, this statement should block calling init(), means only use once for
starting up. This can lead infinite loop instead of handling `deactivate`.
> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
> Key: STORM-1277
> URL: https://issues.apache.org/jira/browse/STORM-1277
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Cody
> Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
> kind of. Tasks and executors are combined in jstorm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)