Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158208197
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---
@@ -18,105 +18,188 @@
package org.apache.storm.executor.bolt;
import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+
+import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
+import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.stats.BoltExecutorStats;
import org.apache.storm.task.IBolt;
-import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.JCQueue.ExitCondition;
+import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.function.BooleanSupplier;
public class BoltExecutor extends Executor {
private static final Logger LOG =
LoggerFactory.getLogger(BoltExecutor.class);
- private final Callable<Boolean> executeSampler;
+ private final BooleanSupplier executeSampler;
+ private final boolean isSystemBoltExecutor;
+ private final IWaitStrategy consumeWaitStrategy; // employed
when no incoming data
+ private final IWaitStrategy backPressureWaitStrategy; // employed
when outbound path is congested
+ private BoltOutputCollectorImpl outputCollector;
public BoltExecutor(WorkerState workerData, List<Long> executorId,
Map<String, String> credentials) {
super(workerData, executorId, credentials);
this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
+ this.isSystemBoltExecutor = (executorId ==
Constants.SYSTEM_EXECUTOR_ID );
+ if (isSystemBoltExecutor) {
+ this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
+ } else {
+ this.consumeWaitStrategy =
ReflectionUtils.newInstance((String)
topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
+ this.consumeWaitStrategy.prepare(topoConf,
WAIT_SITUATION.BOLT_WAIT);
+ }
+ this.backPressureWaitStrategy =
ReflectionUtils.newInstance((String)
topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
+ this.backPressureWaitStrategy.prepare(topoConf,
WAIT_SITUATION.BACK_PRESSURE_WAIT);
+
}
- public void init(Map<Integer, Task> idToTask) {
+ private static IWaitStrategy makeSystemBoltWaitStrategy() {
+ WaitStrategyPark ws = new WaitStrategyPark();
+ HashMap conf = new HashMap<String,Object>();
+ conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
+ ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
+ return ws;
+ }
+
+ public void init(ArrayList<Task> idToTask, int idToTaskBase) {
+ executorTransfer.initLocalRecvQueues();
while (!stormActive.get()) {
Utils.sleep(100);
}
- this.errorReportingMetrics.registerAll(topoConf,
idToTask.values().iterator().next().getUserContext());
-
- LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
- for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
- Task taskData = entry.getValue();
+ if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System
bolt doesn't call reportError()
+ this.errorReportingMetrics.registerAll(topoConf,
idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
+ }
+ LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
+ for (Task taskData : idToTask) {
+ if (taskData == null) {
+ //This happens if the min id is too small
+ continue;
+ }
IBolt boltObject = (IBolt) taskData.getTaskObject();
TopologyContext userContext = taskData.getUserContext();
taskData.getBuiltInMetrics().registerAll(topoConf,
userContext);
if (boltObject instanceof ICredentialsListener) {
((ICredentialsListener)
boltObject).setCredentials(credentials);
}
if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
- Map<String, DisruptorQueue> map =
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
- "transfer", workerData.getTransferQueue());
+ Map<String, JCQueue> map = ImmutableMap.of("receive",
receiveQueue, "transfer", workerData.getTransferQueue());
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf,
userContext);
- Map cachedNodePortToSocket = (Map)
workerData.getCachedNodeToPortSocket().get();
+ Map cachedNodePortToSocket =
workerData.getCachedNodeToPortSocket().get();
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket,
topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(),
topoConf, userContext);
} else {
- Map<String, DisruptorQueue> map =
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ Map<String, JCQueue> map = ImmutableMap.of("receive",
receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf,
userContext);
}
- IOutputCollector outputCollector = new
BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers,
isDebug);
+ this.outputCollector = new BoltOutputCollectorImpl(this,
taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
boltObject.prepare(topoConf, userContext, new
OutputCollector(outputCollector));
}
openOrPrepareWasCalled.set(true);
- LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+ LOG.info("Prepared bolt {}:{}", componentId, taskIds);
setupMetrics();
}
@Override
- public Callable<Object> call() throws Exception {
- init(idToTask);
+ public Callable<Long> call() throws Exception {
+ init(idToTask, idToTaskBase);
- return new Callable<Object>() {
+ return new Callable<Long>() {
+ private ExitCondition tillNoPendingEmits = () ->
pendingEmits.isEmpty();
+ int bpIdleCount = 0;
+ int consumeIdleCounter = 0;
@Override
- public Object call() throws Exception {
- receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
+ public Long call() throws Exception {
+ boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
+ if (pendingEmitsIsEmpty) {
+ if (bpIdleCount!=0) {
+ LOG.debug("Ending Back Pressure Wait stretch :
{}", bpIdleCount);
+ }
+ bpIdleCount = 0;
+ int consumeCount =
receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
+ if (consumeCount == 0) {
+ if (consumeIdleCounter==0) {
+ LOG.debug("Invoking consume wait strategy");
+ }
+ consumeIdleCounter =
consumeWaitStrategy.idle(consumeIdleCounter);
--- End diff --
Could we let the wait strategy instances handle the relevant counters
(consumeIdleCounter, bpIdleCount) themselves?
---