This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e3128e3fa3 Poison stupid pool (#12646)
e3128e3fa3 is described below

commit e3128e3fa322963b902ebecbdec60c740f55dd62
Author: imply-cheddar <[email protected]>
AuthorDate: Mon Jul 4 06:36:22 2022 +0900

    Poison stupid pool (#12646)
    
    * Poison StupidPool and fix resource leaks
    
    There are various resource leaks from test setup as well as some
    corners in query processing.  We poison the StupidPool to start failing
    tests when the leaks come and fix any issues uncovered from that so
    that we can start from a clean baseline.
    
    Unfortunately, because of how poisoning works,
    we can only fail future checkouts from the same pool,
    which means that there is a natural race between a
    leak happening -> GC occurs -> leak detected -> pool poisoned.
    
    This race means that, depending on interleaving of tests,
    if the very last object checked out from the pool
    is the one that leaks, then it won't get caught.
    At some point in the future, however, something will catch it,
    and from that point on it will be deterministic.
    
    * Remove various things left over from iterations
    
    * Clean up FilterAnalysis and add javadoc on StupidPool
    
    * Revert changes to .idea/misc.xml that accidentally got pushed
    
    * Style and test branches
    
    * Stylistic woes
---
 .idea/mavenProjectSettings.xml                     |  10 -
 benchmarks/pom.xml                                 |   6 -
 .../ExpressionVectorSelectorBenchmark.java         |   3 +-
 core/pom.xml                                       |   6 -
 .../org/apache/druid/collections/StupidPool.java   |  74 ++++++-
 .../org/apache/druid/StupidPoolPoisonedTest.java   |  30 +--
 .../apache/druid/collections/StupidPoolTest.java   |  29 +--
 .../common/task/CompactionTaskRunTest.java         | 220 ++++++++++++++-------
 .../SeekableStreamIndexTaskTestBase.java           |  17 +-
 pom.xml                                            |   6 +-
 processing/pom.xml                                 |  19 --
 .../apache/druid/query/filter/FilterTuning.java    |   2 +-
 .../apache/druid/query/search/AutoStrategy.java    |   7 +-
 .../druid/query/search/UseIndexesStrategy.java     |   5 +-
 .../query/timeseries/TimeseriesQueryEngine.java    |   2 +-
 .../org/apache/druid/segment/AbstractIndex.java    |  60 +++---
 .../java/org/apache/druid/segment/ColumnCache.java | 119 +++++++++++
 .../org/apache/druid/segment/ColumnSelector.java   |   4 +
 .../org/apache/druid/segment/CompressedPools.java  |   9 +-
 .../DeprecatedQueryableIndexColumnSelector.java    |  66 +++++++
 .../org/apache/druid/segment/FilterAnalysis.java   | 149 ++++++++++++++
 .../java/org/apache/druid/segment/IndexIO.java     |  55 +++---
 .../org/apache/druid/segment/QueryableIndex.java   |  20 +-
 .../QueryableIndexColumnSelectorFactory.java       |  84 +++-----
 .../QueryableIndexCursorSequenceBuilder.java       | 110 ++++++-----
 .../segment/QueryableIndexIndexableAdapter.java    |  11 +-
 .../segment/QueryableIndexStorageAdapter.java      | 156 ++-------------
 .../join/table/BroadcastSegmentIndexedTable.java   |   6 +-
 .../QueryableIndexVectorColumnSelectorFactory.java |  42 ++--
 .../org/apache/druid/StupidPoolPoisonedTest.java}  |  30 +--
 .../druid/segment/MergingRowIteratorTest.java      |  63 +++---
 .../segment/data/CompressedDoublesSerdeTest.java   |  22 +--
 .../segment/data/CompressedFloatsSerdeTest.java    |  30 +--
 .../data/CompressedLongsAutoEncodingSerdeTest.java |   7 +-
 .../segment/data/CompressedLongsSerdeTest.java     |  30 +--
 ...mpressedVSizeColumnarMultiIntsSupplierTest.java | 103 +++-------
 ...sedVSizeColumnarMultiIntsSupplierTestBase.java} |  91 +++++----
 .../druid/segment/data/TestColumnCompression.java  |   3 +-
 ...mpressedVSizeColumnarMultiIntsSupplierTest.java |  47 +++--
 .../druid/segment/filter/FilterPartitionTest.java  |  31 +--
 .../segment/virtual/ExpressionSelectorsTest.java   |  32 +--
 .../virtual/ExpressionVectorSelectorsTest.java     |   3 +-
 42 files changed, 1036 insertions(+), 783 deletions(-)

diff --git a/.idea/mavenProjectSettings.xml b/.idea/mavenProjectSettings.xml
deleted file mode 100644
index 1eb1f057ae..0000000000
--- a/.idea/mavenProjectSettings.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="MavenProjectSettings">
-    <option name="testRunningSettings">
-      <MavenTestRunningSettings>
-        <option name="passArgLine" value="false" />  <!-- see 
https://github.com/apache/incubator-druid/pull/8526 -->
-      </MavenTestRunningSettings>
-    </option>
-  </component>
-</project>
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 869c664270..5ce05d9796 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -281,12 +281,6 @@
         </plugin>
         <plugin>
           <artifactId>maven-surefire-plugin</artifactId>
-          <configuration>
-            <argLine>
-              @{jacocoArgLine}
-              ${jdk.surefire.argLine}
-            </argLine>
-          </configuration>
         </plugin>
       </plugins>
     </pluginManagement>
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
index a262aea7ed..754eb7b6d3 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.ColumnCache;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.QueryableIndex;
@@ -124,7 +125,7 @@ public class ExpressionVectorSelectorBenchmark
     );
 
     Expr parsed = Parser.parse(expression, ExprMacroTable.nil());
-    outputType = parsed.getOutputType(index);
+    outputType = parsed.getOutputType(new ColumnCache(index, closer));
     checkSanity();
   }
 
diff --git a/core/pom.xml b/core/pom.xml
index 0b12558e24..831f73c173 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -430,12 +430,6 @@
         <configuration>
           <!-- use normal classpath instead of manifest jar for 
JvmUtilsTest.testSystemClassPath -->
           <useManifestOnlyJar>false</useManifestOnlyJar>
-          <argLine>
-            @{jacocoArgLine}
-            ${jdk.surefire.argLine}
-            
-Djava.library.path=${project.build.directory}/hyperic-sigar-${sigar.base.version}/sigar-bin/lib/
-            -Duser.language=en
-          </argLine>
         </configuration>
       </plugin>
     </plugins>
diff --git a/core/src/main/java/org/apache/druid/collections/StupidPool.java 
b/core/src/main/java/org/apache/druid/collections/StupidPool.java
index d1d6a9b9b7..9757dc6a27 100644
--- a/core/src/main/java/org/apache/druid/collections/StupidPool.java
+++ b/core/src/main/java/org/apache/druid/collections/StupidPool.java
@@ -40,6 +40,45 @@ public class StupidPool<T> implements NonBlockingPool<T>
 {
   private static final Logger log = new Logger(StupidPool.class);
 
+
+  /**
+   * We add the ability to poison all StupidPools in order to catch resource 
leaks and fail them during tests.
+   * <p>
+   * StupidPool already has a mechanism by which it will log resource leaks 
(ResourceHolder objects that are not
+   * closed). Over time, we've built up a test suite that contains lots of 
those logs and generally they get swept
+   * away to a perennial Priority #2.  This is not a good state as the 
justification is usually that the logs are
+   * coming from test harness, the production code is obviously good.  Anyway, 
we need tests to actually fail if there
+   * are leaks like this so that the tests and the code can be improved.  
Catching leaks is hard, though, because it
+   * either requires reference counting and all test sites to check the 
counts, or it requires catching objects being
+   * GC'd, which is asynchronous.  We opt for this latter approach.
+   * <p>
+   * Specifically, when poisoned, the StupidPool will
+   * 1) Maintain an exception (i.e. stack trace) object from each time that a 
resource holder is checked out
+   * 2) If the ResourceHolder is GCd without being closed, the exception 
object will be registered back with the
+   * stupid pool
+   * 3) If an exception is registered with the StupidPool, then any attempt to 
take an object from that Pool will have
+   * the exception thrown instead.
+   * <p>
+   * This means that we have a delayed reaction to the leak, in that the 
object must first be GCd before we can
+   * identify the leak.  *Also* it means that the test that failed is not 
actually the test that leaked the object,
+   * instead, developers must look at the stacktrace thrown to see which test 
actually checked out the object and did
+   * not return it.  Additionally, it means that one test run can only 
discover a single leak (as once the pool is
+   * poisoned, it will return the same exception constantly).  So, if there is 
some leaky code, it will likely require
+   * multiple test runs to actually whack-a-mole all of the sources of the 
leaks.
+   */
+  private static final AtomicBoolean POISONED = new AtomicBoolean(false);
+
+  static {
+    if 
(Boolean.parseBoolean(System.getProperty("druid.test.stupidPool.poison"))) {
+      POISONED.set(true);
+    }
+  }
+
+  public static boolean isPoisoned()
+  {
+    return POISONED.get();
+  }
+
   /**
    * StupidPool Implementation Note
    * It is assumed that StupidPools are never reclaimed by the GC, either 
stored in static fields or global singleton
@@ -65,6 +104,8 @@ public class StupidPool<T> implements NonBlockingPool<T>
   private final AtomicLong createdObjectsCounter = new AtomicLong(0);
   private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
 
+  private final AtomicReference<RuntimeException> capturedException = new 
AtomicReference<>(null);
+
   //note that this is just the max entries in the cache, pool can still create 
as many buffers as needed.
   private final int objectsCacheMaxCount;
 
@@ -106,9 +147,20 @@ public class StupidPool<T> implements NonBlockingPool<T>
   {
     ObjectResourceHolder resourceHolder = objects.poll();
     if (resourceHolder == null) {
+      if (POISONED.get() && capturedException.get() != null) {
+        throw capturedException.get();
+      }
       return makeObjectWithHandler();
     } else {
       poolSize.decrementAndGet();
+      if (POISONED.get()) {
+        final RuntimeException exception = capturedException.get();
+        if (exception == null) {
+          resourceHolder.notifier.except = new RuntimeException("leaky leak!");
+        } else {
+          throw exception;
+        }
+      }
       return resourceHolder;
     }
   }
@@ -118,7 +170,7 @@ public class StupidPool<T> implements NonBlockingPool<T>
     T object = generator.get();
     createdObjectsCounter.incrementAndGet();
     ObjectId objectId = new ObjectId();
-    ObjectLeakNotifier notifier = new ObjectLeakNotifier(this);
+    ObjectLeakNotifier notifier = new ObjectLeakNotifier(this, POISONED.get());
     // Using objectId as referent for Cleaner, because if the object itself 
(e. g. ByteBuffer) is leaked after taken
     // from the pool, and the ResourceHolder is not closed, Cleaner won't 
notify about the leak.
     return new ObjectResourceHolder(object, objectId, 
Cleaners.register(objectId, notifier), notifier);
@@ -189,7 +241,7 @@ public class StupidPool<T> implements NonBlockingPool<T>
     );
   }
 
-  private class ObjectResourceHolder implements ResourceHolder<T>
+  class ObjectResourceHolder implements ResourceHolder<T>
   {
     private final AtomicReference<T> objectRef;
     private ObjectId objectId;
@@ -241,6 +293,11 @@ public class StupidPool<T> implements NonBlockingPool<T>
         }
       }
     }
+
+    void forceClean()
+    {
+      cleanable.clean();
+    }
   }
 
   private static class ObjectLeakNotifier implements Runnable
@@ -252,10 +309,14 @@ public class StupidPool<T> implements NonBlockingPool<T>
     final AtomicLong leakedObjectsCounter;
     final AtomicBoolean disabled = new AtomicBoolean(false);
 
-    ObjectLeakNotifier(StupidPool<?> pool)
+    private RuntimeException except;
+
+    ObjectLeakNotifier(StupidPool<?> pool, boolean poisoned)
     {
       poolReference = new WeakReference<>(pool);
       leakedObjectsCounter = pool.leakedObjectsCounter;
+
+      except = poisoned ? new RuntimeException("drip drip") : null;
     }
 
     @Override
@@ -264,7 +325,12 @@ public class StupidPool<T> implements NonBlockingPool<T>
       try {
         if (!disabled.getAndSet(true)) {
           leakedObjectsCounter.incrementAndGet();
-          log.warn("Not closed! Object leaked from %s. Allowing gc to prevent 
leak.", poolReference.get());
+          final StupidPool<?> pool = poolReference.get();
+          log.warn("Not closed! Object leaked from %s. Allowing gc to prevent 
leak.", pool);
+          if (except != null && pool != null) {
+            pool.capturedException.set(except);
+            log.error(except, "notifier[%s], dumping stack trace from object 
checkout and poisoning pool", this);
+          }
         }
       }
       // Exceptions must not be thrown in Cleaner.clean(), which calls this 
ObjectReclaimer.run() method
diff --git 
a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java 
b/core/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
similarity index 57%
copy from processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
copy to core/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
index 38bfc0da60..43802c5087 100644
--- a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
+++ b/core/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
@@ -17,31 +17,17 @@
  * under the License.
  */
 
-package org.apache.druid.segment;
+package org.apache.druid;
 
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.collections.StupidPool;
+import org.junit.Assert;
+import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public interface ColumnSelector extends ColumnInspector
+public class StupidPoolPoisonedTest
 {
-  List<String> getColumnNames();
-
-  @Nullable
-  ColumnHolder getColumnHolder(String columnName);
-
-  @Nullable
-  @Override
-  default ColumnCapabilities getColumnCapabilities(String column)
+  @Test
+  public void testStupidPoolPoisoned()
   {
-    final ColumnHolder columnHolder = getColumnHolder(column);
-    if (columnHolder == null) {
-      return null;
-    }
-    return columnHolder.getCapabilities();
+    Assert.assertTrue(StupidPool.isPoisoned());
   }
 }
diff --git 
a/core/src/test/java/org/apache/druid/collections/StupidPoolTest.java 
b/core/src/test/java/org/apache/druid/collections/StupidPoolTest.java
index c4d4108ce3..6a25e62a05 100644
--- a/core/src/test/java/org/apache/druid/collections/StupidPoolTest.java
+++ b/core/src/test/java/org/apache/druid/collections/StupidPoolTest.java
@@ -70,21 +70,22 @@ public class StupidPoolTest
   }
 
   @Test(timeout = 60_000L)
