Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158700986
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
    @@ -0,0 +1,457 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.utils;
    +
    +import org.apache.storm.policy.IWaitStrategy;
    +import org.apache.storm.metric.api.IStatefulObject;
    +import org.apache.storm.metric.internal.RateTracker;
    +import org.jctools.queues.MessagePassingQueue;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscUnboundedArrayQueue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +public final class JCQueue implements IStatefulObject {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(JCQueue.class);
    +
    +    public static final Object INTERRUPT = new Object();
    +
    +    private final ThroughputMeter emptyMeter = new 
ThroughputMeter("EmptyBatch");
    +    private final ExitCondition continueRunning = () -> true;
    +
    +    private interface Inserter {
    +        // blocking call that can be interrupted using Thread.interrupt()
    +        void publish(Object obj) throws InterruptedException;
    +        boolean tryPublish(Object obj);
    +
    +        void flush() throws InterruptedException;
    +        boolean tryFlush();
    +    }
    +
    +    /* Thread safe. Same instance can be used across multiple threads */
    +    private static class DirectInserter implements Inserter {
    +        private JCQueue q;
    +
    +        public DirectInserter(JCQueue q) {
    +            this.q = q;
    +        }
    +
    +        /** Blocking call, that can be interrupted via Thread.interrupt */
    +        @Override
    +        public void publish(Object obj) throws InterruptedException {
    +            boolean inserted = q.tryPublishInternal(obj);
    +            int idleCount = 0;
    +            while (!inserted) {
    +                q.metrics.notifyInsertFailure();
    +                if (idleCount==0) { // check avoids multiple log msgs when 
in a idle loop
    +                    LOG.debug("Experiencing Back Pressure on recvQueue: 
'{}'. Entering BackPressure Wait", q.getName());
    +                }
    +
    +                idleCount = q.backPressureWaitStrategy.idle(idleCount);
    +                if (Thread.interrupted()) {
    +                    throw new InterruptedException();
    +                }
    +                inserted = q.tryPublishInternal(obj);
    +            }
    +
    +        }
    +
    +        /** Non-Blocking call. return value indicates success/failure */
    +        @Override
    +        public boolean tryPublish(Object obj) {
    +            boolean inserted = q.tryPublishInternal(obj);
    +            if (!inserted) {
    +                q.metrics.notifyInsertFailure();
    +                return false;
    +            }
    +            return true;
    +        }
    +
    +        @Override
    +        public void flush() throws InterruptedException {
    +            return;
    +        }
    +
    +        @Override
    +        public boolean tryFlush() {
    +            return true;
    +        }
    +    } // class DirectInserter
    +
    +    private static class BatchInserter implements Inserter {
    --- End diff --
    
    minor: may be better to leave a comment that the class is non thread-safe. 
Even better to also add comment to ThreadLocal variable.


---

Reply via email to