[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402121#comment-15402121 ]
ASF GitHub Bot commented on STORM-1277: --------------------------------------- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72987136 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + + private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + + private final BoltExecutor executor; + private final Task taskData; + private final int taskId; + private final Random random; + private final boolean isEventLoggers; + private final boolean isDebug; + + public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { + this.executor = executor; + this.taskData = taskData; + this.taskId = taskId; + this.random = random; + this.isEventLoggers = isEventLoggers; + this.isDebug = isDebug; + } + + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + return boltEmit(streamId, anchors, tuple, null); + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + boltEmit(streamId, anchors, tuple, taskId); + } + + private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) { + List<Integer> outTasks; + if (targetTaskId != null) { + outTasks = 
taskData.getOutgoingTasks(targetTaskId, streamId, values); + } else { + outTasks = taskData.getOutgoingTasks(streamId, values); + } + + for (Integer t : outTasks) { + Map<Long, Long> anchorsToIds = new HashMap<>(); + if (anchors != null) { + for (Tuple a : anchors) { + long edgeId = MessageId.generateId(random); --- End diff -- ``` (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] - (when (pos? (count root-ids)) - (let [edge-id (MessageId/generateId rand)] - (.updateAckVal a edge-id) - (fast-list-iter [root-id root-ids] - (put-xor! anchors-to-ids root-id edge-id)))))) ``` If root-ids.size() is 0, the ported code behaves differently from the original Clojure code. It should check the size first and skip everything if the size is 0. > port backtype.storm.daemon.executor to java > ------------------------------------------- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > kind of. Tasks and executors are combined in jstorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)