[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403248#comment-15403248 ]
ASF GitHub Bot commented on STORM-1277: --------------------------------------- Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73085409 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + + private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + + private final ISpoutWaitStrategy spoutWaitStrategy; + private Integer maxSpoutPending; + private final AtomicBoolean lastActive; + private List<ISpout> spouts; + private List<SpoutOutputCollector> outputCollectors; + private final MutableLong emittedCount; + private final MutableLong emptyEmitStreak; + private final SpoutThrottlingMetrics spoutThrottlingMetrics; + private final boolean hasAckers; + private RotatingMap<Long, TupleInfo> pending; + private final boolean backPressureEnabled; + + public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) { + super(workerData, executorId, credentials); + this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); + this.spoutWaitStrategy.prepare(stormConf); + + this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + + this.lastActive = new AtomicBoolean(false); + this.hasAckers = StormCommon.hasAckers(stormConf); + this.emittedCount = new MutableLong(0); + this.emptyEmitStreak = new MutableLong(0); + this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); + } + + @Override + public void init(final Map<Integer, Task> idToTask) { + LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); + this.idToTask = idToTask; + this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); + this.spouts = new ArrayList<>(); + for (Task task : idToTask.values()) { + this.spouts.add((ISpout) task.getTaskObject()); + } + this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { + @Override + public void expire(Long key, TupleInfo tupleInfo) { + Long timeDelta = null; + if (tupleInfo.getTimestamp() != 0) { + timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); + } + failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId()), timeDelta, tupleInfo, "TIMEOUT"); + } + }); + + this.spoutThrottlingMetrics.registerAll(stormConf, idToTask.values().iterator().next().getUserContext()); + this.outputCollectors = new ArrayList<>(); + for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) { + Task taskData = entry.getValue(); + ISpout spoutObject = (ISpout) taskData.getTaskObject(); + SpoutOutputCollectorImpl spoutOutputCollector = new SpoutOutputCollectorImpl( + spoutObject, this, taskData, entry.getKey(), emittedCount, + hasAckers, rand, isEventLoggers, isDebug, pending); + SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector); + this.outputCollectors.add(outputCollector); + + taskData.getBuiltInMetrics().registerAll(stormConf, taskData.getUserContext()); + Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue); + BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, taskData.getUserContext()); + + if (spoutObject instanceof ICredentialsListener) { + ((ICredentialsListener) spoutObject).setCredentials(credentials); + } + spoutObject.open(stormConf, taskData.getUserContext(), outputCollector); + } + openOrPrepareWasCalled.set(true); + LOG.info("Opened spout {}:{}", componentId, idToTask.keySet()); + setupMetrics(); + } + + @Override + public Object call() throws Exception { + while (!stormActive.get()) { --- End diff -- I'm not sure if I understand correctly, you mean we should have a flag like `inited`, and call init inside of asyncLoop once and set the `inited` to true, then start the loop to consume disruptor queue? But calling init inside the loop once doesn't differ much from calling it outside. > port backtype.storm.daemon.executor to java > ------------------------------------------- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > kind of. Tasks and executors are combined in jstorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)