This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.0 by this push:
     new c1622be5272 [Backport] ScanQueryFrameProcessor: Close CursorHolders as we go along. (#17152) (#17168) (#17245)
c1622be5272 is described below

commit c1622be52727883af14ff4900e7db43b7a5937b5
Author: Kashif Faraz <kashif.fa...@gmail.com>
AuthorDate: Fri Oct 4 17:13:55 2024 +0530

    [Backport] ScanQueryFrameProcessor: Close CursorHolders as we go along. (#17152) (#17168) (#17245)
    
    * ScanQueryFrameProcessor: Close CursorHolders as we go along. (#17152)
    * fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals (#17168)
    ---------
    Co-authored-by: Gian Merlino <gianmerl...@gmail.com>
    Co-authored-by: Clint Wylie <cwy...@apache.org>
---
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |  49 +++++++---
 .../scan/ScanQueryFrameProcessorFactory.java       |   3 +-
 .../querykit/scan/ScanQueryFrameProcessorTest.java | 104 ++++++++++++++++++++-
 3 files changed, 141 insertions(+), 15 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index e5fa0a03d62..06dce22a189 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -40,6 +40,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.InvalidFieldException;
 import org.apache.druid.frame.write.InvalidNullByteException;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -63,6 +64,8 @@ import org.apache.druid.query.Order;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanResultValue;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.CompleteSegment;
 import org.apache.druid.segment.Cursor;
@@ -102,6 +105,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   private final Closer closer = Closer.create();
 
   private Cursor cursor;
+  private Closeable cursorCloser;
   private Segment segment;
   private final SimpleSettableOffset cursorOffset = new 
SimpleAscendingOffset(Integer.MAX_VALUE);
   private FrameWriter frameWriter;
@@ -156,6 +160,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   @Override
   public void cleanup() throws IOException
   {
+    closer.register(cursorCloser);
     closer.register(frameWriter);
     closer.register(super::cleanup);
     closer.close();
@@ -221,7 +226,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         cursorYielder.close();
         return ReturnOrAwait.returnObject(handedOffSegments);
       } else {
-        final long rowsFlushed = setNextCursor(cursorYielder.get(), null);
+        final long rowsFlushed = setNextCursor(cursorYielder.get(), null, 
null);
         closer.register(cursorYielder);
         if (rowsFlushed > 0) {
           return ReturnOrAwait.runAgain();
@@ -256,16 +261,21 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         );
       }
 
-      final CursorHolder cursorHolder = closer.register(
-          
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null))
-      );
-      final Cursor nextCursor = cursorHolder.asCursor();
+      final CursorHolder nextCursorHolder =
+          cursorFactory.makeCursorHolder(
+              ScanQueryEngine.makeCursorBuildSpec(
+                  query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segment.getDescriptor())),
+                  null
+              )
+          );
+      final Cursor nextCursor = nextCursorHolder.asCursor();
 
       if (nextCursor == null) {
         // No cursors!
+        nextCursorHolder.close();
         return ReturnOrAwait.returnObject(Unit.instance());
       } else {
-        final long rowsFlushed = setNextCursor(nextCursor, 
segmentHolder.get().getSegment());
+        final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, 
segmentHolder.get().getSegment());
         assert rowsFlushed == 0; // There's only ever one cursor when running 
with a segment
       }
     }
@@ -302,16 +312,21 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
           );
         }
 
-        final CursorHolder cursorHolder = closer.register(
-            
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null))
-        );
-        final Cursor nextCursor = cursorHolder.asCursor();
+        final CursorHolder nextCursorHolder =
+            cursorFactory.makeCursorHolder(
+                ScanQueryEngine.makeCursorBuildSpec(
+                    query.withQuerySegmentSpec(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
+                    null
+                )
+            );
+        final Cursor nextCursor = nextCursorHolder.asCursor();
 
         if (nextCursor == null) {
           // no cursor
+          nextCursorHolder.close();
           return ReturnOrAwait.returnObject(Unit.instance());
         }
-        final long rowsFlushed = setNextCursor(nextCursor, frameSegment);
+        final long rowsFlushed = setNextCursor(nextCursor, nextCursorHolder, 
frameSegment);
 
         if (rowsFlushed > 0) {
           return ReturnOrAwait.runAgain();
@@ -415,10 +430,20 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
     }
   }
 
