Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167326878
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl
tuple) {
protected void setupMetrics() {
for (final Integer interval :
intervalToTaskToMetricToRegistry.keySet()) {
StormTimer timerTask = workerData.getUserTimer();
- timerTask.scheduleRecurring(interval, interval, new Runnable()
{
- @Override
- public void run() {
- TupleImpl tuple = new TupleImpl(workerTopologyContext,
new Values(interval),
- (int) Constants.SYSTEM_TASK_ID,
Constants.METRICS_TICK_STREAM_ID);
- List<AddressedTuple> metricsTickTuple =
- Lists.newArrayList(new
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
- receiveQueue.publish(metricsTickTuple);
+ timerTask.scheduleRecurring(interval, interval,
+ () -> {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext,
new Values(interval), Constants.SYSTEM_COMPONENT_ID,
+ (int) Constants.SYSTEM_TASK_ID,
Constants.METRICS_TICK_STREAM_ID);
+ AddressedTuple metricsTickTuple = new
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(metricsTickTuple);
+ receiveQueue.flush(); // avoid buffering
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when publishing
metrics. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
+ }
}
- });
- }
- }
-
- public void sendUnanchored(Task task, String stream, List<Object>
values, ExecutorTransfer transfer) {
- Tuple tuple = task.getTuple(stream, values);
- List<Integer> tasks = task.getOutgoingTasks(stream, values);
- for (Integer t : tasks) {
- transfer.transfer(t, tuple);
- }
- }
-
- /**
- * Send sampled data to the eventlogger if the global or component
level debug flag is set (via nimbus api).
- */
- public void sendToEventLogger(Executor executor, Task taskData, List
values,
- String componentId, Object messageId,
Random random) {
- Map<String, DebugOptions> componentDebug =
executor.getStormComponentDebug().get();
- DebugOptions debugOptions = componentDebug.get(componentId);
- if (debugOptions == null) {
- debugOptions = componentDebug.get(executor.getStormId());
- }
- double spct = ((debugOptions != null) &&
(debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
- if (spct > 0 && (random.nextDouble() * 100) < spct) {
- sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
- new Values(componentId, messageId,
System.currentTimeMillis(), values),
- executor.getExecutorTransfer());
+ );
}
}
- public void reflectNewLoadMapping(LoadMapping loadMapping) {
- for (LoadAwareCustomStreamGrouping g : groupers) {
- g.refreshLoad(loadMapping);
- }
- }
-
- private void registerBackpressure() {
- receiveQueue.registerBackpressureCallback(new
DisruptorBackpressureCallback() {
- @Override
- public void highWaterMark() throws Exception {
- LOG.debug("executor " + executorId + " is congested, set
backpressure flag true");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
- }
-
- @Override
- public void lowWaterMark() throws Exception {
- LOG.debug("executor " + executorId + " is not-congested,
set backpressure flag false");
-
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
- }
- });
-
receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-
receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE),
false));
- }
-
protected void setupTicks(boolean isSpout) {
final Integer tickTimeSecs =
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
- boolean enableMessageTimeout = (Boolean)
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if (tickTimeSecs != null) {
+ boolean enableMessageTimeout = (Boolean)
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) &&
Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)) {
- LOG.info("Timeouts disabled for executor {}:{}",
componentId, executorId);
+ LOG.info("Timeouts disabled for executor " + componentId +
":" + executorId);
--- End diff --
nit: why did we go back to String concatenation?
---