Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158940938
--- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public final class JCQueue implements IStatefulObject {
+ /**
+  * Shared sentinel instance.
+  * NOTE(review): presumably handed to a consumer to signal an interrupt; its use is not
+  * visible in this hunk -- confirm against the rest of the class.
+  */
+ public static final Object INTERRUPT = new Object();
+
+ /** Logger for JCQueue; also used by the nested inserter classes. */
+ private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+
+ /** Meter constructed with the name "EmptyBatch". */
+ private final ThroughputMeter emptyMeter = new ThroughputMeter("EmptyBatch");
+
+ /** Exit condition that always evaluates to {@code true}. */
+ private final ExitCondition continueRunning = () -> true;
+
+ /**
+  * Internal contract for putting objects onto the queue. Publishing and flushing each come
+  * in two flavours: one that may throw InterruptedException and a boolean-returning
+  * "try" variant.
+  */
+ private interface Inserter {
+     /**
+      * Blocking call that can be interrupted using Thread.interrupt().
+      *
+      * @param obj the object to publish
+      * @throws InterruptedException if the calling thread is interrupted while blocked
+      */
+     void publish(Object obj) throws InterruptedException;
+
+     /**
+      * NOTE(review): presumably pushes out whatever an implementation has buffered; the
+      * direct implementation treats this as a no-op -- confirm against the batching
+      * implementation.
+      *
+      * @throws InterruptedException declared for implementations that may block
+      */
+     void flush() throws InterruptedException;
+
+     /**
+      * Boolean-returning counterpart of {@link #publish(Object)}.
+      *
+      * @param obj the object to publish
+      * @return true if the object was accepted, false otherwise
+      */
+     boolean tryPublish(Object obj);
+
+     /**
+      * Boolean-returning counterpart of {@link #flush()}.
+      *
+      * @return NOTE(review): presumably true when nothing remains to be flushed -- confirm
+      */
+     boolean tryFlush();
+ }
+
+ /**
+  * Inserter that hands every object straight to {@code q.tryPublishInternal}; both flush
+  * operations are no-ops.
+  * Thread safe. Same instance can be used across multiple threads.
+  */
+ private static class DirectInserter implements Inserter {
+     // Assigned once in the constructor. Declared final so the reference is safely
+     // published to every thread that shares this inserter.
+     private final JCQueue q;
+
+     public DirectInserter(JCQueue q) {
+         this.q = q;
+     }
+
+     /**
+      * Blocking call, that can be interrupted via Thread.interrupt.
+      * Keeps retrying {@code q.tryPublishInternal(obj)} until it succeeds. Each failed
+      * attempt is reported to the queue metrics and followed by a wait on the queue's
+      * back pressure wait strategy.
+      *
+      * @param obj the object to insert
+      * @throws InterruptedException if the calling thread's interrupt flag is found set
+      *         after a wait (the flag is cleared by the check)
+      */
+     @Override
+     public void publish(Object obj) throws InterruptedException {
+         int idleCount = 0;
+         while (!q.tryPublishInternal(obj)) {
+             q.metrics.notifyInsertFailure();
+             // Log only while idleCount is 0 to avoid multiple log msgs when in an idle loop.
+             if (idleCount == 0) {
+                 LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
+             }
+
+             idleCount = q.backPressureWaitStrategy.idle(idleCount);
+             if (Thread.interrupted()) {
+                 throw new InterruptedException();
+             }
+         }
+     }
+
+     /**
+      * Non-Blocking call. Makes a single insert attempt and reports a failed attempt to
+      * the queue metrics.
+      *
+      * @param obj the object to insert
+      * @return true if the object was inserted, false otherwise
+      */
+     @Override
+     public boolean tryPublish(Object obj) {
+         if (q.tryPublishInternal(obj)) {
+             return true;
+         }
+         q.metrics.notifyInsertFailure();
+         return false;
+     }
+
+     /** No-op: this inserter does nothing on flush. */
+     @Override
+     public void flush() throws InterruptedException {
+     }
+
+     /**
+      * No-op: this inserter does nothing on flush.
+      *
+      * @return always true
+      */
+     @Override
+     public boolean tryFlush() {
+         return true;
+     }
+ } // class DirectInserter
+
+ private static class BatchInserter implements Inserter {
--- End diff --
good point.
---