-  private long setNextCursor(final Cursor cursor, final Segment segment) 
throws IOException
+  private long setNextCursor(
+      final Cursor cursor,
+      @Nullable final Closeable cursorCloser,
+      final Segment segment
+  ) throws IOException
   {
     final long rowsFlushed = flushFrameWriter();
+    if (this.cursorCloser != null) {
+      // Close here, don't add to the processor-level Closer, to avoid leaking 
CursorHolders. We may generate many
+      // CursorHolders per instance of this processor, and we need to close 
them as we go, not all at the end.
+      this.cursorCloser.close();
+    }
     this.cursor = cursor;
+    this.cursorCloser = cursorCloser;
     this.segment = segment;
     this.cursorOffset.reset();
     return rowsFlushed;
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 97ade19f5bc..1636e117740 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -56,8 +56,7 @@ public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactor
   {
     super(query);
     this.query = Preconditions.checkNotNull(query, "query");
-    this.runningCountForLimit =
-        query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() 
: null;
+    this.runningCountForLimit = query.isLimited() && 
query.getOrderBys().isEmpty() ? new AtomicLong() : null;
   }
 
   @JsonProperty
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index bfb511f949f..af0a7203570 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.druid.msq.querykit.scan;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.StupidResourceHolder;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
 import org.apache.druid.msq.querykit.FrameProcessorTestBase;
@@ -46,10 +50,15 @@ import org.apache.druid.msq.test.LimitedFrameWriterFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.CompleteSegment;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
+import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -60,6 +69,91 @@ import java.util.function.Function;
 
 public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
 {
+
+  @Test
+  public void test_runWithSegments() throws Exception
+  {
+    final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();
+
+    final CursorFactory cursorFactory =
+        new QueryableIndexCursorFactory(queryableIndex);
+
+    // put funny intervals on query to ensure it is adjusted to the segment 
interval before building cursor
+    final ScanQuery query =
+        Druids.newScanQueryBuilder()
+              .dataSource("test")
+              .intervals(
+                  new MultipleIntervalSegmentSpec(
+                      ImmutableList.of(
+                          Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+                          Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+                      )
+                  )
+              )
+              .columns(cursorFactory.getRowSignature().getColumnNames())
+              .build();
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    // Limit output frames to 1 row to ensure we test edge cases
+    final FrameWriterFactory frameWriterFactory = new 
LimitedFrameWriterFactory(
+        FrameWriters.makeRowBasedFrameWriterFactory(
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            cursorFactory.getRowSignature(),
+            Collections.emptyList(),
+            false
+        ),
+        1
+    );
+
+    final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+        query,
+        null,
+        new DefaultObjectMapper(),
+        ReadableInput.segment(
+            new SegmentWithDescriptor(
+                () -> new StupidResourceHolder<>(new CompleteSegment(null, new 
QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))),
+                new RichSegmentDescriptor(queryableIndex.getDataInterval(), 
queryableIndex.getDataInterval(), "dummy_version", 0)
+            )
+        ),
+        Function.identity(),
+        new ResourceHolder<WritableFrameChannel>()
+        {
+          @Override
+          public WritableFrameChannel get()
+          {
+            return outputChannel.writable();
+          }
+
+          @Override
+          public void close()
+          {
+            try {
+              outputChannel.writable().close();
+            }
+            catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
+    );
+
+    ListenableFuture<Object> retVal = exec.runFully(processor, null);
+
+    final Sequence<List<Object>> rowsFromProcessor = 
FrameTestUtil.readRowsFromFrameChannel(
+        outputChannel.readable(),
+        FrameReader.create(cursorFactory.getRowSignature())
+    );
+
+    FrameTestUtil.assertRowsEqual(
+        FrameTestUtil.readRowsFromCursorFactory(cursorFactory, 
cursorFactory.getRowSignature(), false),
+        rowsFromProcessor
+    );
+
+    Assert.assertEquals(Unit.instance(), retVal.get());
+  }
+
   @Test
   public void test_runWithInputChannel() throws Exception
   {
@@ -83,10 +177,18 @@ public class ScanQueryFrameProcessorTest extends 
FrameProcessorTestBase
       }
     }
 
+    // put funny intervals on query to ensure it is adjusted to the segment 
interval before building cursor
     final ScanQuery query =
         Druids.newScanQueryBuilder()
               .dataSource("test")
-              .intervals(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+              .intervals(
+                  new MultipleIntervalSegmentSpec(
+                      ImmutableList.of(
+                          Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
+                          Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
+                      )
+                  )
+              )
               .columns(cursorFactory.getRowSignature().getColumnNames())
               .build();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to