netudima commented on code in PR #4003:
URL: https://github.com/apache/cassandra/pull/4003#discussion_r2032078397
##########
src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java:
##########
@@ -855,4 +855,319 @@ public String toString()
return "[" + min + ',' + max + ']';
}
}
+
+ interface MetricCleaner
+ {
+ void clean();
+ }
+
+ private static class BucketsPhantomReference extends
PhantomReference<Object> implements MetricCleaner
+ {
+ private final MetricCleaner cleaner;
+
+ public BucketsPhantomReference(ReferenceQueue<? super Object> q,
MetricCleaner cleaner)
+ {
+ super(Thread.currentThread(), q);
+ this.cleaner = cleaner;
+ }
+
+ public void clean()
+ {
+ cleaner.clean();
+ }
+ }
+
+ /**
+ * Writes are exclusive to the thread-local buckets, so we can use a
single updater for all threads.
+ * Readers will see a consistent view of the buckets and could be blocked
for a while.
+ * <p>
+ * The class is also being tracked by a phantom reference queue to
release the accumulated buckets when the thread is dead.
+ */
+ protected class BucketsThreadLocal
+ {
+ // try to use int[] instead of long[] to reduce memory usage, and move
to the sum array when overflow
+ private final AtomicReference<DecayingArray> decayingRef;
+ private final long[] estimated;
+ private volatile boolean writing;
+
+ public BucketsThreadLocal(int size)
+ {
+ this.decayingRef = new AtomicReference<>(new DecayingArray(size,
decayingEstimatedBuckets.decayLandmark));
+ this.estimated = new long[size];
+ }
+
+ public void update(int index, long now)
+ {
+ // This is only called by the thread that owns the thread local,
so we don't need to worry about contention.
+ // Once the rescaling has occurred, we need to flush the values to
the decayingBucket and report that the values are no longer in use.
+ writing = true;
+ try
+ {
+ DecayingArray decaying = decayingRef.get();
+ if (decaying.decayLandmark !=
decayingEstimatedBuckets.decayLandmark)
+ decayingEstimatedBuckets.flush(this, decaying);
+
+ decayingRef.get().update(index, now);
+ estimated[index]++;
+ }
+ finally
+ {
+ writing = false;
+ }
+ }
+
+ public void release()
+ {
+ // The release method could be called by the
FastThreadLocal#onRemoval or by the PhantomReference queue.
+ // We need to make sure we transfer the values to the
decayingBuckets only once.
+ // There is also no need for the BucketsThreadLocal#writing check
since the thread is dead and no one will update the values.
+ if (!bucketsThreadLocals.contains(this))
+ return;
+ long stamp = bucketsStampedLock.writeLock();
+ try
+ {
+ if (!bucketsThreadLocals.remove(this))
+ return;
+ DecayingArray locDecaying = decayingRef.get();
+ // The same write lock is used to flush the values to the
decaying buckets and to rescale the values,
+ // so the landmark is safe to use from the decaying array.
+ decayingEstimatedBuckets.updateExclusive((index, value) ->
locDecaying.data[(int) index] + value,
+ (index, value) ->
estimated[(int) index] + value,
+
locDecaying.decayLandmark);
+ }
+ finally
+ {
+ bucketsStampedLock.unlockWrite(stamp);
+ }
+ }
+ }
+
+ private static class DecayingEstimatedBuckets
+ {
+ private final int size;
+ /** Lock to protect the decaying {@code buckets}. Only one thread can
update the buckets at a time. */
+ private final StampedLock stampedLock;
+ private final ConcurrentLinkedQueue<DecayingArray> decayingPending =
new ConcurrentLinkedQueue<>();
+ /**
+ * The buckets array is used to store the decaying and estimated
buckets, we can use the same array for both.
+ * The actual size of the array is twice the size of the decaying
buckets or the estimated buckets.
+ * <p>
+ * The first half is used for the decaying buckets and the second half
is used for the estimated buckets.
+ */
+ private final long[] buckets;
+ private volatile long decayLandmark;
+
+ public DecayingEstimatedBuckets(StampedLock shared, int size, long now)
+ {
+ this.size = size;
+ this.buckets = new long[size * 2];
+ this.decayLandmark = now;
+ this.stampedLock = shared;
+ }
+
+ public void rescale(Set<BucketsThreadLocal> locals, long now)
+ {
+ if (now - decayLandmark <= LANDMARK_RESET_INTERVAL_IN_NS)
+ return;
+ long stamp = stampedLock.writeLock();
+ try
+ {
+ long previousDecayLandmark = decayLandmark;
+ decayLandmark = now;
+
+ // The list of thread locals should be fetched after the lock
is taken and decayLandmark is updated.
+ for (BucketsThreadLocal local : locals)
+ {
+ while (true)
+ {
+ DecayingArray prev = local.decayingRef.get();
+ // Skip the thread local if it was created after the
decayLandmark was updated.
+ if (prev.decayLandmark == now)
+ break;
+ if (local.decayingRef.compareAndSet(prev, new
DecayingArray(size, now)))
+ {
+ // We successfully switched the thread local to
the new decayLandmark, wait for the thread to finish updating.
+ while (local.writing)
+ LockSupport.parkNanos(50);
+ decayingPending.offer(prev);
+ break;
+ }
+ }
+ }
+ flushPendingExclusive();
+
DecayingEstimatedHistogramReservoir.decay(LongBuffer.wrap(buckets, 0, size),
previousDecayLandmark, now);
+ }
+ finally
+ {
+ stampedLock.unlockWrite(stamp);
+ }
+ }
+
+ public void updateExclusive(LongBinaryOperator decayingOp,
LongBinaryOperator estimatedOp, long decayLandmark)
+ {
+ assert stampedLock.isWriteLocked();
+ this.decayLandmark = decayLandmark;
+ for (int i = 0; i < size; i++)
+ {
+ buckets[i] = decayingOp.applyAsLong(i, buckets[i]);
+ buckets[size + i] = estimatedOp.applyAsLong(i, buckets[size +
i]);
+ }
+ }
+
+ /**
+ * Used only by a thread-local writer to flush the values to the
buffer.
+ */
+ public void flush(BucketsThreadLocal local, DecayingArray decaying)
+ {
+ long stamp = stampedLock.tryWriteLock();
+ if (stamp > 0)
+ {
+ try
+ {
+ boolean success =
local.decayingRef.compareAndSet(decaying, new DecayingArray(size,
decayLandmark));
+ assert success : "The thread local was updated by another
thread";
+ decayingPending.offer(decaying);
+ flushPendingExclusive();
+ }
+ finally
+ {
+ stampedLock.unlockWrite(stamp);
+ }
+ }
+ else
+ {
+ boolean success = local.decayingRef.compareAndSet(decaying,
new DecayingArray(size, decayLandmark));
+ // If the CAS failed, the thread local was updated by the
rescale thread and all the values were already flushed.
+ if (success)
+ decayingPending.offer(decaying);
+ }
+ }
+
+ public DecayingEstimatedArray snapshot(Set<BucketsThreadLocal> locals)
+ {
+ long[] decaying = new long[size];
+ long[] estimated = new long[size];
+ long resultLandmark = this.decayLandmark;
+ long stamp;
+ do
+ {
+ stamp = stampedLock.tryWriteLock();
+ if (stamp > 0)
+ {
+ try
+ {
+ flushPendingExclusive();
+ }
+ finally
+ {
+ stampedLock.unlockWrite(stamp);
+ }
+ }
+ // If the write lock is not available, we need to use the
optimistic read lock.
+ // This will allow us to read the buckets without blocking
other threads.
+ // We need to make sure that the buckets and the list of
thread locals are consistent,
+ // while we are reading them, so we won't miss any updates or
overlap with thread locals being released.
+ stamp = stampedLock.tryOptimisticRead();
+ if (stamp == 0)
+ {
+ LockSupport.parkNanos(this, 100);
+ continue;
+ }
+ Arrays.fill(decaying, 0);
+ Arrays.fill(estimated, 0);
+ resultLandmark = this.decayLandmark;
+ for (int i = 0; i < size; i++)
+ {
+ decaying[i] = buckets[i];
+ estimated[i] = buckets[size + i];
+ }
+ for (BucketsThreadLocal local : locals)
+ {
+ DecayingArray decayingLoc = local.decayingRef.get();
+ long[] estimatedLoc = local.estimated;
+ for (int i = 0; i < size; i++)
+ {
+ decaying[i] += decayingLoc.data[i];
+ estimated[i] += estimatedLoc[i];
+ }
+ }
+ } while (!stampedLock.validate(stamp));
+
+ return new DecayingEstimatedArray(decaying, estimated,
resultLandmark);
+ }
+
+ private void flushPendingExclusive()
+ {
+ assert stampedLock.isWriteLocked();
+ DecayingArray arr;
+ while ((arr = decayingPending.poll()) != null)
+ {
+ // We need to flush the values to the decaying buckets only,
which is a half of the buckets array.
+ for (int i = 0; i < arr.data.length; i++)
+ buckets[i] += arr.data[i];
+ }
+ }
+ }
+
+ private static class DecayingEstimatedArray
+ {
+ private final long[] decaying;
+ private final long[] estimated;
+ private final long decayLandmark;
+
+ public DecayingEstimatedArray(long[] decaying, long[] estimated, long
decayLandmark)
+ {
+ this.decaying = decaying;
+ this.estimated = estimated;
+ this.decayLandmark = decayLandmark;
+ }
+
+ public long[] estimated()
+ {
+ return estimated;
+ }
+
+ public long[] decaying()
+ {
+ return decaying;
+ }
+
+ public long landmark()
+ {
+ return decayLandmark;
+ }
+ }
+
+ /**
+ * This class is used to store the decaying buckets in a thread local
variable along with the landmark.
+ * No concurrency issues are expected here, as the thread local is only
used by one thread at a time.
+ */
+ private static class DecayingArray
+ {
+ private final long[] data;
+ private final long decayLandmark;
+ /**
+ * As SampledClock is used to register the last time the decay weight
was sampled,
+ * and the precision of the clock is not guaranteed to be nanoseconds
(approximately 2ms),
+ * we can avoid calculating the decay weight for every sample and
instead use the last calculated weight.
+ */
+ private long lastSampledClock;
+ private long lastDecayedWeight;
+
+ public DecayingArray(int size, long decayLandmark)
+ {
+ this.data = new long[size];
+ this.decayLandmark = decayLandmark;
+ }
+
+ public void update(int index, long now)
+ {
+ if (lastSampledClock != now)
Review Comment:
forwardDecayWeight actually works at one-second granularity, so the cached
weight can be reused for much longer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]