This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f9412b4e8 MSQ: Fix excessive retention of writable stage output 
channels. (#18322)
97f9412b4e8 is described below

commit 97f9412b4e8b1c43b689c2b74b4836ef419ccb0b
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jul 24 20:06:52 2025 -0700

    MSQ: Fix excessive retention of writable stage output channels. (#18322)
    
    Fixes a regression from #18144. The refactoring in that patch lost some
    logic that ensured stage output channels were stored in read-only form.
    This is important, because the writable form includes a 1MB frame memory
    allocation buffer. It can add up to a lot of memory if retained across
    lots of channels.
    
    This patch simplifies things by unconditionally converting all
    stage outputs to read-only before they are retained, by replacing the
    channel in "stageOutputChannels.add(channel)" with "channel.readOnly()".
    It also simplifies various other bits of code that deal with intermediate
    output channels, by converting them to read-only in the constructors of
    ProcessorsAndChannels and ResultAndChannels, rather than at only some call
    sites.
---
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  3 +-
 .../druid/msq/exec/std/ProcessorsAndChannels.java  |  2 +-
 .../druid/msq/exec/std/ResultAndChannels.java      |  2 +-
 .../msq/exec/std/StandardShuffleOperations.java    |  2 +-
 .../druid/msq/querykit/BaseLeafStageProcessor.java |  2 +-
 .../WindowOperatorQueryStageProcessor.java         |  2 +-
 .../querykit/common/OffsetLimitStageProcessor.java |  2 +-
 .../groupby/GroupByPostShuffleStageProcessor.java  |  2 +-
 .../results/QueryResultStageProcessor.java         |  2 +-
 .../druid/frame/processor/OutputChannel.java       | 65 ++++++++++++++--------
 .../druid/frame/processor/OutputChannels.java      | 29 ++++++----
 .../channel/ComposingWritableFrameChannelTest.java | 14 ++---
 .../frame/processor/OutputChannelFactoryTest.java  |  3 +-
 .../druid/frame/processor/OutputChannelTest.java   | 11 ++--
 .../druid/frame/processor/OutputChannelsTest.java  | 15 +++--
 15 files changed, 91 insertions(+), 65 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index c064366ccde..3c8d772ad82 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -327,7 +327,6 @@ public class RunWorkOrder
                 
stageOutputChannels.sort(Comparator.comparing(OutputChannel::getPartitionNumber));
                 outputChannels = OutputChannels.wrap(
                     stageOutputChannels.stream()
-                                       .map(OutputChannel::readOnly)
                                        
.sorted(Comparator.comparing(OutputChannel::getPartitionNumber))
                                        .collect(Collectors.toList()));
                 stageOutputChannels.clear();
@@ -493,7 +492,7 @@ public class RunWorkOrder
         outputChannelFactory,
         channel -> {
           synchronized (stageOutputChannels) {
-            stageOutputChannels.add(channel);
+            stageOutputChannels.add(channel.readOnly());
           }
         }
     );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ProcessorsAndChannels.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ProcessorsAndChannels.java
index 5c5b0aa70ee..5c6dd378950 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ProcessorsAndChannels.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ProcessorsAndChannels.java
@@ -41,7 +41,7 @@ public class ProcessorsAndChannels<T, R>
   )
   {
     this.processorManager = processorManager;
-    this.outputChannels = outputChannels;
+    this.outputChannels = outputChannels.readOnly();
   }
 
   public ProcessorManager<T, R> getProcessorManager()
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ResultAndChannels.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ResultAndChannels.java
index b0d93041a1b..38dfc20914c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ResultAndChannels.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/ResultAndChannels.java
@@ -42,7 +42,7 @@ public class ResultAndChannels<T>
   )
   {
     this.result = result;
-    this.outputChannels = outputChannels;
+    this.outputChannels = outputChannels.readOnly();
   }
 
   /**
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
index 809b96e05bc..d46f30be00e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
@@ -523,7 +523,7 @@ public class StandardShuffleOperations
         ),
         newResultAndChannels -> new ResultAndChannels<>(
             newResultAndChannels.resultFuture(),
-            newResultAndChannels.outputChannels().readOnly()
+            newResultAndChannels.outputChannels()
         )
     );
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
index 83a3829a564..6d53b9d90af 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
@@ -176,7 +176,7 @@ public abstract class BaseLeafStageProcessor extends 
BasicStandardStageProcessor
     }
 
     //noinspection unchecked,rawtypes
-    return new ProcessorsAndChannels<>(processorManager, 
OutputChannels.wrapReadOnly(outputChannels));
+    return new ProcessorsAndChannels<>(processorManager, 
OutputChannels.wrap(outputChannels));
   }
 
   private ProcessorManager<Object, Long> 
createBaseLeafProcessorManagerWithHandoff(
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryStageProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryStageProcessor.java
index 85904a9ec48..10a4e8187e1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryStageProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryStageProcessor.java
@@ -172,7 +172,7 @@ public class WindowOperatorQueryStageProcessor extends 
BasicStandardStageProcess
 
     return new ProcessorsAndChannels<>(
         ProcessorManagers.of(processors),
-        
OutputChannels.wrapReadOnly(ImmutableList.copyOf(outputChannels.values()))
+        OutputChannels.wrap(ImmutableList.copyOf(outputChannels.values()))
     );
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/OffsetLimitStageProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/OffsetLimitStageProcessor.java
index 3f8985ef289..18b1198264e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/OffsetLimitStageProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/OffsetLimitStageProcessor.java
@@ -135,7 +135,7 @@ public class OffsetLimitStageProcessor extends 
BasicStandardStageProcessor
 
     return new ProcessorsAndChannels<>(
         ProcessorManagers.of(workerSupplier),
-        OutputChannels.wrapReadOnly(Collections.singletonList(outputChannel))
+        OutputChannels.wrap(Collections.singletonList(outputChannel))
     );
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
index 6271db56566..6d0441e34d8 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
@@ -128,7 +128,7 @@ public class GroupByPostShuffleStageProcessor extends 
BasicStandardStageProcesso
 
     return new ProcessorsAndChannels<>(
         ProcessorManagers.of(processors),
-        
OutputChannels.wrapReadOnly(ImmutableList.copyOf(outputChannels.values()))
+        OutputChannels.wrap(ImmutableList.copyOf(outputChannels.values()))
     );
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/QueryResultStageProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/QueryResultStageProcessor.java
index 8a4ecd1bdd2..d56958953d9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/QueryResultStageProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/QueryResultStageProcessor.java
@@ -111,7 +111,7 @@ public class QueryResultStageProcessor extends 
BasicStandardStageProcessor
 
     return new ProcessorsAndChannels<>(
         ProcessorManagers.of(processors),
-        
OutputChannels.wrapReadOnly(ImmutableList.copyOf(outputChannels.values()))
+        OutputChannels.wrap(ImmutableList.copyOf(outputChannels.values()))
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
index 9f309f924e1..09fae34bb07 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
@@ -21,14 +21,13 @@ package org.apache.druid.frame.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.allocation.MemoryAllocator;
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.ReadableNilFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
 
 import javax.annotation.Nullable;
 import java.util.function.Function;
@@ -43,13 +42,11 @@ import java.util.function.Supplier;
  */
 public class OutputChannel
 {
-  @GuardedBy("this")
   @Nullable
-  private WritableFrameChannel writableChannel;
+  private volatile WritableFrameChannel writableChannel;
 
-  @GuardedBy("this")
   @Nullable
-  private MemoryAllocator frameMemoryAllocator;
+  private volatile MemoryAllocator frameMemoryAllocator;
 
   private final Supplier<ReadableFrameChannel> readableChannelSupplier;
   private final int partitionNumber;
@@ -160,13 +157,18 @@ public class OutputChannel
    * Returns the writable channel of this pair. The producer writes to this 
channel. Throws ISE if the output channel is
    * read only.
    */
-  public synchronized WritableFrameChannel getWritableChannel()
+  public WritableFrameChannel getWritableChannel()
   {
-    if (writableChannel == null) {
-      throw new ISE("Writable channel is not available. The output channel 
might be marked as read-only,"
-                    + " hence no writes are allowed.");
+    // Store into a local since the field is volatile.
+    final WritableFrameChannel theWritableChannel = writableChannel;
+
+    if (theWritableChannel == null) {
+      throw DruidException.defensive(
+          "Writable channel is not available. "
+          + "The output channel might be marked as read-only, hence no writes 
are allowed."
+      );
     } else {
-      return writableChannel;
+      return theWritableChannel;
     }
   }
 
@@ -174,13 +176,18 @@ public class OutputChannel
    * Returns the memory allocator for the writable channel. The producer uses 
this to generate frames for the channel.
    * Throws ISE if the output channel is read only.
    */
-  public synchronized MemoryAllocator getFrameMemoryAllocator()
+  public MemoryAllocator getFrameMemoryAllocator()
   {
-    if (frameMemoryAllocator == null) {
-      throw new ISE("Frame allocator is not available. The output channel 
might be marked as read-only,"
-                    + " hence memory allocator is not required.");
+    // Store into a local since the field is volatile.
+    final MemoryAllocator theFrameMemoryAllocator = frameMemoryAllocator;
+
+    if (theFrameMemoryAllocator == null) {
+      throw DruidException.defensive(
+          "Frame memory allocator is not available. "
+          + "The output channel might be marked as read-only, hence no writes 
are allowed."
+      );
     } else {
-      return frameMemoryAllocator;
+      return theFrameMemoryAllocator;
     }
   }
 
@@ -203,14 +210,18 @@ public class OutputChannel
     return partitionNumber;
   }
 
-  public synchronized OutputChannel mapWritableChannel(final 
Function<WritableFrameChannel, WritableFrameChannel> mapFn)
+  public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, 
WritableFrameChannel> mapFn)
   {
-    if (writableChannel == null) {
+    // Store into locals since the fields are volatile.
+    final WritableFrameChannel theWritableChannel = writableChannel;
+    final MemoryAllocator theFrameMemoryAllocator = frameMemoryAllocator;
+
+    if (theWritableChannel == null || theFrameMemoryAllocator == null) {
       return this;
     } else {
       return new OutputChannel(
-          mapFn.apply(writableChannel),
-          frameMemoryAllocator,
+          mapFn.apply(theWritableChannel),
+          theFrameMemoryAllocator,
           readableChannelSupplier,
           partitionNumber
       );
@@ -220,17 +231,27 @@ public class OutputChannel
   /**
    * Returns a read-only version of this instance. Read-only versions have 
neither {@link #getWritableChannel()} nor
    * {@link #getFrameMemoryAllocator()}, and therefore require substantially 
less memory.
+   *
+   * Returns the same instance if it is already read-only.
    */
   public OutputChannel readOnly()
   {
-    return OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
+    return isReadOnly() ? this : 
OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
+  }
+
+  /**
+   * Returns whether this instance is read-only (has no writable channel).
+   */
+  public boolean isReadOnly()
+  {
+    return writableChannel == null;
   }
 
   /**
    * Removes the reference to the {@link #writableChannel} and {@link 
#frameMemoryAllocator} from the object, making
    * it more efficient
    */
-  public synchronized void convertToReadOnly()
+  public void convertToReadOnly()
   {
     this.writableChannel = null;
     this.frameMemoryAllocator = null;
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
index 76aba9228a7..ec56d7614ba 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
@@ -66,15 +66,6 @@ public class OutputChannels
     return new OutputChannels(outputChannels);
   }
 
-  /**
-   * Creates an instance wrapping read-only versions (see {@link 
OutputChannel#readOnly()}) of all the
-   * provided channels.
-   */
-  public static OutputChannels wrapReadOnly(final List<OutputChannel> 
outputChannels)
-  {
-    return new 
OutputChannels(outputChannels.stream().map(OutputChannel::readOnly).collect(Collectors.toList()));
-  }
-
   /**
    * Verifies there is exactly one channel per partition.
    */
@@ -139,6 +130,24 @@ public class OutputChannels
    */
   public OutputChannels readOnly()
   {
-    return wrapReadOnly(outputChannels);
+    if (isReadOnly()) {
+      return this;
+    } else {
+      return new 
OutputChannels(outputChannels.stream().map(OutputChannel::readOnly).collect(Collectors.toList()));
+    }
+  }
+
+  /**
+   * Returns whether this instance is read-only (all channels have {@link 
OutputChannel#isReadOnly()}).
+   */
+  public boolean isReadOnly()
+  {
+    for (final OutputChannel channel : outputChannels) {
+      if (!channel.isReadOnly()) {
+        return false;
+      }
+    }
+
+    return true;
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
 
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
index 9c59cdc8a33..abcb9f03cfb 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -22,10 +22,10 @@ package org.apache.druid.frame.channel;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.processor.OutputChannel;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.ResourceLimitExceededException;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
@@ -95,22 +95,20 @@ public class ComposingWritableFrameChannelTest
 
 
     // Test if the older channel has been converted to read only
-    Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+    Assert.assertThrows(DruidException.class, 
outputChannel1::getWritableChannel);
     composingWritableFrameChannel.close();
 
-    Exception ise1 = Assert.assertThrows(IllegalStateException.class, () -> 
outputChannel1.getFrameMemoryAllocator());
+    Exception ise1 = Assert.assertThrows(DruidException.class, 
outputChannel1::getFrameMemoryAllocator);
     MatcherAssert.assertThat(
         ise1,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Frame 
memory allocator is not available."))
     );
 
 
-    Exception ise2 = Assert.assertThrows(IllegalStateException.class, () -> 
outputChannel2.getFrameMemoryAllocator());
+    Exception ise2 = Assert.assertThrows(DruidException.class, 
outputChannel2::getFrameMemoryAllocator);
     MatcherAssert.assertThat(
         ise2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Frame 
memory allocator is not available."))
     );
 
   }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
index c28e98d1ad4..0e472cd29fc 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelFactoryTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.frame.processor;
 
 import com.google.common.collect.Iterables;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.channel.FrameWithPartition;
@@ -177,6 +178,6 @@ public abstract class OutputChannelFactoryTest extends 
InitializedNullHandlingTe
 
     Assert.assertEquals(1, channel.getPartitionNumber());
     Assert.assertTrue(channel.getReadableChannel().isFinished());
-    Assert.assertThrows(IllegalStateException.class, 
channel::getWritableChannel);
+    Assert.assertThrows(DruidException.class, channel::getWritableChannel);
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
index 438fea0fdfa..3abc6b89215 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.frame.processor;
 
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.allocation.HeapMemoryAllocator;
 import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameFileChannel;
@@ -39,19 +40,17 @@ public class OutputChannelTest
     Assert.assertTrue(channel.getReadableChannel().isFinished());
 
     // No writable channel: cannot call getWritableChannel.
-    final IllegalStateException e1 = 
Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel);
+    final DruidException e1 = Assert.assertThrows(DruidException.class, 
channel::getWritableChannel);
     MatcherAssert.assertThat(
         e1,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Writable channel is not available. The output channel might be 
marked as read-only, hence no writes are allowed."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Writable 
channel is not available."))
     );
 
     // No writable channel: cannot call getFrameMemoryAllocator.
-    final IllegalStateException e2 = 
Assert.assertThrows(IllegalStateException.class, 
channel::getFrameMemoryAllocator);
+    final DruidException e2 = Assert.assertThrows(DruidException.class, 
channel::getFrameMemoryAllocator);
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Frame 
memory allocator is not available."))
     );
 
     // Mapping the writable channel of a nil channel has no effect, because 
there is no writable channel.
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
index 6b79045296d..0086a378f1c 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.allocation.HeapMemoryAllocator;
 import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
 import org.hamcrest.CoreMatchers;
@@ -76,26 +77,24 @@ public class OutputChannelsTest
     Assert.assertEquals(1, readOnlyChannels.getAllChannels().size());
     Assert.assertEquals(1, channels.getChannelsForPartition(1).size());
 
-    final IllegalStateException e = Assert.assertThrows(
-        IllegalStateException.class,
+    final DruidException e = Assert.assertThrows(
+        DruidException.class,
         () -> 
Iterables.getOnlyElement(readOnlyChannels.getAllChannels()).getWritableChannel()
     );
 
     MatcherAssert.assertThat(
         e,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Writable channel is not available. The output channel might be 
marked as read-only, hence no writes are allowed."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Writable 
channel is not available."))
     );
 
-    final IllegalStateException e2 = Assert.assertThrows(
-        IllegalStateException.class,
+    final DruidException e2 = Assert.assertThrows(
+        DruidException.class,
         () -> 
Iterables.getOnlyElement(readOnlyChannels.getAllChannels()).getFrameMemoryAllocator()
     );
 
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
-            "Frame allocator is not available. The output channel might be 
marked as read-only, hence memory allocator is not required."))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Frame 
memory allocator is not available."))
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to