-  public void testResourceHandlerClearedByJVM() throws InterruptedException
+  public void testResourceHandlerClearedByJVM()
   {
-    String leakedString = createDanglingObjectHandler();
-    // Wait until dangling object string is returned to the pool
-    for (int i = 0; i < 6000 && poolOfString.leakedObjectsCount() == 0; i++) {
-      System.gc();
-      @SuppressWarnings("unused")
-      byte[] garbage = new byte[10_000_000];
-      Thread.sleep(10);
-    }
-    Assert.assertEquals(leakedString, 1, poolOfString.leakedObjectsCount());
-  }
+    StupidPool<String> poolOfString = new StupidPool<>("poolOfString", () -> 
"billybob");
 
-  private String createDanglingObjectHandler()
-  {
-    return poolOfString.take().get();
+    final StupidPool.ObjectResourceHolder take = 
(StupidPool.ObjectResourceHolder) poolOfString.take();
+    take.forceClean();
+
+    Assert.assertEquals("Expected there to be one leak", 1, 
poolOfString.leakedObjectsCount());
+
+    boolean exceptionThrown = false;
+    try {
+      poolOfString.take();
+    }
+    catch (Exception e) {
+      exceptionThrown = true;
+    }
+    Assert.assertTrue("Expect the pool to throw an exception as it should be 
poisoned", exceptionThrown);
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 7652a043ad..b4cf217d8f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -192,9 +192,11 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     this.lockGranularity = lockGranularity;
   }
 
-  public static CompactionState getDefaultCompactionState(Granularity 
segmentGranularity,
-                                                          Granularity 
queryGranularity,
-                                                          List<Interval> 
intervals) throws JsonProcessingException
+  public static CompactionState getDefaultCompactionState(
+      Granularity segmentGranularity,
+      Granularity queryGranularity,
+      List<Interval> intervals
+  ) throws JsonProcessingException
   {
     ObjectMapper mapper = new DefaultObjectMapper();
     // Expected compaction state to exist after compaction as we store 
compaction state by default
@@ -203,22 +205,22 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     expectedLongSumMetric.put("name", "val");
     expectedLongSumMetric.put("fieldName", "val");
     return new CompactionState(
-      new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
-      new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
-      ImmutableList.of(expectedLongSumMetric),
-      null,
-      mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
-      mapper.readValue(
-          mapper.writeValueAsString(
-              new UniformGranularitySpec(
-                  segmentGranularity,
-                  queryGranularity,
-                  true,
-                  intervals
-              )
-          ),
-          Map.class
-      )
+        new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
+        new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
+        ImmutableList.of(expectedLongSumMetric),
+        null,
+        mapper.readValue(mapper.writeValueAsString(new IndexSpec()), 
Map.class),
+        mapper.readValue(
+            mapper.writeValueAsString(
+                new UniformGranularitySpec(
+                    segmentGranularity,
+                    queryGranularity,
+                    true,
+                    intervals
+                )
+            ),
+            Map.class
+        )
     );
   }
 
@@ -264,7 +266,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -410,7 +419,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -440,7 +456,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -524,7 +547,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(6, segments.size());
 
     for (int i = 0; i < 6; i++) {
-      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 3 
+ i / 2, 3 + i / 2 + 1), segments.get(i).getInterval());
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 3 + i / 2, 
3 + i / 2 + 1),
+          segments.get(i).getInterval()
+      );
       if (lockGranularity == LockGranularity.SEGMENT) {
         Assert.assertEquals(new NumberedShardSpec(i % 2, 0), 
segments.get(i).getShardSpec());
       } else {
@@ -543,7 +569,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -585,7 +618,11 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
     Assert.assertEquals(
-        getDefaultCompactionState(Granularities.DAY, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
+        getDefaultCompactionState(
+            Granularities.DAY,
+            Granularities.MINUTE,
+            
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))
+        ),
         segments.get(0).getLastCompactionState()
     );
 
@@ -603,10 +640,17 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(3, segments.size());
 
     for (int i = 0; i < 3; i++) {
-      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
       Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(i).getShardSpec());
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))
+          ),
           segments.get(i).getLastCompactionState()
       );
     }
@@ -685,7 +729,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     final CompactionTask compactionTask = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
         .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null))
-        .metricsSpec(new AggregatorFactory[] {new 
CountAggregatorFactory("cnt"), new LongSumAggregatorFactory("val", "val")})
+        .metricsSpec(new AggregatorFactory[]{
+            new CountAggregatorFactory("cnt"),
+            new LongSumAggregatorFactory("val", "val")
+        })
         .build();
 
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
@@ -759,7 +806,11 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
     Assert.assertEquals(
-        getDefaultCompactionState(Granularities.DAY, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
+        getDefaultCompactionState(
+            Granularities.DAY,
+            Granularities.MINUTE,
+            
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))
+        ),
         segments.get(0).getLastCompactionState()
     );
 
@@ -777,10 +828,17 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(3, segments.size());
 
     for (int i = 0; i < 3; i++) {
-      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
       Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(i).getShardSpec());
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))
+          ),
           segments.get(i).getLastCompactionState()
       );
     }
@@ -817,7 +875,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.SECOND, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.SECOND,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -859,7 +924,11 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
     Assert.assertEquals(
-        getDefaultCompactionState(Granularities.DAY, Granularities.DAY, 
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
+        getDefaultCompactionState(
+            Granularities.DAY,
+            Granularities.DAY,
+            
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))
+        ),
         segments.get(0).getLastCompactionState()
     );
   }
@@ -894,7 +963,14 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           segments.get(i).getInterval()
       );
       Assert.assertEquals(
-          getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, 
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i 
+ 1))),
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
+                                            i,
+                                            i + 1
+              ))
+          ),
           segments.get(i).getLastCompactionState()
       );
       if (lockGranularity == LockGranularity.SEGMENT) {
@@ -943,7 +1019,8 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
   }
 
   @Test
-  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue()
 throws Exception
+  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue()
+      throws Exception
   {
     // This test fails with segment lock because of the bug reported in 
https://github.com/apache/druid/issues/10911.
     if (lockGranularity == LockGranularity.SEGMENT) {
@@ -1030,8 +1107,8 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
                                       .collect(Collectors.toList());
     final List<DataSegment> tombstonesAfterPartialCompaction =
         segmentsAfterPartialCompaction.stream()
-                                   .filter(s -> s.isTombstone())
-                                   .collect(Collectors.toList());
+                                      .filter(s -> s.isTombstone())
+                                      .collect(Collectors.toList());
     Assert.assertEquals(59, tombstonesAfterPartialCompaction.size());
     Assert.assertEquals(5, realSegmentsAfterPartialCompaction.size());
     Assert.assertEquals(64, segmentsAfterPartialCompaction.size());
@@ -1211,7 +1288,8 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
   }
 
   @Test
-  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse()
 throws Exception
+  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse()
+      throws Exception
   {
     // This test fails with segment lock because of the bug reported in 
https://github.com/apache/druid/issues/10911.
     if (lockGranularity == LockGranularity.SEGMENT) {
@@ -1413,10 +1491,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
   /**
    * Run a regular index task that's equivalent to the compaction task in 
{@link #testRunWithDynamicPartitioning()},
    * using {@link IngestSegmentFirehoseFactory}.
-   *
+   * <p>
    * This is not entirely CompactionTask related, but it's similar 
conceptually and it requires
    * similar setup to what this test suite already has.
-   *
+   * <p>
    * It could be moved to a separate test class if needed.
    */
   @Test
@@ -1642,11 +1720,10 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
 
   private List<String> getCSVFormatRowsFromSegments(List<DataSegment> 
segments) throws Exception
   {
-
     final File cacheDir = temporaryFolder.newFolder();
     final SegmentCacheManager segmentCacheManager = 
segmentCacheManagerFactory.manufacturate(cacheDir);
 
-    List<Cursor> cursors = new ArrayList<>();
+    List<String> rowsFromSegment = new ArrayList<>();
     for (DataSegment segment : segments) {
       final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
 
@@ -1662,39 +1739,40 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
           false,
           null
       );
-      cursors.addAll(cursorSequence.toList());
-    }
 
-    List<String> rowsFromSegment = new ArrayList<>();
-    for (Cursor cursor : cursors) {
-      cursor.reset();
-      while (!cursor.isDone()) {
-        final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
-                                                  .makeDimensionSelector(new 
DefaultDimensionSpec("ts", "ts"));
-        final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
-                                                  .makeDimensionSelector(new 
DefaultDimensionSpec("dim", "dim"));
-        final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
-                                                  .makeDimensionSelector(new 
DefaultDimensionSpec("val", "val"));
-
-        Object dimObject = selector2.getObject();
-        String dimVal = null;
-        if (dimObject instanceof String) {
-          dimVal = (String) dimObject;
-        } else if (dimObject instanceof List) {
-          dimVal = String.join("|", (List<String>) dimObject);
-        }
+      cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
+        cursor.reset();
+        while (!cursor.isDone()) {
+          final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
+                                                    .makeDimensionSelector(new 
DefaultDimensionSpec("ts", "ts"));
+          final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
+                                                    .makeDimensionSelector(new 
DefaultDimensionSpec("dim", "dim"));
+          final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
+                                                    .makeDimensionSelector(new 
DefaultDimensionSpec("val", "val"));
+
+          Object dimObject = selector2.getObject();
+          String dimVal = null;
+          if (dimObject instanceof String) {
+            dimVal = (String) dimObject;
+          } else if (dimObject instanceof List) {
+            dimVal = String.join("|", (List<String>) dimObject);
+          }
 
-        rowsFromSegment.add(
-            makeCSVFormatRow(
-                selector1.getObject().toString(),
-                dimVal,
-                selector3.defaultGetObject().toString()
-            )
-        );
+          rowsFromSegment.add(
+              makeCSVFormatRow(
+                  selector1.getObject().toString(),
+                  dimVal,
+                  selector3.defaultGetObject().toString()
+              )
+          );
 
-        cursor.advance();
-      }
+          cursor.advance();
+        }
+
+        return accumulated;
+      });
     }
+
     return rowsFromSegment;
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index d1bc388752..92bb04d6cb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -325,15 +325,18 @@ public class SeekableStreamIndexTaskTestBase extends 
EasyMockSupport
         false
     );
     IndexIO indexIO = new TestUtils().getTestIndexIO();
-    QueryableIndex index = indexIO.loadIndex(outputLocation);
-    DictionaryEncodedColumn<String> theColumn =
-        (DictionaryEncodedColumn<String>) 
index.getColumnHolder(column).getColumn();
     List<String> values = new ArrayList<>();
-    for (int i = 0; i < theColumn.length(); i++) {
-      int id = theColumn.getSingleValueRow(i);
-      String value = theColumn.lookupName(id);
-      values.add(value);
+
+    QueryableIndex index = indexIO.loadIndex(outputLocation);
+    try (DictionaryEncodedColumn<String> theColumn =
+        (DictionaryEncodedColumn<String>) 
index.getColumnHolder(column).getColumn()) {
+      for (int i = 0; i < theColumn.length(); i++) {
+        int id = theColumn.getSingleValueRow(i);
+        String value = theColumn.lookupName(id);
+        values.add(value);
+      }
     }
+
     return values;
   }
 
diff --git a/pom.xml b/pom.xml
index 8aa6b4c71b..aee015cc37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1516,8 +1516,9 @@
                         <!-- set default options -->
                         <argLine>
                             @{jacocoArgLine}
+                            ${jdk.surefire.argLine}
                             -Xmx1500m
-                            -XX:MaxDirectMemorySize=512m
+                            -XX:MaxDirectMemorySize=2500m
                             -XX:+ExitOnOutOfMemoryError
                             -XX:+HeapDumpOnOutOfMemoryError
                             -Duser.language=en
@@ -1526,6 +1527,7 @@
                             -Duser.timezone=UTC
                             
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
                             -Daws.region=us-east-1 <!-- required for 
s3-related unit tests -->
+                            -Ddruid.test.stupidPool.poison=true
                             <!--@TODO After fixing 
https://github.com/apache/druid/issues/4964 remove this parameter-->
                             -Ddruid.indexing.doubleStorage=double
                         </argLine>
@@ -1668,6 +1670,7 @@
                     <!-- required for JvmMonitor tests on Java 11+ -->
                     --add-exports=java.base/jdk.internal.perf=ALL-UNNAMED
                     
--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED
+                    --add-opens java.base/jdk.internal.ref=ALL-UNNAMED
                 </jdk.surefire.argLine>
             </properties>
             <build>
@@ -1760,6 +1763,7 @@
                                 -XX:+HeapDumpOnOutOfMemoryError
                                 -Duser.timezone=UTC 
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
                                 -Daws.region=us-east-1 <!-- required for 
s3-related unit tests -->
+                                -Ddruid.test.stupidPool.poison=true
                                 <!--@TODO After fixing 
https://github.com/apache/druid/issues/4964 remove this parameter-->
                                 -Ddruid.indexing.doubleStorage=double
                             </argLine>
diff --git a/processing/pom.xml b/processing/pom.xml
index 4bd7a2c3c7..80a8c80016 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -285,25 +285,6 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
-                    <!-- locale settings must be set on the command line 
before startup -->
-                    <!-- set default options -->
-                    <argLine>
-                        @{jacocoArgLine}
-                        ${jdk.surefire.argLine}
-                        -Xmx512m
-                        -XX:MaxDirectMemorySize=2500m
-                        -XX:+ExitOnOutOfMemoryError
-                        -XX:+HeapDumpOnOutOfMemoryError
-                        -Duser.language=en
-                        -Duser.GroupByQueryRunnerTest.javacountry=US
-                        -Dfile.encoding=UTF-8
-                        -Duser.timezone=UTC
-                        
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-                        <!--@TODO After fixing 
https://github.com/apache/druid/issues/4964 remove this parameter-->
-                        -Ddruid.indexing.doubleStorage=double
-                    </argLine>
-                    <!-- our tests are very verbose, let's keep the volume 
down -->
-                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
                     
