[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401651#comment-15401651 ]
ASF GitHub Bot commented on STORM-1277: --------------------------------------- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72934070 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.executor.bolt; + +import clojure.lang.Atom; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.executor.Executor; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IBolt; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Callable; + +public class BoltExecutor extends Executor { + + private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class); + + private final Callable<Boolean> executeSampler; + + public BoltExecutor(Map workerData, List<Long> executorId, Map<String, String> credentials) { + super(workerData, executorId, credentials); + this.executeSampler = ConfigUtils.mkStatsSampler(stormConf); + } + + @Override + public void init(Map<Integer, Task> idToTask) { + this.idToTask = idToTask; + LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet()); + for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) { + Task taskData = entry.getValue(); + IBolt boltObject = (IBolt) taskData.getTaskObject(); + TopologyContext userContext = taskData.getUserContext(); + taskData.getBuiltInMetrics().registerAll(stormConf, userContext); + if (boltObject instanceof ICredentialsListener) { + ((ICredentialsListener) boltObject).setCredentials(credentials); + } + if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) { + Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue, + "transfer", (DisruptorQueue) workerData.get("transfer-queue")); + BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); + + Map cachedNodePortToSocket = (Map) ((Atom) workerData.get("cached-node+port->socket")).deref(); + BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, stormConf, userContext); + BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"), stormConf, userContext); + } else { + Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue); + BuiltinMetricsUtil.registerQueueMetrics(map, stormConf, userContext); + } + + IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers, isDebug); + boltObject.prepare(stormConf, userContext, new OutputCollector(outputCollector)); + } + openOrPrepareWasCalled.set(true); + LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet()); + setupMetrics(); + } + + @Override + public Object call() throws Exception { + while (!stormActive.get()) { --- End diff -- Same. This should block init() only once instead of each loop. > port backtype.storm.daemon.executor to java > ------------------------------------------- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > kind of. Tasks and executors are combined in jstorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)