This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bb63387a317 Dart: Wait for a merge buffer if none is available.
(#18075)
bb63387a317 is described below
commit bb63387a317c2511706e65291e121648fb537c2d
Author: Gian Merlino <[email protected]>
AuthorDate: Sat Jun 14 22:10:24 2025 -0700
Dart: Wait for a merge buffer if none is available. (#18075)
* Dart: Wait for a merge buffer if none is available.
For Dart-only workloads, it is not possible for merge buffers to be
exhausted, because the number of concurrent queries is capped at the
number of merge buffers by DartWorkerMemoryManagementModule. However,
When native and Dart queries are run on the same Historical, it is
possible for buffers to be held by native queries. In this case, Dart
workers must wait for buffers to become available.
This waiting now happens in the worker runner thread, and lasts for the
configured query timeout. At this time, the waiting is not interrupted
when a cancellation request arrives from the controller. This should be
addressed in another patch.
* Add test coverage for DartProcessingBuffersProvider.
* Changes from review.
* Add tests.
* More tests.
---
.../druid/msq/dart/worker/DartFrameContext.java | 15 +--
.../dart/worker/DartProcessingBuffersProvider.java | 11 +-
.../druid/msq/exec/ProcessingBuffersProvider.java | 4 +-
.../druid/msq/exec/ProcessingBuffersSet.java | 35 ++++-
.../druid/msq/indexing/IndexerFrameContext.java | 15 +--
.../indexing/IndexerProcessingBuffersProvider.java | 2 +-
.../indexing/PeonProcessingBuffersProvider.java | 2 +-
.../worker/DartProcessingBuffersProviderTest.java | 142 +++++++++++++++++++
.../druid/msq/exec/ProcessingBuffersSetTest.java | 150 +++++++++++++++++++++
9 files changed, 339 insertions(+), 37 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
index 7f2f928e6f0..3c0fd5cca7e 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
@@ -22,7 +22,6 @@ package org.apache.druid.msq.dart.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.WorkerContext;
@@ -40,7 +39,6 @@ import
org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.loading.DataSegmentPusher;
-import javax.annotation.Nullable;
import java.io.File;
/**
@@ -53,7 +51,6 @@ public class DartFrameContext implements FrameContext
private final GroupingEngine groupingEngine;
private final DataSegmentProvider dataSegmentProvider;
private final WorkerContext workerContext;
- @Nullable
private final ResourceHolder<ProcessingBuffers> processingBuffers;
private final WorkerMemoryParameters memoryParameters;
private final WorkerStorageParameters storageParameters;
@@ -65,7 +62,7 @@ public class DartFrameContext implements FrameContext
final SegmentWrangler segmentWrangler,
final GroupingEngine groupingEngine,
final DataSegmentProvider dataSegmentProvider,
- @Nullable ResourceHolder<ProcessingBuffers> processingBuffers,
+ ResourceHolder<ProcessingBuffers> processingBuffers,
final WorkerMemoryParameters memoryParameters,
final WorkerStorageParameters storageParameters,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
@@ -151,11 +148,7 @@ public class DartFrameContext implements FrameContext
@Override
public ProcessingBuffers processingBuffers()
{
- if (processingBuffers != null) {
- return processingBuffers.get();
- } else {
- throw new ISE("No processing buffers");
- }
+ return processingBuffers.get();
}
@Override
@@ -179,8 +172,6 @@ public class DartFrameContext implements FrameContext
@Override
public void close()
{
- if (processingBuffers != null) {
- processingBuffers.close();
- }
+ processingBuffers.close();
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProvider.java
index e2a7b97c4c2..533f99073a2 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProvider.java
@@ -23,11 +23,12 @@ import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.QueueNonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
import org.apache.druid.msq.exec.ProcessingBuffersSet;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer;
@@ -52,17 +53,15 @@ public class DartProcessingBuffersProvider implements
ProcessingBuffersProvider
}
@Override
- public ResourceHolder<ProcessingBuffersSet> acquire(final int poolSize)
+ public ResourceHolder<ProcessingBuffersSet> acquire(final int poolSize,
final long timeoutMillis)
{
if (poolSize == 0) {
return new ReferenceCountingResourceHolder<>(ProcessingBuffersSet.EMPTY,
() -> {});
}
- final List<ReferenceCountingResourceHolder<ByteBuffer>> batch =
mergeBufferPool.takeBatch(1, 0);
+ final List<ReferenceCountingResourceHolder<ByteBuffer>> batch =
mergeBufferPool.takeBatch(1, timeoutMillis);
if (batch.isEmpty()) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build("No merge buffers available, cannot execute
query");
+ throw new MSQException(CanceledFault.timeout());
}
final ReferenceCountingResourceHolder<ByteBuffer> bufferHolder =
batch.get(0);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
index fb77d1c3078..b74383b3916 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
@@ -32,7 +32,7 @@ public interface ProcessingBuffersProvider
/**
* Acquire buffers for a {@link Worker}.
*/
- ResourceHolder<ProcessingBuffersSet> acquire(int poolSize);
+ ResourceHolder<ProcessingBuffersSet> acquire(int poolSize, long
timeoutMillis);
/**
* Acquire buffers for a {@link Worker}, using a pool size equal to the
minimum of
@@ -53,6 +53,6 @@ public interface ProcessingBuffersProvider
.count()
);
- return acquire(poolSize);
+ return acquire(poolSize, queryDef.getContext().getTimeout());
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
index 95f0335b71f..dc9fc620310 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
@@ -23,7 +23,6 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.msq.kernel.StageDefinition;
-import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -59,13 +58,25 @@ public class ProcessingBuffersSet
);
}
- @Nullable
+ /**
+ * Acquire buffers if a particular stages needs them; otherwise, returns a
holder that throws an exception on
+ * {@link ResourceHolder#get()}.
+ */
public ResourceHolder<ProcessingBuffers> acquireForStage(final
StageDefinition stageDef)
{
if (!stageDef.getProcessorFactory().usesProcessingBuffers()) {
- return null;
+ return new NilResourceHolder<>();
+ } else {
+ return acquire();
}
+ }
+ /**
+ * Acquire buffers unconditionally. In production, it is expected that
callers will use
+ * {@link #acquireForStage(StageDefinition)}.
+ */
+ public ResourceHolder<ProcessingBuffers> acquire()
+ {
final ProcessingBuffers buffers = pool.poll();
if (buffers == null) {
@@ -89,4 +100,22 @@ public class ProcessingBuffersSet
}
};
}
+
+ /**
+ * Resource holder that throws an exception on {@link #get()}.
+ */
+ static class NilResourceHolder<T> implements ResourceHolder<T>
+ {
+ @Override
+ public T get()
+ {
+ throw DruidException.defensive("Unexpected call to get()");
+ }
+
+ @Override
+ public void close()
+ {
+ // Do nothing.
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index f7e019c9aca..f8fa9b6f9e0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.ProcessingBuffers;
@@ -38,7 +37,6 @@ import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.loading.DataSegmentPusher;
-import javax.annotation.Nullable;
import java.io.File;
public class IndexerFrameContext implements FrameContext
@@ -47,7 +45,6 @@ public class IndexerFrameContext implements FrameContext
private final IndexerWorkerContext context;
private final IndexIO indexIO;
private final DataSegmentProvider dataSegmentProvider;
- @Nullable
private final ResourceHolder<ProcessingBuffers> processingBuffers;
private final WorkerMemoryParameters memoryParameters;
private final WorkerStorageParameters storageParameters;
@@ -58,7 +55,7 @@ public class IndexerFrameContext implements FrameContext
IndexerWorkerContext context,
IndexIO indexIO,
DataSegmentProvider dataSegmentProvider,
- @Nullable ResourceHolder<ProcessingBuffers> processingBuffers,
+ ResourceHolder<ProcessingBuffers> processingBuffers,
DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
WorkerMemoryParameters memoryParameters,
WorkerStorageParameters storageParameters
@@ -151,11 +148,7 @@ public class IndexerFrameContext implements FrameContext
@Override
public ProcessingBuffers processingBuffers()
{
- if (processingBuffers != null) {
- return processingBuffers.get();
- } else {
- throw new ISE("No processing buffers");
- }
+ return processingBuffers.get();
}
@Override
@@ -173,8 +166,6 @@ public class IndexerFrameContext implements FrameContext
@Override
public void close()
{
- if (processingBuffers != null) {
- processingBuffers.close();
- }
+ processingBuffers.close();
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
index dcf499c3f2f..52679462fe5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
@@ -49,7 +49,7 @@ public class IndexerProcessingBuffersProvider implements
ProcessingBuffersProvid
}
@Override
- public ResourceHolder<ProcessingBuffersSet> acquire(int poolSize)
+ public ResourceHolder<ProcessingBuffersSet> acquire(int poolSize, final long
timeoutMillis)
{
if (poolSize == 0) {
return new ReferenceCountingResourceHolder<>(ProcessingBuffersSet.EMPTY,
() -> {});
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
index 264c7af112f..56a14b516cb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
@@ -53,7 +53,7 @@ public class PeonProcessingBuffersProvider implements
ProcessingBuffersProvider
}
@Override
- public ResourceHolder<ProcessingBuffersSet> acquire(int poolSize)
+ public ResourceHolder<ProcessingBuffersSet> acquire(int poolSize, long
timeoutMillis)
{
if (poolSize == 0) {
return new ReferenceCountingResourceHolder<>(ProcessingBuffersSet.EMPTY,
() -> {});
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProviderTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProviderTest.java
new file mode 100644
index 00000000000..75afddf3fca
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartProcessingBuffersProviderTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.msq.exec.ProcessingBuffers;
+import org.apache.druid.msq.exec.ProcessingBuffersSet;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DartProcessingBuffersProviderTest
+{
+ private static final int PROCESSING_THREADS = 2;
+ private static final long TIMEOUT_MILLIS = 1000L;
+ private static final int BUFFER_SIZE = 1024;
+
+ @Mock
+ private BlockingPool<ByteBuffer> mockMergeBufferPool;
+
+ @Mock
+ private ReferenceCountingResourceHolder<ByteBuffer> mockBufferHolder;
+
+ private DartProcessingBuffersProvider provider;
+ private ByteBuffer testBuffer;
+
+ @Before
+ public void setUp()
+ {
+ provider = new DartProcessingBuffersProvider(mockMergeBufferPool,
PROCESSING_THREADS);
+ testBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+ }
+
+ @Test
+ public void test_acquire_poolSizeZero()
+ {
+ final ResourceHolder<ProcessingBuffersSet> result = provider.acquire(0,
TIMEOUT_MILLIS);
+
+ Assert.assertNotNull(result);
+ Assert.assertEquals(ProcessingBuffersSet.EMPTY, result.get());
+
+ // Should be able to close without issues
+ result.close();
+ }
+
+ @Test
+ public void test_acquire_poolSizeTwo()
+ {
+ // Setup mock to return a buffer
+ when(mockBufferHolder.get()).thenReturn(testBuffer);
+ when(mockMergeBufferPool.takeBatch(eq(1), eq(TIMEOUT_MILLIS)))
+ .thenReturn(List.of(mockBufferHolder));
+
+ // Test successful acquisition
+ final int poolSize = 2;
+ final ResourceHolder<ProcessingBuffersSet> result =
provider.acquire(poolSize, TIMEOUT_MILLIS);
+
+ Assert.assertNotNull(result);
+ final ProcessingBuffersSet buffersSet = result.get();
+ Assert.assertNotNull(buffersSet);
+
+ // Verify we can acquire buffers from the set
+ for (int i = 0; i < poolSize; i++) {
+ final ResourceHolder<ProcessingBuffers> buffersHolder =
buffersSet.acquire();
+ Assert.assertNotNull(buffersHolder);
+
+ final ProcessingBuffers buffers = buffersHolder.get();
+ Assert.assertNotNull(buffers);
+ Assert.assertNotNull(buffers.getBufferPool());
+ Assert.assertNotNull(buffers.getBouncer());
+
+ // The bouncer should have the correct max count (PROCESSING_THREADS)
+ Assert.assertEquals(PROCESSING_THREADS,
buffers.getBouncer().getMaxCount());
+
+ // Verify that we can get processing threads number of buffers
+ final List<ResourceHolder<ByteBuffer>> resourceHolders = new
ArrayList<>();
+ for (int j = 0; j < PROCESSING_THREADS; j++) {
+ final ResourceHolder<ByteBuffer> bufferResource =
buffers.getBufferPool().take();
+ Assert.assertNotNull(bufferResource);
+ Assert.assertNotNull(bufferResource.get());
+ resourceHolders.add(bufferResource);
+ }
+
+ for (final ResourceHolder<ByteBuffer> resourceHolder : resourceHolders) {
+ resourceHolder.close();
+ }
+
+ buffersHolder.close(); // Return to pool
+ }
+
+ result.close();
+ }
+
+ @Test
+ public void test_acquire_timeout()
+ {
+ // Setup mock pool to return empty list (as happens during a timeout)
+ when(mockMergeBufferPool.takeBatch(eq(1), eq(TIMEOUT_MILLIS)))
+ .thenReturn(Collections.emptyList());
+
+ MSQException exception = Assert.assertThrows(
+ MSQException.class,
+ () -> provider.acquire(1, TIMEOUT_MILLIS)
+ );
+
+ Assert.assertTrue(exception.getFault() instanceof CanceledFault);
+ Assert.assertEquals("Canceled", exception.getFault().getErrorCode());
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ProcessingBuffersSetTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ProcessingBuffersSetTest.java
new file mode 100644
index 00000000000..7a03a8cfd8e
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ProcessingBuffersSetTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.utils.CloseableUtils;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ProcessingBuffersSetTest
+{
+ @Test
+ public void test_empty_acquire()
+ {
+ final DruidException e = Assert.assertThrows(
+ DruidException.class,
+ ProcessingBuffersSet.EMPTY::acquire
+ );
+
+ Assert.assertEquals("Processing buffers not available", e.getMessage());
+ }
+
+ @Test
+ public void test_fromCollection() throws IOException
+ {
+ // Create byte buffers
+ final ByteBuffer buffer1 = ByteBuffer.allocate(1024);
+ final ByteBuffer buffer2 = ByteBuffer.allocate(1024);
+ final ByteBuffer buffer3 = ByteBuffer.allocate(1024);
+
+ final List<ByteBuffer> bufferList1 = ImmutableList.of(buffer1);
+ final List<ByteBuffer> bufferList2 = ImmutableList.of(buffer2);
+ final List<ByteBuffer> bufferList3 = ImmutableList.of(buffer3);
+
+ final List<List<ByteBuffer>> bufferLists = ImmutableList.of(bufferList1,
bufferList2, bufferList3);
+
+ final ProcessingBuffersSet buffersSet =
ProcessingBuffersSet.fromCollection(bufferLists);
+
+ // Should be able to acquire all three
+ final ResourceHolder<ProcessingBuffers> holder1 = buffersSet.acquire();
+ final ResourceHolder<ProcessingBuffers> holder2 = buffersSet.acquire();
+ final ResourceHolder<ProcessingBuffers> holder3 = buffersSet.acquire();
+
+ Assert.assertNotNull(holder1.get());
+ Assert.assertNotNull(holder2.get());
+ Assert.assertNotNull(holder3.get());
+
+ // Verify each has a buffer pool and bouncer
+ Assert.assertNotNull(holder1.get().getBufferPool());
+ Assert.assertNotNull(holder1.get().getBouncer());
+ Assert.assertNotNull(holder2.get().getBufferPool());
+ Assert.assertNotNull(holder2.get().getBouncer());
+ Assert.assertNotNull(holder3.get().getBufferPool());
+ Assert.assertNotNull(holder3.get().getBouncer());
+
+ // Clean up
+ CloseableUtils.closeAll(holder1, holder2, holder3);
+ }
+
+ @Test
+ public void test_nilResourceHolder()
+ {
+ final ProcessingBuffersSet.NilResourceHolder<Object> nilHolder = new
ProcessingBuffersSet.NilResourceHolder<>();
+
+ final DruidException e = Assert.assertThrows(
+ DruidException.class,
+ nilHolder::get
+ );
+
+ Assert.assertEquals("Unexpected call to get()", e.getMessage());
+
+ nilHolder.close(); // Should do nothing
+ }
+
+ @Test
+ public void test_acquireForStage_usesProcessingBuffersFalse()
+ {
+ // Create a mock StageDefinition and FrameProcessorFactory
+ final StageDefinition stageDef = Mockito.mock(StageDefinition.class);
+ final FrameProcessorFactory<?, ?, ?> processorFactory =
Mockito.mock(FrameProcessorFactory.class);
+
+ // Configure mocks: processor factory does not use processing buffers
+ Mockito.when(stageDef.getProcessorFactory()).thenReturn(processorFactory);
+ Mockito.when(processorFactory.usesProcessingBuffers()).thenReturn(false);
+
+ // Create a ProcessingBuffersSet
+ final ProcessingBuffersSet buffersSet =
+ ProcessingBuffersSet.fromCollection(
+ Collections.singletonList(
+ Collections.singletonList(ByteBuffer.allocate(1024))));
+
+ // Acquire for stage
+ final ResourceHolder<ProcessingBuffers> holder =
buffersSet.acquireForStage(stageDef);
+ MatcherAssert.assertThat(holder,
CoreMatchers.instanceOf(ProcessingBuffersSet.NilResourceHolder.class));
+ }
+
+ @Test
+ public void test_acquireForStage_usesProcessingBuffersTrue()
+ {
+ // Create a mock StageDefinition and FrameProcessorFactory
+ final StageDefinition stageDef = Mockito.mock(StageDefinition.class);
+ final FrameProcessorFactory<?, ?, ?> processorFactory =
Mockito.mock(FrameProcessorFactory.class);
+
+ // Configure mocks: processor factory does not use processing buffers
+ Mockito.when(stageDef.getProcessorFactory()).thenReturn(processorFactory);
+ Mockito.when(processorFactory.usesProcessingBuffers()).thenReturn(true);
+
+ // Create a ProcessingBuffersSet
+ final ProcessingBuffersSet buffersSet =
+ ProcessingBuffersSet.fromCollection(
+ Collections.singletonList(
+ Collections.singletonList(ByteBuffer.allocate(1024))));
+
+ // Acquire for stage
+ final ResourceHolder<ProcessingBuffers> holder =
buffersSet.acquireForStage(stageDef);
+ final ProcessingBuffers buffers = holder.get();
+ Assert.assertEquals(1024, buffers.getBufferPool().take().get().capacity());
+ Assert.assertThrows(NoSuchElementException.class, () ->
buffers.getBufferPool().take());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]