<excludedGroups>org.apache.druid.collections.test.annotation.Benchmark</excludedGroups>
                 </configuration>
             </plugin>
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java 
b/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
index 61210e695c..415d2e8369 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/FilterTuning.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 
 /**
  * This class provides a mechanism to influence whether or not indexes are 
used for a {@link Filter} during processing
- * by {@link 
org.apache.druid.segment.QueryableIndexStorageAdapter#analyzeFilter} (i.e. will 
a {@link Filter} be a "pre"
+ * by {@link org.apache.druid.segment.FilterAnalysis#analyzeFilter} (i.e. will 
a {@link Filter} be a "pre"
  * filter in which we union indexes for all values that match the filter to 
create a
  * {@link org.apache.druid.segment.BitmapOffset}/{@link 
org.apache.druid.segment.vector.BitmapVectorOffset}, or will it
  * be used as a "post" filter and evaluated while scanning row values from the
diff --git 
a/processing/src/main/java/org/apache/druid/query/search/AutoStrategy.java 
b/processing/src/main/java/org/apache/druid/query/search/AutoStrategy.java
index a62d5a6e8e..89d7d47e3f 100644
--- a/processing/src/main/java/org/apache/druid/query/search/AutoStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/search/AutoStrategy.java
@@ -22,7 +22,9 @@ package org.apache.druid.query.search;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.filter.ColumnIndexSelector;
+import org.apache.druid.segment.ColumnSelector;
 import org.apache.druid.segment.ColumnSelectorColumnIndexSelector;
+import org.apache.druid.segment.DeprecatedQueryableIndexColumnSelector;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.VirtualColumns;
@@ -54,10 +56,11 @@ public class AutoStrategy extends SearchStrategy
     final QueryableIndex index = segment.asQueryableIndex();
 
     if (index != null) {
+      final ColumnSelector columnSelector = new 
DeprecatedQueryableIndexColumnSelector(index);
       final ColumnIndexSelector selector = new 
ColumnSelectorColumnIndexSelector(
           index.getBitmapFactoryForDimensions(),
           VirtualColumns.EMPTY,
-          index
+          columnSelector
       );
 
       // Index-only plan is used only when any filter is not specified or the 
filter supports bitmap indexes.
@@ -65,7 +68,7 @@ public class AutoStrategy extends SearchStrategy
       // Note: if some filters support bitmap indexes but others are not, the 
current implementation always employs
       // the cursor-based plan. This can be more optimized. One possible 
optimization is generating a bitmap index
       // from the non-bitmap-support filters, and then use it to compute the 
filtered result by intersecting bitmaps.
-      if (filter == null || filter.supportsSelectivityEstimation(index, 
selector)) {
+      if (filter == null || 
filter.supportsSelectivityEstimation(columnSelector, selector)) {
         final List<DimensionSpec> dimsToSearch = getDimsToSearch(
             index.getAvailableDimensions(),
             query.getDimensions()
diff --git 
a/processing/src/main/java/org/apache/druid/query/search/UseIndexesStrategy.java
 
b/processing/src/main/java/org/apache/druid/query/search/UseIndexesStrategy.java
index 70f992155e..a08ee56985 100644
--- 
a/processing/src/main/java/org/apache/druid/query/search/UseIndexesStrategy.java
+++ 
b/processing/src/main/java/org/apache/druid/query/search/UseIndexesStrategy.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.filter.ColumnIndexSelector;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.search.CursorOnlyStrategy.CursorBasedExecutor;
 import org.apache.druid.segment.ColumnSelectorColumnIndexSelector;
+import org.apache.druid.segment.DeprecatedQueryableIndexColumnSelector;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
@@ -84,7 +85,7 @@ public class UseIndexesStrategy extends SearchStrategy
         final ColumnIndexSelector selector = new 
ColumnSelectorColumnIndexSelector(
             index.getBitmapFactoryForDimensions(),
             VirtualColumns.EMPTY,
-            index
+            new DeprecatedQueryableIndexColumnSelector(index)
         );
 
         // Index-only plan is used only when any filter is not specified or 
the filter supports bitmap indexes.
@@ -158,7 +159,7 @@ public class UseIndexesStrategy extends SearchStrategy
       final ColumnIndexSelector selector = new 
ColumnSelectorColumnIndexSelector(
           index.getBitmapFactoryForDimensions(),
           VirtualColumns.EMPTY,
-          index
+          new DeprecatedQueryableIndexColumnSelector(index)
       );
       final BitmapColumnIndex columnIndex = 
filter.getBitmapColumnIndex(selector);
       Preconditions.checkNotNull(
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index 6981751e12..f65020a8b9 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -161,7 +161,7 @@ public class TimeseriesQueryEngine
       );
 
       if (granularizer == null) {
-        return Sequences.empty();
+        return Sequences.withBaggage(Sequences.empty(), closer);
       }
 
       final VectorColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AbstractIndex.java 
b/processing/src/main/java/org/apache/druid/segment/AbstractIndex.java
index 7425805341..42ec6c8b4e 100644
--- a/processing/src/main/java/org/apache/druid/segment/AbstractIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/AbstractIndex.java
@@ -36,38 +36,38 @@ public abstract class AbstractIndex
   @Override
   public String toString()
   {
-    StringBuilder sb = new StringBuilder();
-    StorageAdapter storageAdapter = toStorageAdapter();
-    List<Cursor> cursors = storageAdapter.makeCursors(
-        null,
-        Intervals.ETERNITY,
-        VirtualColumns.EMPTY,
-        Granularities.ALL,
-        false,
-        null
-    ).toList();
     List<String> columnNames = new ArrayList<>();
     columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
     columnNames.addAll(getColumnNames());
-    for (Cursor cursor : cursors) {
-      ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
-      List<ColumnValueSelector> selectors =
-          
columnNames.stream().map(columnSelectorFactory::makeColumnValueSelector).collect(Collectors.toList());
-      while (!cursor.isDone()) {
-        sb.append('[');
-        for (int i = 0; i < selectors.size(); i++) {
-          sb.append(columnNames.get(i)).append('=');
-          ColumnValueSelector selector = selectors.get(i);
-          Object columnValue = selector.getObject();
-          sb.append(columnValue);
-          sb.append(", ");
-        }
-        sb.setLength(sb.length() - 2);
-        sb.append("]\n");
-        cursor.advance();
-      }
-      sb.append("\n");
-    }
-    return sb.toString();
+
+    return toStorageAdapter()
+        .makeCursors(
+            null,
+            Intervals.ETERNITY,
+            VirtualColumns.EMPTY,
+            Granularities.ALL,
+            false,
+            null
+        )
+        .accumulate(new StringBuilder(), (sb, cursor) -> {
+          ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
+          List<ColumnValueSelector> selectors =
+              
columnNames.stream().map(columnSelectorFactory::makeColumnValueSelector).collect(Collectors.toList());
+          while (!cursor.isDone()) {
+            sb.append('[');
+            for (int i = 0; i < selectors.size(); i++) {
+              sb.append(columnNames.get(i)).append('=');
+              ColumnValueSelector selector = selectors.get(i);
+              Object columnValue = selector.getObject();
+              sb.append(columnValue);
+              sb.append(", ");
+            }
+            sb.setLength(sb.length() - 2);
+            sb.append("]\n");
+            cursor.advance();
+          }
+          sb.append("\n");
+          return sb;
+        }).toString();
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnCache.java 
b/processing/src/main/java/org/apache/druid/segment/ColumnCache.java
new file mode 100644
index 0000000000..fb285fa282
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/ColumnCache.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+
+public class ColumnCache implements ColumnSelector
+{
+  private final HashMap<String, ColumnHolder> holderCache;
+  private final QueryableIndex index;
+  private final Closer closer;
+
+  public ColumnCache(QueryableIndex index, Closer closer)
+  {
+    this.index = index;
+    this.closer = closer;
+
+    this.holderCache = new HashMap<>();
+  }
+
+
+  @Override
+  public List<String> getColumnNames()
+  {
+    return index.getColumnNames();
+  }
+
+  @Nullable
+  @Override
+  public ColumnHolder getColumnHolder(String columnName)
+  {
+    return holderCache.computeIfAbsent(columnName, dimension -> {
+      // Here we do a funny little dance to memoize the BaseColumn and 
register it with the closer.
+      // It would probably be cleaner if the ColumnHolder itself was 
`Closeable` and did its own memoization,
+      // but that change is much wider and runs the risk of even more things 
that need to close the thing
+      // not actually closing it.  So, maybe this is a hack, maybe it's a wise 
decision, who knows, but at
+      // least for now, we grab the holder, grab the column, register the 
column with the closer and then return
+      // a new holder that always returns the same reference for the column.
+
+      final ColumnHolder holder = index.getColumnHolder(columnName);
+      if (holder == null) {
+        return null;
+      }
+
+      return new ColumnHolder()
+      {
+        @Nullable
+        private BaseColumn theColumn = null;
+
+        @Override
+        public ColumnCapabilities getCapabilities()
+        {
+          return holder.getCapabilities();
+        }
+
+        @Override
+        public int getLength()
+        {
+          return holder.getLength();
+        }
+
+        @Override
+        public BaseColumn getColumn()
+        {
+          if (theColumn == null) {
+            theColumn = closer.register(holder.getColumn());
+          }
+          return theColumn;
+        }
+
+        @Nullable
+        @Override
+        public ColumnIndexSupplier getIndexSupplier()
+        {
+          return holder.getIndexSupplier();
+        }
+
+        @Override
+        public SettableColumnValueSelector makeNewSettableColumnValueSelector()
+        {
+          return holder.makeNewSettableColumnValueSelector();
+        }
+      };
+    });
+  }
+
+  @Nullable
+  public BaseColumn getColumn(String columnName)
+  {
+    final ColumnHolder retVal = getColumnHolder(columnName);
+    return retVal == null ? null : retVal.getColumn();
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java 
b/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
index 38bfc0da60..32a3e95fb4 100644
--- a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
+++ b/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
@@ -29,6 +29,10 @@ import java.util.List;
  */
 public interface ColumnSelector extends ColumnInspector
 {
+  /**
+   * This method is apparently no longer used, so it is deprecated.
+   */
+  @Deprecated
   List<String> getColumnNames();
 
   @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/segment/CompressedPools.java 
b/processing/src/main/java/org/apache/druid/segment/CompressedPools.java
index d62443270b..449a25f7c1 100644
--- a/processing/src/main/java/org/apache/druid/segment/CompressedPools.java
+++ b/processing/src/main/java/org/apache/druid/segment/CompressedPools.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment;
 
 import com.google.common.base.Supplier;
 import com.ning.compress.BufferRecycler;
-import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -35,7 +34,7 @@ public class CompressedPools
   private static final Logger log = new Logger(CompressedPools.class);
 
   public static final int BUFFER_SIZE = 0x10000;
-  private static final NonBlockingPool<BufferRecycler> BUFFER_RECYCLER_POOL = 
new StupidPool<>(
+  private static final StupidPool<BufferRecycler> BUFFER_RECYCLER_POOL = new 
StupidPool<>(
       "bufferRecyclerPool",
       new Supplier<BufferRecycler>()
       {
@@ -55,7 +54,7 @@ public class CompressedPools
     return BUFFER_RECYCLER_POOL.take();
   }
 
-  private static final NonBlockingPool<byte[]> OUTPUT_BYTES_POOL = new 
StupidPool<byte[]>(
+  private static final StupidPool<byte[]> OUTPUT_BYTES_POOL = new 
StupidPool<byte[]>(
       "outputBytesPool",
       new Supplier<byte[]>()
       {
@@ -75,7 +74,7 @@ public class CompressedPools
     return OUTPUT_BYTES_POOL.take();
   }
 
-  private static final NonBlockingPool<ByteBuffer> BIG_ENDIAN_BYTE_BUF_POOL = 
new StupidPool<ByteBuffer>(
+  private static final StupidPool<ByteBuffer> BIG_ENDIAN_BYTE_BUF_POOL = new 
StupidPool<ByteBuffer>(
       "bigEndByteBufPool",
       new Supplier<ByteBuffer>()
       {
@@ -90,7 +89,7 @@ public class CompressedPools
       }
   );
 
-  private static final NonBlockingPool<ByteBuffer> LITTLE_ENDIAN_BYTE_BUF_POOL 
= new StupidPool<ByteBuffer>(
+  private static final StupidPool<ByteBuffer> LITTLE_ENDIAN_BYTE_BUF_POOL = 
new StupidPool<>(
       "littleEndByteBufPool",
       new Supplier<ByteBuffer>()
       {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DeprecatedQueryableIndexColumnSelector.java
 
b/processing/src/main/java/org/apache/druid/segment/DeprecatedQueryableIndexColumnSelector.java
new file mode 100644
index 0000000000..0eb0180ea0
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/DeprecatedQueryableIndexColumnSelector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.segment.column.ColumnHolder;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * It likely looks weird that we are creating a new instance of ColumnSelector 
here that begins its life deprecated
+ * and only delegates methods to the QueryableIndex. This is done 
intentionally so that the QueryableIndex doesn't
+ * accidentally get used as a ColumnSelector.
+ *
+ * The lifecycle of the QueryableIndex is over the lifetime of the segment on 
a specific process, while
+ * the ColumnSelector's lifecycle is for a given query.  When we don't use the 
same ColumnSelector for an
+ * entire query, we defeat caching and use a lot more resources than necessary 
for queries.
+ *
+ * Places that use this class are intentionally circumventing column caching 
and column lifecycle management,
+ * ostensibly because those code locations know that they are only looking at 
metadata.  If a code path uses this
+ * and actually accesses a column instead of just looking at metadata, it will 
leak any resources that said column
+ * requires.
+ *
+ * The ColumnCache is the preferred implementation of a ColumnSelector; it 
takes a Closer, and that closer can be used
+ * to ensure that resources are cleaned up.
+ */
+@Deprecated
+public class DeprecatedQueryableIndexColumnSelector implements ColumnSelector
+{
+  private final QueryableIndex index;
+
+  public DeprecatedQueryableIndexColumnSelector(QueryableIndex index)
+  {
+    this.index = index;
+  }
+
+  @Override
+  public List<String> getColumnNames()
+  {
+    return index.getColumnNames();
+  }
+
+  @Nullable
+  @Override
+  public ColumnHolder getColumnHolder(String columnName)
+  {
+    return index.getColumnHolder(columnName);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/FilterAnalysis.java 
b/processing/src/main/java/org/apache/druid/segment/FilterAnalysis.java
new file mode 100644
index 0000000000..fc9529f7d2
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/FilterAnalysis.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.query.BitmapResultFactory;
+import org.apache.druid.query.DefaultBitmapResultFactory;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.column.BitmapColumnIndex;
+import org.apache.druid.segment.filter.AndFilter;
+import org.apache.druid.segment.filter.Filters;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class FilterAnalysis
+{
+  @SuppressWarnings("rawtypes")
+  public static FilterAnalysis analyzeFilter(
+      @Nullable final Filter filter,
+      ColumnSelectorColumnIndexSelector indexSelector,
+      @Nullable QueryMetrics queryMetrics,
+      int numRows
+  )
+  {
+
+    /*
+     * Filters can be applied in two stages:
+     * pre-filtering: Use bitmap indexes to prune the set of rows to be 
scanned.
+     * post-filtering: Iterate through rows and apply the filter to the row 
values
+     *
+     * The pre-filter and post-filter step have an implicit AND relationship. 
(i.e., final rows are those that
+     * were not pruned AND those that matched the filter during row scanning)
+     *
+     * An AND filter can have its subfilters partitioned across the two steps. 
The subfilters that can be
+     * processed entirely with bitmap indexes (subfilter returns non-null 
value for getBitmapColumnIndex)
+     * will be moved to the pre-filtering stage.
+     *
+     * Any subfilters that cannot be processed entirely with bitmap indexes 
will be moved to the post-filtering stage.
+     */
+    final List<Filter> preFilters;
+    final List<Filter> postFilters = new ArrayList<>();
+    int preFilteredRows = numRows;
+    if (filter == null) {
+      preFilters = Collections.emptyList();
+    } else {
+      preFilters = new ArrayList<>();
+
+      if (filter instanceof AndFilter) {
+        // If we get an AndFilter, we can split the subfilters across both 
filtering stages
+        for (Filter subfilter : ((AndFilter) filter).getFilters()) {
+
+          final BitmapColumnIndex columnIndex = 
subfilter.getBitmapColumnIndex(indexSelector);
+
+          if (columnIndex == null) {
+            postFilters.add(subfilter);
+          } else {
+            preFilters.add(subfilter);
+            if (!columnIndex.getIndexCapabilities().isExact()) {
+              postFilters.add(subfilter);
+            }
+          }
+        }
+      } else {
+        // If we get an OrFilter or a single filter, handle the filter in one 
stage
+        final BitmapColumnIndex columnIndex = 
filter.getBitmapColumnIndex(indexSelector);
+        if (columnIndex == null) {
+          postFilters.add(filter);
+        } else {
+          preFilters.add(filter);
+          if (!columnIndex.getIndexCapabilities().isExact()) {
+            postFilters.add(filter);
+          }
+        }
+      }
+    }
+
+    final ImmutableBitmap preFilterBitmap;
+    if (preFilters.isEmpty()) {
+      preFilterBitmap = null;
+    } else {
+      if (queryMetrics != null) {
+        BitmapResultFactory<?> bitmapResultFactory =
+            
queryMetrics.makeBitmapResultFactory(indexSelector.getBitmapFactory());
+        long bitmapConstructionStartNs = System.nanoTime();
+        // Use AndFilter.getBitmapIndex to intersect the preFilters to get its 
short-circuiting behavior.
+        preFilterBitmap = AndFilter.getBitmapIndex(indexSelector, 
bitmapResultFactory, preFilters);
+        preFilteredRows = preFilterBitmap.size();
+        queryMetrics.reportBitmapConstructionTime(System.nanoTime() - 
bitmapConstructionStartNs);
+      } else {
+        BitmapResultFactory<?> bitmapResultFactory = new 
DefaultBitmapResultFactory(indexSelector.getBitmapFactory());
+        preFilterBitmap = AndFilter.getBitmapIndex(indexSelector, 
bitmapResultFactory, preFilters);
+      }
+    }
+
+    if (queryMetrics != null) {
+      queryMetrics.preFilters(new ArrayList<>(preFilters));
+      queryMetrics.postFilters(postFilters);
+      queryMetrics.reportSegmentRows(numRows);
+      queryMetrics.reportPreFilteredRows(preFilteredRows);
+    }
+
+    return new FilterAnalysis(preFilterBitmap, 
Filters.maybeAnd(postFilters).orElse(null));
+  }
+
+  private final Filter postFilter;
+  private final ImmutableBitmap preFilterBitmap;
+
+  public FilterAnalysis(
+      @Nullable final ImmutableBitmap preFilterBitmap,
+      @Nullable final Filter postFilter
+  )
+  {
+    this.preFilterBitmap = preFilterBitmap;
+    this.postFilter = postFilter;
+  }
+
+  @Nullable
+  public ImmutableBitmap getPreFilterBitmap()
+  {
+    return preFilterBitmap;
+  }
+
+  @Nullable
+  public Filter getPostFilter()
+  {
+    return postFilter;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java 
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 6068e8c748..f593f104eb 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -151,36 +151,39 @@ public class IndexIO
         throw new SegmentValidationException("Metric names differ. Expected 
[%s] found [%s]", metNames1, metNames2);
       }
     }
-    final RowIterator it1 = adapter1.getRows();
-    final RowIterator it2 = adapter2.getRows();
-    long row = 0L;
-    while (it1.moveToNext()) {
-      if (!it2.moveToNext()) {
-        throw new SegmentValidationException("Unexpected end of second 
adapter");
-      }
-      final RowPointer rp1 = it1.getPointer();
-      final RowPointer rp2 = it2.getPointer();
-      ++row;
-      if (rp1.getRowNum() != rp2.getRowNum()) {
-        throw new SegmentValidationException("Row number mismatch: [%d] vs 
[%d]", rp1.getRowNum(), rp2.getRowNum());
+    try (
+        final RowIterator it1 = adapter1.getRows();
+        final RowIterator it2 = adapter2.getRows()
+    ) {
+      long row = 0L;
+      while (it1.moveToNext()) {
+        if (!it2.moveToNext()) {
+          throw new SegmentValidationException("Unexpected end of second 
adapter");
+        }
+        final RowPointer rp1 = it1.getPointer();
+        final RowPointer rp2 = it2.getPointer();
+        ++row;
+        if (rp1.getRowNum() != rp2.getRowNum()) {
+          throw new SegmentValidationException("Row number mismatch: [%d] vs 
[%d]", rp1.getRowNum(), rp2.getRowNum());
+        }
+        try {
+          validateRowValues(rp1, adapter1, rp2, adapter2);
+        }
+        catch (SegmentValidationException ex) {
+          throw new SegmentValidationException(ex, "Validation failure on row 
%d: [%s] vs [%s]", row, rp1, rp2);
+        }
       }
-      try {
-        validateRowValues(rp1, adapter1, rp2, adapter2);
+      if (it2.moveToNext()) {
+        throw new SegmentValidationException("Unexpected end of first 
adapter");
       }
-      catch (SegmentValidationException ex) {
-        throw new SegmentValidationException(ex, "Validation failure on row 
%d: [%s] vs [%s]", row, rp1, rp2);
+      if (row != adapter1.getNumRows()) {
+        throw new SegmentValidationException(
+            "Actual Row count mismatch. Expected [%d] found [%d]",
+            row,
+            adapter1.getNumRows()
+        );
       }
     }
-    if (it2.moveToNext()) {
-      throw new SegmentValidationException("Unexpected end of first adapter");
-    }
-    if (row != adapter1.getNumRows()) {
-      throw new SegmentValidationException(
-          "Actual Row count mismatch. Expected [%d] found [%d]",
-          row,
-          adapter1.getNumRows()
-      );
-    }
   }
 
   public QueryableIndex loadIndex(File inDir) throws IOException
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
index bf5f9d1832..395fe26636 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java
@@ -20,12 +20,15 @@
 package org.apache.druid.segment;
 
 import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.data.Indexed;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,7 +38,7 @@ import java.util.Map;
  * @see QueryableIndexStorageAdapter for query path adapter
  * @see QueryableIndexIndexableAdapter for indexing path adapter
  */
-public interface QueryableIndex extends ColumnSelector, Closeable
+public interface QueryableIndex extends Closeable
 {
   Interval getDataInterval();
   int getNumRows();
@@ -44,6 +47,21 @@ public interface QueryableIndex extends ColumnSelector, Closeable
   @Nullable Metadata getMetadata();
   Map<String, DimensionHandler> getDimensionHandlers();
 
+  List<String> getColumnNames();
+
+  @Nullable
+  ColumnHolder getColumnHolder(String columnName);
+
+  @Nullable
+  default ColumnCapabilities getColumnCapabilities(String column)
+  {
+    final ColumnHolder columnHolder = getColumnHolder(column);
+    if (columnHolder == null) {
+      return null;
+    }
+    return columnHolder.getCapabilities();
+  }
+
   /**
    * The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
    * @throws IOException if an exception was thrown closing the index
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java
index bc146f019a..d7482d1404 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment;
 
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.segment.column.BaseColumn;
@@ -41,33 +40,27 @@ import java.util.function.Function;
  */
 public class QueryableIndexColumnSelectorFactory implements 
ColumnSelectorFactory, RowIdSupplier
 {
-  private final QueryableIndex index;
   private final VirtualColumns virtualColumns;
   private final boolean descending;
-  private final Closer closer;
   protected final ReadableOffset offset;
 
   // Share Column objects, since they cache decompressed buffers internally, and we can avoid recomputation if the
   // same column is used by more than one part of a query.
-  private final Map<String, BaseColumn> columnCache;
+  private final ColumnCache columnCache;
 
   // Share selectors too, for the same reason that we cache columns (they may cache things internally).
   private final Map<DimensionSpec, DimensionSelector> dimensionSelectorCache;
   private final Map<String, ColumnValueSelector> valueSelectorCache;
 
   public QueryableIndexColumnSelectorFactory(
-      QueryableIndex index,
       VirtualColumns virtualColumns,
       boolean descending,
-      Closer closer,
       ReadableOffset offset,
-      Map<String, BaseColumn> columnCache
+      ColumnCache columnCache
   )
   {
-    this.index = index;
     this.virtualColumns = virtualColumns;
     this.descending = descending;
-    this.closer = closer;
     this.offset = offset;
     this.columnCache = columnCache;
     this.dimensionSelectorCache = new HashMap<>();
@@ -79,7 +72,7 @@ public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactor
   {
     Function<DimensionSpec, DimensionSelector> mappingFunction = spec -> {
       if (virtualColumns.exists(spec.getDimension())) {
-        DimensionSelector dimensionSelector = 
virtualColumns.makeDimensionSelector(dimensionSpec, index, offset);
+        DimensionSelector dimensionSelector = 
virtualColumns.makeDimensionSelector(dimensionSpec, columnCache, offset);
         if (dimensionSelector == null) {
           return virtualColumns.makeDimensionSelector(dimensionSpec, this);
         } else {
@@ -107,7 +100,7 @@ public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactor
     final String dimension = dimensionSpec.getDimension();
     final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
 
-    final ColumnHolder columnHolder = index.getColumnHolder(dimension);
+    final ColumnHolder columnHolder = columnCache.getColumnHolder(dimension);
     if (columnHolder == null) {
       return DimensionSelector.constant(null, extractionFn);
     }
@@ -125,10 +118,10 @@ public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactor
       );
     }
 
-    final DictionaryEncodedColumn column = getCachedColumn(dimension, 
DictionaryEncodedColumn.class);
+    final BaseColumn column = columnCache.getColumn(dimension);
 
-    if (column != null) {
-      return column.makeDimensionSelector(offset, extractionFn);
+    if (column instanceof DictionaryEncodedColumn) {
+      return ((DictionaryEncodedColumn<?>) 
column).makeDimensionSelector(offset, extractionFn);
     } else {
       return DimensionSelector.constant(null, extractionFn);
     }
@@ -137,62 +130,33 @@ public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactor
   @Override
   public ColumnValueSelector<?> makeColumnValueSelector(String columnName)
   {
-    Function<String, ColumnValueSelector<?>> mappingFunction = name -> {
+    // We cannot use valueSelectorCache.computeIfAbsent() here since the function being
+    // applied may modify the valueSelectorCache itself through virtual column references,
+    // triggering a ConcurrentModificationException in JDK 9 and above.
+    ColumnValueSelector<?> columnValueSelector = 
valueSelectorCache.get(columnName);
+    if (columnValueSelector == null) {
       if (virtualColumns.exists(columnName)) {
-        ColumnValueSelector<?> selector = 
virtualColumns.makeColumnValueSelector(columnName, index, offset);
+        ColumnValueSelector<?> selector = 
virtualColumns.makeColumnValueSelector(columnName, columnCache, offset);
         if (selector == null) {
-          return virtualColumns.makeColumnValueSelector(columnName, this);
+          columnValueSelector = 
virtualColumns.makeColumnValueSelector(columnName, this);
         } else {
-          return selector;
+          columnValueSelector = selector;
         }
-      }
-
-      BaseColumn column = getCachedColumn(columnName, BaseColumn.class);
-
-      if (column != null) {
-        return column.makeColumnValueSelector(offset);
       } else {
-        return NilColumnValueSelector.instance();
-      }
-    };
+        BaseColumn column = columnCache.getColumn(columnName);
 
-    // We cannot use valueSelectorCache.computeIfAbsent() here since the 
function being
-    // applied may modify the valueSelectorCache itself through virtual column 
references,
-    // triggering a ConcurrentModificationException in JDK 9 and above.
-    ColumnValueSelector<?> columnValueSelector = 
valueSelectorCache.get(columnName);
-    if (columnValueSelector == null) {
-      columnValueSelector = mappingFunction.apply(columnName);
+        if (column != null) {
+          columnValueSelector = column.makeColumnValueSelector(offset);
+        } else {
+          columnValueSelector = NilColumnValueSelector.instance();
+        }
+      }
       valueSelectorCache.put(columnName, columnValueSelector);
     }
 
     return columnValueSelector;
   }
 
-  @Nullable
-  @SuppressWarnings("unchecked")
-  private <T extends BaseColumn> T getCachedColumn(final String columnName, 
final Class<T> clazz)
-  {
-    final BaseColumn cachedColumn = columnCache.computeIfAbsent(
-        columnName,
-        name -> {
-          ColumnHolder holder = index.getColumnHolder(name);
-          if (holder != null && 
clazz.isAssignableFrom(holder.getColumn().getClass())) {
-            return closer.register(holder.getColumn());
-          } else {
-            // Return null from the lambda in computeIfAbsent() results in no 
recorded value in the columnCache and
-            // the column variable is set to null.
-            return null;
-          }
-        }
-    );
-
-    if (cachedColumn != null && 
clazz.isAssignableFrom(cachedColumn.getClass())) {
-      return (T) cachedColumn;
-    } else {
-      return null;
-    }
-  }
-
   @Nullable
   @Override
   public RowIdSupplier getRowIdSupplier()
@@ -211,9 +175,9 @@ public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactor
   public ColumnCapabilities getColumnCapabilities(String columnName)
   {
     if (virtualColumns.exists(columnName)) {
-      return virtualColumns.getColumnCapabilities(index, columnName);
+      return virtualColumns.getColumnCapabilities(columnCache, columnName);
     }
 
-    return index.getColumnCapabilities(columnName);
+    return columnCache.getColumnCapabilities(columnName);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorSequenceBuilder.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorSequenceBuilder.java
index cee4e1a5ce..d6143dfbe3 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorSequenceBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorSequenceBuilder.java
@@ -30,9 +30,10 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.column.BaseColumn;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.NumericColumn;
 import org.apache.druid.segment.data.Offset;
@@ -50,51 +51,64 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 public class QueryableIndexCursorSequenceBuilder
 {
   private final QueryableIndex index;
   private final Interval interval;
   private final VirtualColumns virtualColumns;
-  @Nullable
-  private final ImmutableBitmap filterBitmap;
+  private final Filter filter;
+  private final QueryMetrics<? extends Query> metrics;
   private final long minDataTimestamp;
   private final long maxDataTimestamp;
   private final boolean descending;
-  @Nullable
-  private final Filter postFilter;
-  @Nullable
-  private final ColumnSelectorColumnIndexSelector bitmapIndexSelector;
 
   public QueryableIndexCursorSequenceBuilder(
       QueryableIndex index,
       Interval interval,
       VirtualColumns virtualColumns,
-      @Nullable ImmutableBitmap filterBitmap,
+      @Nullable Filter filter,
+      @Nullable QueryMetrics<? extends Query> metrics,
       long minDataTimestamp,
       long maxDataTimestamp,
-      boolean descending,
-      @Nullable Filter postFilter,
-      @Nullable ColumnSelectorColumnIndexSelector bitmapIndexSelector
+      boolean descending
   )
   {
     this.index = index;
     this.interval = interval;
     this.virtualColumns = virtualColumns;
-    this.filterBitmap = filterBitmap;
+    this.filter = filter;
+    this.metrics = metrics;
     this.minDataTimestamp = minDataTimestamp;
     this.maxDataTimestamp = maxDataTimestamp;
     this.descending = descending;
-    this.postFilter = postFilter;
-    this.bitmapIndexSelector = bitmapIndexSelector;
   }
 
   public Sequence<Cursor> build(final Granularity gran)
   {
+    final Closer closer = Closer.create();
+
+    // Column caches shared amongst all cursors in this sequence.
+    final ColumnCache columnCache = new ColumnCache(index, closer);
+
     final Offset baseOffset;
 
+    final ColumnSelectorColumnIndexSelector bitmapIndexSelector = new 
ColumnSelectorColumnIndexSelector(
+        index.getBitmapFactoryForDimensions(),
+        virtualColumns,
+        columnCache
+    );
+
+    final FilterAnalysis filterAnalysis = FilterAnalysis.analyzeFilter(
+        filter,
+        bitmapIndexSelector,
+        metrics,
+        index.getNumRows()
+    );
+
+    final ImmutableBitmap filterBitmap = filterAnalysis.getPreFilterBitmap();
+    final Filter postFilter = filterAnalysis.getPostFilter();
+
     if (filterBitmap == null) {
       baseOffset = descending
                    ? new SimpleDescendingOffset(index.getNumRows())
@@ -103,13 +117,7 @@ public class QueryableIndexCursorSequenceBuilder
       baseOffset = BitmapOffset.of(filterBitmap, descending, 
index.getNumRows());
     }
 
-    // Column caches shared amongst all cursors in this sequence.
-    final Map<String, BaseColumn> columnCache = new HashMap<>();
-
-    final NumericColumn timestamps = (NumericColumn) 
index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getColumn();
-
-    final Closer closer = Closer.create();
-    closer.register(timestamps);
+    final NumericColumn timestamps = (NumericColumn) 
columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
 
     Iterable<Interval> iterable = gran.getIterable(interval);
     if (descending) {
@@ -161,10 +169,8 @@ public class QueryableIndexCursorSequenceBuilder
 
                 final Offset baseCursorOffset = offset.clone();
                 final ColumnSelectorFactory columnSelectorFactory = new 
QueryableIndexColumnSelectorFactory(
-                    index,
                     virtualColumns,
                     descending,
-                    closer,
                     baseCursorOffset.getBaseReadableOffset(),
                     columnCache
                 );
@@ -195,8 +201,25 @@ public class QueryableIndexCursorSequenceBuilder
     // Sanity check - matches QueryableIndexStorageAdapter.canVectorize
     Preconditions.checkState(!descending, "!descending");
 
-    final Map<String, BaseColumn> columnCache = new HashMap<>();
     final Closer closer = Closer.create();
+    final ColumnCache columnCache = new ColumnCache(index, closer);
+
+    final ColumnSelectorColumnIndexSelector bitmapIndexSelector = new 
ColumnSelectorColumnIndexSelector(
+        index.getBitmapFactoryForDimensions(),
+        virtualColumns,
+        columnCache
+    );
+
+    final FilterAnalysis filterAnalysis = FilterAnalysis.analyzeFilter(
+        filter,
+        bitmapIndexSelector,
+        metrics,
+        index.getNumRows()
+    );
+
+    final ImmutableBitmap filterBitmap = filterAnalysis.getPreFilterBitmap();
+    final Filter postFilter = filterAnalysis.getPostFilter();
+
 
     NumericColumn timestamps = null;
 
@@ -204,8 +227,7 @@ public class QueryableIndexCursorSequenceBuilder
     final int endOffset;
 
     if (interval.getStartMillis() > minDataTimestamp) {
-      timestamps = (NumericColumn) 
index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getColumn();
-      closer.register(timestamps);
+      timestamps = (NumericColumn) 
columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
 
       startOffset = timeSearch(timestamps, interval.getStartMillis(), 0, 
index.getNumRows());
     } else {
@@ -214,8 +236,7 @@ public class QueryableIndexCursorSequenceBuilder
 
     if (interval.getEndMillis() <= maxDataTimestamp) {
       if (timestamps == null) {
-        timestamps = (NumericColumn) 
index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getColumn();
-        closer.register(timestamps);
+        timestamps = (NumericColumn) 
columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
       }
 
       endOffset = timeSearch(timestamps, interval.getEndMillis(), startOffset, 
index.getNumRows());
@@ -231,8 +252,7 @@ public class QueryableIndexCursorSequenceBuilder
     // baseColumnSelectorFactory using baseOffset is the column selector for 
filtering.
     final VectorColumnSelectorFactory baseColumnSelectorFactory = 
makeVectorColumnSelectorFactoryForOffset(
         columnCache,
-        baseOffset,
-        closer
+        baseOffset
     );
     if (postFilter == null) {
       return new QueryableIndexVectorCursor(baseColumnSelectorFactory, 
baseOffset, vectorSize, closer);
@@ -244,33 +264,22 @@ public class QueryableIndexCursorSequenceBuilder
       );
 
       // Now create the cursor and column selector that will be returned to the caller.
-      //
-      // There is an inefficiency with how we do things here: this cursor (the 
one that will be provided to the
-      // caller) does share a columnCache with "baseColumnSelectorFactory", 
but it *doesn't* share vector data. This
-      // means that if the caller wants to read from a column that is also 
used for filtering, the underlying column
-      // object will get hit twice for some of the values (anything that 
matched the filter). This is probably most
-      // noticeable if it causes thrashing of decompression buffers due to 
out-of-order reads. I haven't observed
-      // this directly but it seems possible in principle.
-      // baseColumnSelectorFactory using baseOffset is the column selector for 
filtering.
       final VectorColumnSelectorFactory filteredColumnSelectorFactory = 
makeVectorColumnSelectorFactoryForOffset(
           columnCache,
-          filteredOffset,
-          closer
+          filteredOffset
       );
       return new QueryableIndexVectorCursor(filteredColumnSelectorFactory, 
filteredOffset, vectorSize, closer);
     }
   }
 
   VectorColumnSelectorFactory makeVectorColumnSelectorFactoryForOffset(
-      Map<String, BaseColumn> columnCache,
-      VectorOffset baseOffset,
-      Closer closer
+      ColumnCache columnCache,
+      VectorOffset baseOffset
   )
   {
     return new QueryableIndexVectorColumnSelectorFactory(
         index,
         baseOffset,
-        closer,
         columnCache,
         virtualColumns
     );
@@ -280,11 +289,10 @@ public class QueryableIndexCursorSequenceBuilder
    * Search the time column using binary search. Benchmarks on various other approaches (linear search, binary
    * search that switches to linear at various closeness thresholds) indicated that a pure binary search worked best.
    *
-   * @param timeColumn          the column
-   * @param timestamp           the timestamp to search for
-   * @param startIndex          first index to search, inclusive
-   * @param endIndex            last index to search, exclusive
-   *
+   * @param timeColumn the column
+   * @param timestamp  the timestamp to search for
+   * @param startIndex first index to search, inclusive
+   * @param endIndex   last index to search, exclusive
    * @return first index that has a timestamp equal to, or greater, than "timestamp"
    */
   @VisibleForTesting
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java
index 5e1897d1e8..4b6eb38f03 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java
@@ -43,11 +43,9 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -182,8 +180,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
    */
   class RowIteratorImpl implements TransformableRowIterator
   {
-    private final Closer closer = Closer.create();
-    private final Map<String, BaseColumn> columnCache = new HashMap<>();
+    private final Closer closer;
+    private final ColumnCache columnCache;
 
     private final SimpleAscendingOffset offset = new 
SimpleAscendingOffset(numRows);
     private final int maxValidOffset = numRows - 1;
@@ -206,11 +204,12 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
 
     RowIteratorImpl()
     {
+      this.closer = Closer.create();
+      this.columnCache = new ColumnCache(input, closer);
+
       final ColumnSelectorFactory columnSelectorFactory = new 
QueryableIndexColumnSelectorFactory(
-          input,
           VirtualColumns.EMPTY,
           false,
-          closer,
           offset,
           columnCache
       );
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
index d4898e71af..31480c62b1 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexStorageAdapter.java
@@ -21,19 +21,15 @@ package org.apache.druid.segment;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.query.BitmapResultFactory;
-import org.apache.druid.query.DefaultBitmapResultFactory;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.segment.column.BaseColumn;
-import org.apache.druid.segment.column.BitmapColumnIndex;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
@@ -41,8 +37,6 @@ import org.apache.druid.segment.column.DictionaryEncodedColumn;
 import org.apache.druid.segment.column.DictionaryEncodedStringValueIndex;
 import org.apache.druid.segment.column.NumericColumn;
 import org.apache.druid.segment.data.Indexed;
-import org.apache.druid.segment.filter.AndFilter;
-import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.segment.vector.VectorCursor;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -50,10 +44,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -227,21 +218,15 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
     if (actualInterval == null) {
       return null;
     }
-
-    final ColumnSelectorColumnIndexSelector bitmapIndexSelector = 
makeBitmapIndexSelector(virtualColumns);
-
-    final FilterAnalysis filterAnalysis = analyzeFilter(filter, 
bitmapIndexSelector, queryMetrics);
-
     return new QueryableIndexCursorSequenceBuilder(
         index,
         actualInterval,
         virtualColumns,
-        filterAnalysis.getPreFilterBitmap(),
+        filter,
+        queryMetrics,
         getMinTime().getMillis(),
         getMaxTime().getMillis(),
-        descending,
-        filterAnalysis.getPostFilter(),
-        bitmapIndexSelector
+        descending
     ).buildVectorized(vectorSize > 0 ? vectorSize : DEFAULT_VECTOR_SIZE);
   }
 
@@ -265,21 +250,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
       return Sequences.empty();
     }
 
-    final ColumnSelectorColumnIndexSelector bitmapIndexSelector = 
makeBitmapIndexSelector(virtualColumns);
-
-    final FilterAnalysis filterAnalysis = analyzeFilter(filter, 
bitmapIndexSelector, queryMetrics);
-
     return Sequences.filter(
         new QueryableIndexCursorSequenceBuilder(
             index,
             actualInterval,
             virtualColumns,
-            filterAnalysis.getPreFilterBitmap(),
+            filter,
+            queryMetrics,
             getMinTime().getMillis(),
             getMaxTime().getMillis(),
-            descending,
-            filterAnalysis.getPostFilter(),
-            bitmapIndexSelector
+            descending
         ).build(gran),
         Objects::nonNull
     );
@@ -295,7 +275,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
   {
     // Compute and cache minTime, maxTime.
     final ColumnHolder columnHolder = 
index.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME);
-    try (final NumericColumn column = (NumericColumn) 
columnHolder.getColumn()) {
+    try (NumericColumn column = (NumericColumn) columnHolder.getColumn()) {
       this.minTime = DateTimes.utc(column.getLongSingleValueRow(0));
       this.maxTime = 
DateTimes.utc(column.getLongSingleValueRow(column.length() - 1));
     }
@@ -316,128 +296,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
   }
 
   @VisibleForTesting
-  public ColumnSelectorColumnIndexSelector makeBitmapIndexSelector(final 
VirtualColumns virtualColumns)
+  public ColumnSelectorColumnIndexSelector makeBitmapIndexSelector(
+      final VirtualColumns virtualColumns
+  )
   {
     return new ColumnSelectorColumnIndexSelector(
         index.getBitmapFactoryForDimensions(),
         virtualColumns,
-        index
+        new DeprecatedQueryableIndexColumnSelector(index)
     );
   }
-
-  @VisibleForTesting
-  public FilterAnalysis analyzeFilter(
-      @Nullable final Filter filter,
-      ColumnSelectorColumnIndexSelector indexSelector,
-      @Nullable QueryMetrics queryMetrics
-  )
-  {
-    final int totalRows = index.getNumRows();
-
-    /*
-     * Filters can be applied in two stages:
-     * pre-filtering: Use bitmap indexes to prune the set of rows to be 
scanned.
-     * post-filtering: Iterate through rows and apply the filter to the row 
values
-     *
-     * The pre-filter and post-filter step have an implicit AND relationship. 
(i.e., final rows are those that
-     * were not pruned AND those that matched the filter during row scanning)
-     *
-     * An AND filter can have its subfilters partitioned across the two steps. 
The subfilters that can be
-     * processed entirely with bitmap indexes (subfilter returns non-null 
value for getBitmapColumnIndex)
-     * will be moved to the pre-filtering stage.
-     *
-     * Any subfilters that cannot be processed entirely with bitmap indexes 
will be moved to the post-filtering stage.
-     */
-    final List<Filter> preFilters;
-    final List<Filter> postFilters = new ArrayList<>();
-    int preFilteredRows = totalRows;
-    if (filter == null) {
-      preFilters = Collections.emptyList();
-    } else {
-      preFilters = new ArrayList<>();
-
-      if (filter instanceof AndFilter) {
-        // If we get an AndFilter, we can split the subfilters across both 
filtering stages
-        for (Filter subfilter : ((AndFilter) filter).getFilters()) {
-
-          final BitmapColumnIndex columnIndex = 
subfilter.getBitmapColumnIndex(indexSelector);
-
-          if (columnIndex == null) {
-            postFilters.add(subfilter);
-          } else {
-            preFilters.add(subfilter);
-            if (!columnIndex.getIndexCapabilities().isExact()) {
-              postFilters.add(subfilter);
-            }
-          }
-        }
-      } else {
-        // If we get an OrFilter or a single filter, handle the filter in one 
stage
-        final BitmapColumnIndex columnIndex = 
filter.getBitmapColumnIndex(indexSelector);
-        if (columnIndex == null) {
-          postFilters.add(filter);
-        } else {
-          preFilters.add(filter);
-          if (!columnIndex.getIndexCapabilities().isExact()) {
-            postFilters.add(filter);
-          }
-        }
-      }
-    }
-
-    final ImmutableBitmap preFilterBitmap;
-    if (preFilters.isEmpty()) {
-      preFilterBitmap = null;
-    } else {
-      if (queryMetrics != null) {
-        BitmapResultFactory<?> bitmapResultFactory =
-            
queryMetrics.makeBitmapResultFactory(indexSelector.getBitmapFactory());
-        long bitmapConstructionStartNs = System.nanoTime();
-        // Use AndFilter.getBitmapIndex to intersect the preFilters to get its 
short-circuiting behavior.
-        preFilterBitmap = AndFilter.getBitmapIndex(indexSelector, 
bitmapResultFactory, preFilters);
-        preFilteredRows = preFilterBitmap.size();
-        queryMetrics.reportBitmapConstructionTime(System.nanoTime() - 
bitmapConstructionStartNs);
-      } else {
-        BitmapResultFactory<?> bitmapResultFactory = new 
DefaultBitmapResultFactory(indexSelector.getBitmapFactory());
-        preFilterBitmap = AndFilter.getBitmapIndex(indexSelector, 
bitmapResultFactory, preFilters);
-      }
-    }
-
-    if (queryMetrics != null) {
-      queryMetrics.preFilters(new ArrayList<>(preFilters));
-      queryMetrics.postFilters(postFilters);
-      queryMetrics.reportSegmentRows(totalRows);
-      queryMetrics.reportPreFilteredRows(preFilteredRows);
-    }
-
-    return new FilterAnalysis(preFilterBitmap, 
Filters.maybeAnd(postFilters).orElse(null));
-  }
-
-  @VisibleForTesting
-  public static class FilterAnalysis
-  {
-    private final Filter postFilter;
-    private final ImmutableBitmap preFilterBitmap;
-
-    public FilterAnalysis(
-        @Nullable final ImmutableBitmap preFilterBitmap,
-        @Nullable final Filter postFilter
-    )
-    {
-      this.preFilterBitmap = preFilterBitmap;
-      this.postFilter = postFilter;
-    }
-
-    @Nullable
-    public ImmutableBitmap getPreFilterBitmap()
-    {
-      return preFilterBitmap;
-    }
-
-    @Nullable
-    public Filter getPostFilter()
-    {
-      return postFilter;
-    }
-  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
index f143f1d24a..1d2de8c432 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnCache;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.NilColumnValueSelector;
@@ -50,7 +51,6 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -240,12 +240,10 @@ public class BroadcastSegmentIndexedTable implements 
IndexedTable
   public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset 
offset, boolean descending, Closer closer)
   {
     return new QueryableIndexColumnSelectorFactory(
-        queryableIndex,
         VirtualColumns.EMPTY,
         descending,
-        closer,
         offset,
-        new HashMap<>()
+        new ColumnCache(queryableIndex, closer)
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
index e300674cc5..68d2f210ad 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
@@ -20,8 +20,8 @@
 package org.apache.druid.segment.vector;
 
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnCache;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.BaseColumn;
@@ -40,8 +40,7 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
   private final VirtualColumns virtualColumns;
   private final QueryableIndex index;
   private final ReadableVectorOffset offset;
-  private final Closer closer;
-  private final Map<String, BaseColumn> columnCache;
+  private final ColumnCache columnCache;
 
   // Shared selectors are useful, since they cache vectors internally, and we 
can avoid recomputation if the same
   // selector is used by more than one part of a query.
@@ -53,14 +52,12 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
   public QueryableIndexVectorColumnSelectorFactory(
       final QueryableIndex index,
       final ReadableVectorOffset offset,
-      final Closer closer,
-      final Map<String, BaseColumn> columnCache,
+      final ColumnCache columnCache,
       final VirtualColumns virtualColumns
   )
   {
     this.index = index;
     this.offset = offset;
-    this.closer = closer;
     this.virtualColumns = virtualColumns;
     this.columnCache = columnCache;
     this.singleValueDimensionSelectorCache = new HashMap<>();
@@ -85,7 +82,7 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
       if (virtualColumns.exists(spec.getDimension())) {
         MultiValueDimensionVectorSelector dimensionSelector = 
virtualColumns.makeMultiValueDimensionVectorSelector(
             dimensionSpec,
-            index,
+            columnCache,
             offset
         );
         if (dimensionSelector == null) {
@@ -108,7 +105,7 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
 
       @SuppressWarnings("unchecked")
       final DictionaryEncodedColumn<String> dictionaryEncodedColumn = 
(DictionaryEncodedColumn<String>)
-          getCachedColumn(spec.getDimension());
+          columnCache.getColumn(spec.getDimension());
 
       // dictionaryEncodedColumn is not null because of holder null check above
       assert dictionaryEncodedColumn != null;
@@ -141,7 +138,7 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
       if (virtualColumns.exists(spec.getDimension())) {
         SingleValueDimensionVectorSelector dimensionSelector = 
virtualColumns.makeSingleValueDimensionVectorSelector(
             dimensionSpec,
-            index,
+            columnCache,
             offset
         );
         if (dimensionSelector == null) {
@@ -166,7 +163,7 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
 
       @SuppressWarnings("unchecked")
       final DictionaryEncodedColumn<String> dictionaryEncodedColumn = 
(DictionaryEncodedColumn<String>)
-          getCachedColumn(spec.getDimension());
+          columnCache.getColumn(spec.getDimension());
 
       // dictionaryEncodedColumn is not null because of holder null check above
       assert dictionaryEncodedColumn != null;
@@ -192,14 +189,14 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
   {
     Function<String, VectorValueSelector> mappingFunction = name -> {
       if (virtualColumns.exists(columnName)) {
-        VectorValueSelector selector = 
virtualColumns.makeVectorValueSelector(columnName, index, offset);
+        VectorValueSelector selector = 
virtualColumns.makeVectorValueSelector(columnName, columnCache, offset);
         if (selector == null) {
           return virtualColumns.makeVectorValueSelector(columnName, this);
         } else {
           return selector;
         }
       }
-      final BaseColumn column = getCachedColumn(name);
+      final BaseColumn column = columnCache.getColumn(name);
       if (column == null) {
         return NilVectorSelector.create(offset);
       } else {
@@ -222,14 +219,14 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
   {
     Function<String, VectorObjectSelector> mappingFunction = name -> {
       if (virtualColumns.exists(columnName)) {
-        VectorObjectSelector selector = 
virtualColumns.makeVectorObjectSelector(columnName, index, offset);
+        VectorObjectSelector selector = 
virtualColumns.makeVectorObjectSelector(columnName, columnCache, offset);
         if (selector == null) {
           return virtualColumns.makeVectorObjectSelector(columnName, this);
         } else {
           return selector;
         }
       }
-      final BaseColumn column = getCachedColumn(name);
+      final BaseColumn column = columnCache.getColumn(name);
       if (column == null) {
         return NilVectorSelector.create(offset);
       } else {
@@ -247,26 +244,13 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
     return columnValueSelector;
   }
 
-  @Nullable
-  private BaseColumn getCachedColumn(final String columnName)
-  {
-    return columnCache.computeIfAbsent(columnName, name -> {
-      ColumnHolder holder = index.getColumnHolder(name);
-      if (holder != null) {
-        return closer.register(holder.getColumn());
-      } else {
-        return null;
-      }
-    });
-  }
-
   @Nullable
   @Override
   public ColumnCapabilities getColumnCapabilities(final String columnName)
   {
     if (virtualColumns.exists(columnName)) {
-      return virtualColumns.getColumnCapabilities(index, columnName);
+      return virtualColumns.getColumnCapabilities(columnCache, columnName);
     }
-    return index.getColumnCapabilities(columnName);
+    return columnCache.getColumnCapabilities(columnName);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java 
b/processing/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
similarity index 57%
copy from processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
copy to processing/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
index 38bfc0da60..bb343292c1 100644
--- a/processing/src/main/java/org/apache/druid/segment/ColumnSelector.java
+++ b/processing/src/test/java/org/apache/druid/StupidPoolPoisonedTest.java
@@ -17,31 +17,17 @@
  * under the License.
  */
 
-package org.apache.druid.segment;
+package org.apache.druid;
 
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.collections.StupidPool;
+import org.junit.Assert;
+import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.util.List;
-
-/**
- */
-public interface ColumnSelector extends ColumnInspector
+public class StupidPoolPoisonedTest
 {
-  List<String> getColumnNames();
-
-  @Nullable
-  ColumnHolder getColumnHolder(String columnName);
-
-  @Nullable
-  @Override
-  default ColumnCapabilities getColumnCapabilities(String column)
+  @Test
+  public void testStupidPoolPoisoned()
   {
-    final ColumnHolder columnHolder = getColumnHolder(column);
-    if (columnHolder == null) {
-      return null;
-    }
-    return columnHolder.getCapabilities();
+    Assert.assertTrue("StupidPool should've been poisoned, it wasn't", 
StupidPool.isPoisoned());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/MergingRowIteratorTest.java 
b/processing/src/test/java/org/apache/druid/segment/MergingRowIteratorTest.java
index 0c8f51b146..9117fb05be 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/MergingRowIteratorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/MergingRowIteratorTest.java
@@ -114,40 +114,41 @@ public class MergingRowIteratorTest extends 
InitializedNullHandlingTest
   @SafeVarargs
   private static void testMerge(String message, int markIteration, 
List<Long>... timestampSequences)
   {
-    MergingRowIterator mergingRowIterator = new MergingRowIterator(
+    try (MergingRowIterator mergingRowIterator = new MergingRowIterator(
         
Stream.of(timestampSequences).map(TestRowIterator::new).collect(Collectors.toList())
-    );
-    Iterator<Long> mergedTimestamps = Utils.mergeSorted(
-        
Stream.of(timestampSequences).map(List::iterator).collect(Collectors.toList()),
-        Comparator.naturalOrder()
-    );
-    long markedTimestamp = 0;
-    long currentTimestamp = 0;
-    int i = 0;
-    boolean marked = false;
-    boolean iterated = false;
-    while (mergedTimestamps.hasNext()) {
-      currentTimestamp = mergedTimestamps.next();
-      Assert.assertTrue(message, mergingRowIterator.moveToNext());
-      iterated = true;
-      Assert.assertEquals(message, currentTimestamp, 
mergingRowIterator.getPointer().timestampSelector.getLong());
-      if (marked) {
-        Assert.assertEquals(
-            message,
-            markedTimestamp != currentTimestamp,
-            mergingRowIterator.hasTimeAndDimsChangedSinceMark()
-        );
+    )) {
+      Iterator<Long> mergedTimestamps = Utils.mergeSorted(
+          
Stream.of(timestampSequences).map(List::iterator).collect(Collectors.toList()),
+          Comparator.naturalOrder()
+      );
+      long markedTimestamp = 0;
+      long currentTimestamp = 0;
+      int i = 0;
+      boolean marked = false;
+      boolean iterated = false;
+      while (mergedTimestamps.hasNext()) {
+        currentTimestamp = mergedTimestamps.next();
+        Assert.assertTrue(message, mergingRowIterator.moveToNext());
+        iterated = true;
+        Assert.assertEquals(message, currentTimestamp, 
mergingRowIterator.getPointer().timestampSelector.getLong());
+        if (marked) {
+          Assert.assertEquals(
+              message,
+              markedTimestamp != currentTimestamp,
+              mergingRowIterator.hasTimeAndDimsChangedSinceMark()
+          );
+        }
+        if (i == markIteration) {
+          mergingRowIterator.mark();
+          markedTimestamp = currentTimestamp;
+          marked = true;
+        }
+        i++;
       }
-      if (i == markIteration) {
-        mergingRowIterator.mark();
-        markedTimestamp = currentTimestamp;
-        marked = true;
+      Assert.assertFalse(message, mergingRowIterator.moveToNext());
+      if (iterated) {
+        Assert.assertEquals(message, currentTimestamp, 
mergingRowIterator.getPointer().timestampSelector.getLong());
       }
-      i++;
-    }
-    Assert.assertFalse(message, mergingRowIterator.moveToNext());
-    if (iterated) {
-      Assert.assertEquals(message, currentTimestamp, 
mergingRowIterator.getPointer().timestampSelector.getLong());
     }
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index 338897be75..2f94a772b4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -179,19 +179,17 @@ public class CompressedDoublesSerdeTest
     Assert.assertEquals(baos.size(), serializer.getSerializedSize());
     Supplier<ColumnarDoubles> supplier = CompressedColumnarDoublesSuppliers
         .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
-    ColumnarDoubles doubles = supplier.get();
-
-    assertIndexMatchesVals(doubles, values);
-    for (int i = 0; i < 10; i++) {
-      int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int start = a < b ? a : b;
-      int end = a < b ? b : a;
-      tryFill(doubles, values, start, end - start);
+    try (ColumnarDoubles doubles = supplier.get()) {
+      assertIndexMatchesVals(doubles, values);
+      for (int i = 0; i < 10; i++) {
+        int a = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int b = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int start = a < b ? a : b;
+        int end = a < b ? b : a;
+        tryFill(doubles, values, start, end - start);
+      }
+      testConcurrentThreadReads(supplier, doubles, values);
     }
-    testConcurrentThreadReads(supplier, doubles, values);
-
-    doubles.close();
   }
 
   private void tryFill(ColumnarDoubles indexed, double[] vals, final int 
startIndex, final int size)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
index c831170f48..14b935be2f 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
@@ -186,20 +186,19 @@ public class CompressedFloatsSerdeTest
     Assert.assertEquals(baos.size(), serializer.getSerializedSize());
     CompressedColumnarFloatsSupplier supplier = 
CompressedColumnarFloatsSupplier
         .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
-    ColumnarFloats floats = supplier.get();
-
-    assertIndexMatchesVals(floats, values);
-    for (int i = 0; i < 10; i++) {
-      int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int start = a < b ? a : b;
-      int end = a < b ? b : a;
-      tryFill(floats, values, start, end - start);
+    try (ColumnarFloats floats = supplier.get()) {
+
+      assertIndexMatchesVals(floats, values);
+      for (int i = 0; i < 10; i++) {
+        int a = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int b = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int start = a < b ? a : b;
+        int end = a < b ? b : a;
+        tryFill(floats, values, start, end - start);
+      }
+      testSupplierSerde(supplier, values);
+      testConcurrentThreadReads(supplier, floats, values);
     }
-    testSupplierSerde(supplier, values);
-    testConcurrentThreadReads(supplier, floats, values);
-
-    floats.close();
   }
 
   private void tryFill(ColumnarFloats indexed, float[] vals, final int 
startIndex, final int size)
@@ -242,8 +241,9 @@ public class CompressedFloatsSerdeTest
     CompressedColumnarFloatsSupplier anotherSupplier = 
CompressedColumnarFloatsSupplier.fromByteBuffer(
         ByteBuffer.wrap(bytes), order
     );
-    ColumnarFloats indexed = anotherSupplier.get();
-    assertIndexMatchesVals(indexed, vals);
+    try (ColumnarFloats indexed = anotherSupplier.get()) {
+      assertIndexMatchesVals(indexed, vals);
+    }
   }
 
   // This test attempts to cause a race condition with the DirectByteBuffers, 
it's non-deterministic in causing it,
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
index 1186d32a36..87daaddbe4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -77,17 +77,18 @@ public class CompressedLongsAutoEncodingSerdeTest
     int numBits = (Long.SIZE - Long.numberOfLeadingZeros(1 << (bitsPerValue - 
1)));
     double numValuesPerByte = 8.0 / (double) numBits;
 
-    int numRows = (int) (blockSize * numValuesPerByte) * 2 + 
ThreadLocalRandom.current().nextInt(1, 101);
+    final ThreadLocalRandom currRand = ThreadLocalRandom.current();
+    int numRows = (int) (blockSize * numValuesPerByte) * 2 + 
currRand.nextInt(1, 101);
     long[] chunk = new long[numRows];
     for (int i = 0; i < numRows; i++) {
-      chunk[i] = ThreadLocalRandom.current().nextLong(bound);
+      chunk[i] = currRand.nextLong(bound);
     }
     testValues(chunk);
 
     numRows++;
     chunk = new long[numRows];
     for (int i = 0; i < numRows; i++) {
-      chunk[i] = ThreadLocalRandom.current().nextLong(bound);
+      chunk[i] = currRand.nextLong(bound);
     }
     testValues(chunk);
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
index 6d6fe4549f..b643ee43d8 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
@@ -193,20 +193,19 @@ public class CompressedLongsSerdeTest
     Assert.assertEquals(baos.size(), serializer.getSerializedSize());
     CompressedColumnarLongsSupplier supplier = CompressedColumnarLongsSupplier
         .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
-    ColumnarLongs longs = supplier.get();
-
-    assertIndexMatchesVals(longs, values);
-    for (int i = 0; i < 10; i++) {
-      int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
-      int start = a < b ? a : b;
-      int end = a < b ? b : a;
-      tryFill(longs, values, start, end - start);
+    try (ColumnarLongs longs = supplier.get()) {
+
+      assertIndexMatchesVals(longs, values);
+      for (int i = 0; i < 10; i++) {
+        int a = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int b = (int) (ThreadLocalRandom.current().nextDouble() * 
values.length);
+        int start = a < b ? a : b;
+        int end = a < b ? b : a;
+        tryFill(longs, values, start, end - start);
+      }
+      testSupplierSerde(supplier, values);
+      testConcurrentThreadReads(supplier, longs, values);
     }
-    testSupplierSerde(supplier, values);
-    testConcurrentThreadReads(supplier, longs, values);
-
-    longs.close();
   }
 
   private void tryFill(ColumnarLongs indexed, long[] vals, final int 
startIndex, final int size)
@@ -256,8 +255,9 @@ public class CompressedLongsSerdeTest
         ByteBuffer.wrap(bytes),
         order
     );
-    ColumnarLongs indexed = anotherSupplier.get();
-    assertIndexMatchesVals(indexed, vals);
+    try (ColumnarLongs indexed = anotherSupplier.get()) {
+      assertIndexMatchesVals(indexed, vals);
+    }
   }
 
   // This test attempts to cause a race condition with the DirectByteBuffers, 
it's non-deterministic in causing it,
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
index 7fcb9c3262..60de4d78ed 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
@@ -21,30 +21,23 @@ package org.apache.druid.segment.data;
 
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.io.Closer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.Channels;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 
 /**
+ *
  */
-public class CompressedVSizeColumnarMultiIntsSupplierTest
+public class CompressedVSizeColumnarMultiIntsSupplierTest extends 
CompressedVSizeColumnarMultiIntsSupplierTestBase
 {
   private Closer closer;
-  protected List<int[]> vals;
-
-  protected WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier;
+  private List<int[]> vals;
+  private WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier;
 
-  @Before
+  @Override
   public void setUpSimple()
   {
     closer = Closer.create();
@@ -55,84 +48,48 @@ public class CompressedVSizeColumnarMultiIntsSupplierTest
         new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
     );
 
-    columnarMultiIntsSupplier = 
CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
-        Iterables.transform(vals, input -> VSizeColumnarInts.fromArray(input, 
20)),
-        20,
-        ByteOrder.nativeOrder(),
-        CompressionStrategy.LZ4,
+    columnarMultiIntsSupplier = wrapSupplier(
+        CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
+            Iterables.transform(vals, input -> 
VSizeColumnarInts.fromArray(input, 20)),
+            20,
+            ByteOrder.nativeOrder(),
+            CompressionStrategy.LZ4,
+            closer
+        ),
         closer
     );
   }
 
-  @After
+  @Override
   public void teardown() throws IOException
   {
+    closer.close();
+    closer = null;
     columnarMultiIntsSupplier = null;
     vals = null;
-    closer.close();
-  }
-
-  @Test
-  public void testSanity()
-  {
-    assertSame(vals, columnarMultiIntsSupplier.get());
-  }
-
-  @Test
-  public void testSerde() throws IOException
-  {
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    columnarMultiIntsSupplier.writeTo(Channels.newChannel(baos), null);
-
-    final byte[] bytes = baos.toByteArray();
-    Assert.assertEquals(columnarMultiIntsSupplier.getSerializedSize(), 
bytes.length);
-    WritableSupplier<ColumnarMultiInts> deserializedColumnarMultiInts = 
fromByteBuffer(ByteBuffer.wrap(bytes));
-
-    assertSame(vals, deserializedColumnarMultiInts.get());
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetInvalidElementInRow()
+  @Override
+  public List<int[]> getValsUsed()
   {
-    columnarMultiIntsSupplier.get().get(3).get(15);
+    return vals;
   }
 
-  @Test
-  public void testIterators()
+  @Override
+  public WritableSupplier<ColumnarMultiInts> getColumnarMultiIntsSupplier()
   {
-    Iterator<IndexedInts> iterator = 
columnarMultiIntsSupplier.get().iterator();
-    int row = 0;
-    while (iterator.hasNext()) {
-      final int[] ints = vals.get(row);
-      final IndexedInts vSizeIndexedInts = iterator.next();
-
-      Assert.assertEquals(ints.length, vSizeIndexedInts.size());
-      for (int i = 0, size = vSizeIndexedInts.size(); i < size; i++) {
-        Assert.assertEquals(ints[i], vSizeIndexedInts.get(i));
-      }
-      row++;
-    }
+    return columnarMultiIntsSupplier;
   }
 
-  private void assertSame(List<int[]> someInts, ColumnarMultiInts 
columnarMultiInts)
+  @Override
+  public WritableSupplier<ColumnarMultiInts> fromByteBuffer(ByteBuffer buffer)
   {
-    Assert.assertEquals(someInts.size(), columnarMultiInts.size());
-    for (int i = 0; i < columnarMultiInts.size(); ++i) {
-      final int[] ints = someInts.get(i);
-      final IndexedInts vSizeIndexedInts = columnarMultiInts.get(i);
-
-      Assert.assertEquals(ints.length, vSizeIndexedInts.size());
-      for (int j = 0; j < ints.length; j++) {
-        Assert.assertEquals(ints[j], vSizeIndexedInts.get(j));
-      }
-    }
-  }
-
-  protected WritableSupplier<ColumnarMultiInts> fromByteBuffer(ByteBuffer 
buffer)
-  {
-    return CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
-        buffer,
-        ByteOrder.nativeOrder()
+    return wrapSupplier(
+        CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
+            buffer,
+            ByteOrder.nativeOrder()
+        ),
+        closer
     );
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTestBase.java
similarity index 61%
copy from 
processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
copy to 
processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTestBase.java
index 7fcb9c3262..9944c72165 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplierTestBase.java
@@ -19,87 +19,70 @@
 
 package org.apache.druid.segment.data;
 
-import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.Channels;
-import java.util.Arrays;
+import java.nio.channels.WritableByteChannel;
 import java.util.Iterator;
 import java.util.List;
 
-/**
- */
-public class CompressedVSizeColumnarMultiIntsSupplierTest
+public abstract class CompressedVSizeColumnarMultiIntsSupplierTestBase
 {
-  private Closer closer;
-  protected List<int[]> vals;
-
-  protected WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier;
 
   @Before
-  public void setUpSimple()
-  {
-    closer = Closer.create();
-    vals = Arrays.asList(
-        new int[1],
-        new int[]{1, 2, 3, 4, 5},
-        new int[]{6, 7, 8, 9, 10},
-        new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
-    );
-
-    columnarMultiIntsSupplier = 
CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
-        Iterables.transform(vals, input -> VSizeColumnarInts.fromArray(input, 
20)),
-        20,
-        ByteOrder.nativeOrder(),
-        CompressionStrategy.LZ4,
-        closer
-    );
-  }
+  public abstract void setUpSimple();
 
   @After
-  public void teardown() throws IOException
-  {
-    columnarMultiIntsSupplier = null;
-    vals = null;
-    closer.close();
-  }
+  public abstract void teardown() throws IOException;
+
+  public abstract List<int[]> getValsUsed();
+
+  public abstract WritableSupplier<ColumnarMultiInts> 
getColumnarMultiIntsSupplier();
+
+  public abstract WritableSupplier<ColumnarMultiInts> 
fromByteBuffer(ByteBuffer buf);
 
   @Test
   public void testSanity()
   {
-    assertSame(vals, columnarMultiIntsSupplier.get());
+    assertSame(getValsUsed(), getColumnarMultiIntsSupplier().get());
   }
 
   @Test
   public void testSerde() throws IOException
   {
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier = 
getColumnarMultiIntsSupplier();
     columnarMultiIntsSupplier.writeTo(Channels.newChannel(baos), null);
 
     final byte[] bytes = baos.toByteArray();
     Assert.assertEquals(columnarMultiIntsSupplier.getSerializedSize(), 
bytes.length);
     WritableSupplier<ColumnarMultiInts> deserializedColumnarMultiInts = 
fromByteBuffer(ByteBuffer.wrap(bytes));
 
-    assertSame(vals, deserializedColumnarMultiInts.get());
+    assertSame(getValsUsed(), deserializedColumnarMultiInts.get());
   }
 
+
   @Test(expected = IllegalArgumentException.class)
   public void testGetInvalidElementInRow()
   {
-    columnarMultiIntsSupplier.get().get(3).get(15);
+    getColumnarMultiIntsSupplier().get().get(3).get(15);
   }
 
   @Test
   public void testIterators()
   {
+    final WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier = 
getColumnarMultiIntsSupplier();
+    List<int[]> vals = getValsUsed();
+
     Iterator<IndexedInts> iterator = 
columnarMultiIntsSupplier.get().iterator();
     int row = 0;
     while (iterator.hasNext()) {
@@ -128,11 +111,35 @@ public class CompressedVSizeColumnarMultiIntsSupplierTest
     }
   }
 
-  protected WritableSupplier<ColumnarMultiInts> fromByteBuffer(ByteBuffer 
buffer)
+  public static <T extends Closeable> WritableSupplier<T> wrapSupplier(
+      WritableSupplier<T> supplier,
+      Closer closer
+  )
   {
-    return CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
-        buffer,
-        ByteOrder.nativeOrder()
-    );
+    return new WritableSupplier<T>()
+    {
+      @Override
+      public T get()
+      {
+        // We must register the actual column with the closer as well because 
the resources taken by the
+        // column are not part of what the Supplier's closer manages
+        return closer.register(supplier.get());
+      }
+
+      @Override
+      public long getSerializedSize() throws IOException
+      {
+        return supplier.getSerializedSize();
+      }
+
+      @Override
+      public void writeTo(
+          WritableByteChannel channel,
+          FileSmoosher smoosher
+      ) throws IOException
+      {
+        supplier.writeTo(channel, smoosher);
+      }
+    };
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
index 5833bdd538..ec745ffa23 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
@@ -107,9 +107,10 @@ public class TestColumnCompression
   }
 
   @After
-  public void tearDown() 
+  public void tearDown() throws IOException
   {
     ByteBufferUtils.free(buffer);
+    compressed.close();
   }
 
   private static ByteBuffer serialize(WritableSupplier<ColumnarMultiInts> 
writableSupplier) throws IOException
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java
index 3c8e4fc0e7..ad529e10bd 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplierTest.java
@@ -22,21 +22,21 @@ package org.apache.druid.segment.data;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.io.Closer;
-import org.junit.After;
-import org.junit.Before;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
+import java.util.List;
 
-public class V3CompressedVSizeColumnarMultiIntsSupplierTest extends CompressedVSizeColumnarMultiIntsSupplierTest
+public class V3CompressedVSizeColumnarMultiIntsSupplierTest extends CompressedVSizeColumnarMultiIntsSupplierTestBase
 {
 
   private Closer closer;
+  private List<int[]> vals;
+  private WritableSupplier<ColumnarMultiInts> columnarMultiIntsSupplier;
 
   @Override
-  @Before
   public void setUpSimple()
   {
     vals = Arrays.asList(
@@ -46,18 +46,20 @@ public class V3CompressedVSizeColumnarMultiIntsSupplierTest extends CompressedVS
         new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
     );
     closer = Closer.create();
-    columnarMultiIntsSupplier = V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
-        Iterables.transform(vals, (Function<int[], ColumnarInts>) input -> VSizeColumnarInts.fromArray(input, 20)),
-        2,
-        20,
-        ByteOrder.nativeOrder(),
-        CompressionStrategy.LZ4,
+    columnarMultiIntsSupplier = wrapSupplier(
+        V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
+            Iterables.transform(vals, (Function<int[], ColumnarInts>) input -> VSizeColumnarInts.fromArray(input, 20)),
+            2,
+            20,
+            ByteOrder.nativeOrder(),
+            CompressionStrategy.LZ4,
+            closer
+        ),
         closer
     );
   }
 
   @Override
-  @After
   public void teardown() throws IOException
   {
     columnarMultiIntsSupplier = null;
@@ -66,11 +68,26 @@ public class V3CompressedVSizeColumnarMultiIntsSupplierTest extends CompressedVS
   }
 
   @Override
-  protected WritableSupplier<ColumnarMultiInts> fromByteBuffer(ByteBuffer buffer)
+  public List<int[]> getValsUsed()
   {
-    return V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
-        buffer,
-        ByteOrder.nativeOrder()
+    return vals;
+  }
+
+  @Override
+  public WritableSupplier<ColumnarMultiInts> getColumnarMultiIntsSupplier()
+  {
+    return columnarMultiIntsSupplier;
+  }
+
+  @Override
+  public WritableSupplier<ColumnarMultiInts> fromByteBuffer(ByteBuffer buffer)
+  {
+    return wrapSupplier(
+        V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
+            buffer,
+            ByteOrder.nativeOrder()
+        ),
+        closer
     );
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/FilterPartitionTest.java b/processing/src/test/java/org/apache/druid/segment/filter/FilterPartitionTest.java
index 7478d2a481..0c6733e648 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/FilterPartitionTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/FilterPartitionTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.query.filter.FilterTuning;
 import org.apache.druid.query.filter.OrDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.ColumnSelectorColumnIndexSelector;
+import org.apache.druid.segment.FilterAnalysis;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.StorageAdapter;
@@ -727,20 +728,22 @@ public class FilterPartitionTest extends BaseFilterTest
       return;
     }
     QueryableIndexStorageAdapter storageAdapter = (QueryableIndexStorageAdapter) adapter;
+    final int numRows = adapter.getNumRows();
+
     final ColumnSelectorColumnIndexSelector bitmapIndexSelector = storageAdapter.makeBitmapIndexSelector(BaseFilterTest.VIRTUAL_COLUMNS);
 
     // has bitmap index, will use it by default
     Filter normalFilter = new SelectorFilter("dim1", "HELLO");
-    QueryableIndexStorageAdapter.FilterAnalysis filterAnalysisNormal =
-        storageAdapter.analyzeFilter(normalFilter, bitmapIndexSelector, null);
+    FilterAnalysis filterAnalysisNormal =
+        FilterAnalysis.analyzeFilter(normalFilter, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(filterAnalysisNormal.getPreFilterBitmap() != null);
     Assert.assertTrue(filterAnalysisNormal.getPostFilter() == null);
 
 
     // no bitmap index, should be a post filter
     Filter noBitmapFilter = new NoBitmapSelectorFilter("dim1", "HELLO");
-    QueryableIndexStorageAdapter.FilterAnalysis noBitmapFilterAnalysis =
-        storageAdapter.analyzeFilter(noBitmapFilter, bitmapIndexSelector, null);
+    FilterAnalysis noBitmapFilterAnalysis =
+        FilterAnalysis.analyzeFilter(noBitmapFilter, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(noBitmapFilterAnalysis.getPreFilterBitmap() == null);
     Assert.assertTrue(noBitmapFilterAnalysis.getPostFilter() != null);
 
@@ -750,8 +753,8 @@ public class FilterPartitionTest extends BaseFilterTest
         "HELLO",
         new FilterTuning(false, null, null)
     );
-    QueryableIndexStorageAdapter.FilterAnalysis bitmapFilterWithForceNoIndexTuningAnalysis =
-        storageAdapter.analyzeFilter(bitmapFilterWithForceNoIndexTuning, bitmapIndexSelector, null);
+    FilterAnalysis bitmapFilterWithForceNoIndexTuningAnalysis =
+        FilterAnalysis.analyzeFilter(bitmapFilterWithForceNoIndexTuning, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(bitmapFilterWithForceNoIndexTuningAnalysis.getPreFilterBitmap() == null);
     Assert.assertTrue(bitmapFilterWithForceNoIndexTuningAnalysis.getPostFilter() != null);
 
@@ -761,8 +764,8 @@ public class FilterPartitionTest extends BaseFilterTest
         "HELLO",
         new FilterTuning(true, 0, 3)
     );
-    QueryableIndexStorageAdapter.FilterAnalysis bitmapFilterWithCardinalityMaxAnalysis =
-        storageAdapter.analyzeFilter(bitmapFilterWithCardinalityMax, bitmapIndexSelector, null);
+    FilterAnalysis bitmapFilterWithCardinalityMaxAnalysis =
+        FilterAnalysis.analyzeFilter(bitmapFilterWithCardinalityMax, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(bitmapFilterWithCardinalityMaxAnalysis.getPreFilterBitmap() == null);
     Assert.assertTrue(bitmapFilterWithCardinalityMaxAnalysis.getPostFilter() != null);
 
@@ -772,8 +775,8 @@ public class FilterPartitionTest extends BaseFilterTest
         "HELLO",
         new FilterTuning(true, 0, 1000)
     );
-    QueryableIndexStorageAdapter.FilterAnalysis bitmapFilterWithCardinalityMax2Analysis =
-        storageAdapter.analyzeFilter(bitmapFilterWithCardinalityMax2, bitmapIndexSelector, null);
+    FilterAnalysis bitmapFilterWithCardinalityMax2Analysis =
+        FilterAnalysis.analyzeFilter(bitmapFilterWithCardinalityMax2, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(bitmapFilterWithCardinalityMax2Analysis.getPreFilterBitmap() != null);
     Assert.assertTrue(bitmapFilterWithCardinalityMax2Analysis.getPostFilter() == null);
 
@@ -783,8 +786,8 @@ public class FilterPartitionTest extends BaseFilterTest
         "HELLO",
         new FilterTuning(true, 1000, null)
     );
-    QueryableIndexStorageAdapter.FilterAnalysis bitmapFilterWithCardinalityMinAnalysis =
-        storageAdapter.analyzeFilter(bitmapFilterWithCardinalityMin, bitmapIndexSelector, null);
+    FilterAnalysis bitmapFilterWithCardinalityMinAnalysis =
+        FilterAnalysis.analyzeFilter(bitmapFilterWithCardinalityMin, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(bitmapFilterWithCardinalityMinAnalysis.getPreFilterBitmap() == null);
     Assert.assertTrue(bitmapFilterWithCardinalityMinAnalysis.getPostFilter() != null);
 
@@ -794,8 +797,8 @@ public class FilterPartitionTest extends BaseFilterTest
         "HELLO",
         new FilterTuning(true, null, null)
     );
-    QueryableIndexStorageAdapter.FilterAnalysis noBitmapFilterWithForceUseAnalysis =
-        storageAdapter.analyzeFilter(noBitmapFilterWithForceUse, bitmapIndexSelector, null);
+    FilterAnalysis noBitmapFilterWithForceUseAnalysis =
+        FilterAnalysis.analyzeFilter(noBitmapFilterWithForceUse, bitmapIndexSelector, null, numRows);
     Assert.assertTrue(noBitmapFilterWithForceUseAnalysis.getPreFilterBitmap() == null);
     Assert.assertTrue(noBitmapFilterWithForceUseAnalysis.getPostFilter() != null);
   }
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 67620a1d27..48e84837fd 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -129,7 +129,8 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
   @AfterClass
   public static void teardown()
   {
-    CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {});
+    CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
+    });
   }
 
   @Test
@@ -146,9 +147,8 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           null
       );
 
-      List<Cursor> flatten = cursorSequence.toList();
 
-      for (Cursor cursor : flatten) {
+      cursorSequence.accumulate(null, (accumulated, cursor) -> {
         ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
         ExpressionPlan plan = ExpressionPlanner.plan(
             adapter,
@@ -202,7 +202,9 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
 
           cursor.advance();
         }
-      }
+
+        return null;
+      });
     }
   }
 
@@ -220,9 +222,7 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           null
       );
 
-      List<Cursor> flatten = cursorSequence.toList();
-
-      for (Cursor cursor : flatten) {
+      cursorSequence.accumulate(null, (ignored, cursor) -> {
         ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
 
         // identifier, uses dimension selector supplier supplier, no null coercion
@@ -289,7 +289,8 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
 
           cursor.advance();
         }
-      }
+        return ignored;
+      });
     }
   }
 
@@ -307,9 +308,7 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           null
       );
 
-      List<Cursor> flatten = cursorSequence.toList();
-
-      for (Cursor cursor : flatten) {
+      cursorSequence.accumulate(null, (accumulated, cursor) -> {
         ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
         // an assortment of plans
         ExpressionPlan plan = ExpressionPlanner.plan(
@@ -344,7 +343,9 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           }
           cursor.advance();
         }
-      }
+
+        return null;
+      });
     }
   }
 
@@ -362,9 +363,8 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           null
       );
 
-      List<Cursor> flatten = cursorSequence.toList();
 
-      for (Cursor cursor : flatten) {
+      cursorSequence.accumulate(null, (accumulated, cursor) -> {
         ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
         // an assortment of plans
         ExpressionPlan plan = ExpressionPlanner.plan(
@@ -399,7 +399,9 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
           }
           cursor.advance();
         }
-      }
+
+        return null;
+      });
     }
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
index a7ca0ad079..c3e6a92850 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DeprecatedQueryableIndexColumnSelector;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
@@ -143,7 +144,7 @@ public class ExpressionVectorSelectorsTest
   public void setup()
   {
     Expr parsed = Parser.parse(expression, ExprMacroTable.nil());
-    outputType = parsed.getOutputType(INDEX);
+    outputType = parsed.getOutputType(new DeprecatedQueryableIndexColumnSelector(INDEX));
     if (outputType == null) {
       outputType = ExpressionType.STRING;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to