Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007052
--- Diff:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array
modifications must go through these.
+ *
+ * @param <T> The type this class may hold, this is generic as can be
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl<T extends PriorityAware> extends
AbstractCollection<T> implements QueueConsumers<T> {
+
+ private final QueueConsumersIterator<T> iterator = new
QueueConsumersIterator<>(this, true);
+
+ private volatile Level<T>[] levels;
+ private volatile int size;
+ private volatile T first;
+
+ private void setArray(Level<T>[] array) {
+ this.levels = array;
+ }
+
+ private Level<T>[] getArray() {
+ return levels;
+ }
+
+
+ public QueueConsumersImpl() {
+ levels = newLevelArrayInstance(0);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Level<T>[] newLevelArrayInstance(int length) {
+ return (Level<T>[]) Array.newInstance(Level.class, length);
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ public Set<Integer> getPriorites() {
+ Level<T>[] levels = getArray();
+ return
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new QueueConsumersIterator<>(this, false);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return iterator.next();
+ }
+
+ @Override
+ public QueueConsumers<T> reset() {
+ iterator.reset();
+ return this;
+ }
+
+ @Override
+ public void forEach(Consumer<? super T> action) {
+ Objects.requireNonNull(action);
+ Level<T>[] current = getArray();
+ int len = current.length;
+ for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+ }
+ }
+
+ private Level<T> getLevel(int level, boolean createIfMissing) {
+ Level<T>[] current = getArray();
+ int low = 0;
+ int high = current.length - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ Level<T> midVal = current[mid];
+
+ if (midVal.level() > level)
+ low = mid + 1;
+ else if (midVal.level() < level)
+ high = mid - 1;
+ else
+ return midVal; //key found
+ }
+
+ if (createIfMissing) {
+ Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
+ if (low > 0) {
+ System.arraycopy(current, 0, newLevels, 0, low);
+ }
+ if (current.length - low > 0) {
+ System.arraycopy(current, low, newLevels, low + 1,
current.length - low);
+ }
+ newLevels[low] = new Level<T>(level);
+ setArray(newLevels);
+ return newLevels[low];
+ }
+ return null;
+ }
+
+ @Override
+ public synchronized boolean add(T t) {
--- End diff --
Yes, its following the lines on design of CopyOnWriteArrayList, but using
syncronized methods over using Reentrant lock as concurrency expected is low.
---