This is an automated email from the ASF dual-hosted git repository. cheddar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new bb953be09b Refactor usage of JoinableFactoryWrapper + more test coverage (#12767) bb953be09b is described below commit bb953be09bff79361331f74efa99317d2f3e6187 Author: Rohan Garg <7731512+rohang...@users.noreply.github.com> AuthorDate: Tue Jul 12 18:55:36 2022 +0530 Refactor usage of JoinableFactoryWrapper + more test coverage (#12767) Refactor usage of JoinableFactoryWrapper to add e2e test for createSegmentMapFn with joinToFilter feature enabled --- .../query/CachingClusteredClientBenchmark.java | 5 +- .../movingaverage/MovingAverageQueryTest.java | 3 +- .../druid/indexing/overlord/TaskLifecycleTest.java | 3 +- .../ServerManagerForQueryErrorTest.java | 6 +- .../apache/druid/segment/join/HashJoinSegment.java | 12 +++ .../druid/segment/join/InlineJoinableFactory.java | 0 .../druid/segment/join/JoinableFactoryWrapper.java | 16 ++- .../java/org/apache/druid/query/TestQuery.java | 16 +++ .../segment/join/InlineJoinableFactoryTest.java | 0 .../segment/join/JoinableFactoryWrapperTest.java | 111 ++++++++++++++------- .../druid/client/CachingClusteredClient.java | 5 +- .../realtime/appenderator/Appenderators.java | 3 +- .../appenderator/SinkQuerySegmentWalker.java | 5 +- .../UnifiedIndexerAppenderatorsManager.java | 9 +- .../segment/realtime/plumber/RealtimePlumber.java | 3 +- .../druid/server/LocalQuerySegmentWalker.java | 5 +- .../druid/server/coordination/ServerManager.java | 5 +- .../CachingClusteredClientFunctionalityTest.java | 5 +- .../client/CachingClusteredClientPerfTest.java | 4 +- .../druid/client/CachingClusteredClientTest.java | 5 +- .../QueryRunnerBasedOnClusteredClientTestBase.java | 5 +- .../UnifiedIndexerAppenderatorsManagerTest.java | 4 +- .../druid/server/ClientQuerySegmentWalkerTest.java | 6 +- .../org/apache/druid/server/QueryStackTests.java | 9 +- .../server/TestClusterQuerySegmentWalker.java | 5 +- .../server/coordination/ServerManagerTest.java | 4 +- .../druid/sql/calcite/TestQueryMakerFactory.java | 2 +- .../druid/sql/calcite/util/CalciteTests.java | 28 +++++- .../util/SpecificSegmentsQuerySegmentWalker.java | 50 ++++------ 29 files changed, 214 insertions(+), 120 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index b85dad30a7..bc480c1063 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import org.apache.druid.client.CachingClusteredClient; @@ -104,7 +103,7 @@ import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; -import org.apache.druid.segment.join.MapJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -343,7 +342,7 @@ public class CachingClusteredClientBenchmark processingConfig, forkJoinPool, QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 9386425817..2945bdb1e5 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -69,6 +69,7 @@ import org.apache.druid.query.movingaverage.test.TestConfig; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; @@ -372,7 +373,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())), new NoopServiceEmitter() ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index bb5a6d2ae4..8d4e296648 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -122,6 +122,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentMover; @@ -1359,7 +1360,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager( new ForwardingQueryProcessingPool(exec), - NoopJoinableFactory.INSTANCE, + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new WorkerConfig(), MapCache.create(2048), new CacheConfig(), diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java index 98c7f964c1..87c50c88c6 100644 --- a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java +++ b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java @@ -46,7 +46,7 @@ import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; -import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -96,7 +96,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager Cache cache, CacheConfig cacheConfig, SegmentManager segmentManager, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, ServerConfig serverConfig ) { @@ -109,7 +109,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager cache, cacheConfig, segmentManager, - joinableFactory, + joinableFactoryWrapper, serverConfig ); } diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java index 7ade12eaca..c726a8335a 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegment.java @@ -47,6 +47,7 @@ public class HashJoinSegment implements SegmentReference private static final Logger log = new Logger(HashJoinSegment.class); private final SegmentReference baseSegment; + @Nullable private final Filter baseFilter; private final List<JoinableClause> clauses; private final JoinFilterPreAnalysis joinFilterPreAnalysis; @@ -147,4 +148,15 @@ public class HashJoinSegment implements SegmentReference return Optional.empty(); } } + + @Nullable + public Filter getBaseFilter() + { + return baseFilter; + } + + public List<JoinableClause> getClauses() + { + return clauses; + } } diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java similarity index 100% rename from server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java rename to processing/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java index c0831b352e..0971e1abae 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactoryWrapper.java @@ -22,11 +22,14 @@ package org.apache.druid.segment.join; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.inject.Inject; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; @@ -67,11 +70,17 @@ public class JoinableFactoryWrapper private final JoinableFactory joinableFactory; + @Inject public JoinableFactoryWrapper(final JoinableFactory joinableFactory) { this.joinableFactory = Preconditions.checkNotNull(joinableFactory, "joinableFactory"); } + public JoinableFactory getJoinableFactory() + { + return joinableFactory; + } + /** * Creates a Function that maps base segments to {@link HashJoinSegment} if needed (i.e. if the number of join * clauses is > 0). If mapping is not needed, this method will return {@link Function#identity()}. @@ -141,7 +150,12 @@ public class JoinableFactoryWrapper ); return baseSegment -> - new HashJoinSegment(baseSegment, baseFilterToUse, clausesToUse, joinFilterPreAnalysis); + new HashJoinSegment( + baseSegment, + baseFilterToUse, + GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()), + joinFilterPreAnalysis + ); } } ); diff --git a/processing/src/test/java/org/apache/druid/query/TestQuery.java b/processing/src/test/java/org/apache/druid/query/TestQuery.java index afeb361274..200151c200 100644 --- a/processing/src/test/java/org/apache/druid/query/TestQuery.java +++ b/processing/src/test/java/org/apache/druid/query/TestQuery.java @@ -22,10 +22,14 @@ package org.apache.druid.query; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.QuerySegmentSpec; +import javax.annotation.Nullable; import java.util.Map; +import java.util.Set; public class TestQuery extends BaseQuery { + @Nullable + private Set<String> requiredColumns; public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) { @@ -72,4 +76,16 @@ public class TestQuery extends BaseQuery BaseQuery.computeOverriddenContext(getContext(), contextOverride) ); } + + @Nullable + @Override + public Set<String> getRequiredColumns() + { + return requiredColumns; + } + + public void setRequiredColumns(Set<String> requiredColumns) + { + this.requiredColumns = requiredColumns; + } } diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java rename to processing/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java index d430fb7e8f..49d159d585 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinableFactoryWrapperTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.LookupDataSource; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.TestQuery; import org.apache.druid.query.extraction.MapLookupExtractor; @@ -46,6 +47,8 @@ import org.apache.druid.query.filter.TrueDimFilter; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -54,12 +57,15 @@ import org.apache.druid.segment.join.lookup.LookupJoinable; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.IndexedTableJoinable; import org.apache.druid.segment.join.table.RowBasedIndexedTable; +import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -73,8 +79,9 @@ import java.util.stream.Collectors; public class JoinableFactoryWrapperTest extends NullHandlingTest { - private static final JoinableFactoryWrapper NOOP_JOINABLE_FACTORY_WRAPPER = new JoinableFactoryWrapper( - NoopJoinableFactory.INSTANCE); + public static final JoinableFactoryWrapper NOOP_JOINABLE_FACTORY_WRAPPER = new JoinableFactoryWrapper( + NoopJoinableFactory.INSTANCE + ); private static final Map<String, String> TEST_LOOKUP = ImmutableMap.<String, String>builder() @@ -124,6 +131,9 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest DateTimes.nowUtc().toString() ); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -143,12 +153,11 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest @Test public void test_createSegmentMapFn_unusableClause() { - final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo"); - final PreJoinableClause clause = new PreJoinableClause( + final PreJoinableClause clause = makePreJoinableClause( + INDEXED_TABLE_DS, + "country == \"j.country\"", "j.", - lookupDataSource, - JoinType.LEFT, - JoinConditionAnalysis.forExpression("x == \"j.x\"", "j.", ExprMacroTable.nil()) + JoinType.LEFT ); expectedException.expect(IllegalStateException.class); @@ -165,39 +174,14 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest @Test public void test_createSegmentMapFn_usableClause() { - final LookupDataSource lookupDataSource = new LookupDataSource("lookyloo"); - final JoinConditionAnalysis conditionAnalysis = JoinConditionAnalysis.forExpression( - "x == \"j.x\"", + final PreJoinableClause clause = makePreJoinableClause( + INDEXED_TABLE_DS, + "country == \"j.country\"", "j.", - ExprMacroTable.nil() - ); - final PreJoinableClause clause = new PreJoinableClause( - "j.", - lookupDataSource, - JoinType.LEFT, - conditionAnalysis + JoinType.LEFT ); - JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new JoinableFactory() - { - @Override - public boolean isDirectlyJoinable(DataSource dataSource) - { - return dataSource.equals(lookupDataSource); - } - - @Override - public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition) - { - if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { - return Optional.of( - LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) - ); - } else { - return Optional.empty(); - } - } - }); + JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new InlineJoinableFactory()); final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn( null, ImmutableList.of(clause), @@ -206,13 +190,64 @@ public class JoinableFactoryWrapperTest extends NullHandlingTest new TableDataSource("test"), new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), false, - new HashMap() + new HashMap<>() ) ); Assert.assertNotSame(Function.identity(), segmentMapFn); } + @Test + public void test_createSegmentMapFn_usableClause_joinToFilterEnabled() throws IOException + { + final PreJoinableClause clause = makePreJoinableClause( + INDEXED_TABLE_DS, + "country == \"j.country\"", + "j.", + JoinType.INNER + ); + // required columns are necessary for the rewrite + final TestQuery queryWithRequiredColumnsAndJoinFilterRewrite = (TestQuery) new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + new HashMap<>() + ).withOverriddenContext(ImmutableMap.of(QueryContexts.REWRITE_JOIN_TO_FILTER_ENABLE_KEY, "true")); + queryWithRequiredColumnsAndJoinFilterRewrite.setRequiredColumns(ImmutableSet.of("country")); + + final JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(new InlineJoinableFactory()); + final Function<SegmentReference, SegmentReference> segmentMapFn = joinableFactoryWrapper.createSegmentMapFn( + null, + ImmutableList.of(clause), + new AtomicLong(), + queryWithRequiredColumnsAndJoinFilterRewrite + ); + + // dummy segment + final SegmentReference baseSegmentReference = ReferenceCountingSegment.wrapRootGenerationSegment( + new QueryableIndexSegment( + JoinTestHelper.createFactIndexBuilder(temporaryFolder.newFolder()).buildMMappedIndex(), + SegmentId.dummy("facts") + ) + ); + + // check the output contains the conversion filter + Assert.assertNotSame(Function.identity(), segmentMapFn); + final SegmentReference joinSegmentReference = segmentMapFn.apply(baseSegmentReference); + Assert.assertTrue(joinSegmentReference instanceof HashJoinSegment); + HashJoinSegment hashJoinSegment = (HashJoinSegment) joinSegmentReference; + Assert.assertEquals( + hashJoinSegment.getBaseFilter(), + new InDimFilter( + "country", + INDEXED_TABLE_DS.getRowsAsList().stream().map(row -> row[0].toString()).collect(Collectors.toSet()) + ) + ); + // the returned clause list is not comparable with an expected clause list since the Joinable + // class member in JoinableClause doesn't implement equals method in its implementations + Assert.assertEquals(hashJoinSegment.getClauses().size(), 1); + } + @Test public void test_computeJoinDataSourceCacheKey_noClauses() { diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 4eda53bd4d..69952e22ce 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -74,7 +74,6 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; -import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.QueryResource; import org.apache.druid.server.QueryScheduler; @@ -142,7 +141,7 @@ public class CachingClusteredClient implements QuerySegmentWalker DruidProcessingConfig processingConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, ServiceEmitter emitter ) { @@ -156,7 +155,7 @@ public class CachingClusteredClient implements QuerySegmentWalker this.processingConfig = processingConfig; this.pool = pool; this.scheduler = scheduler; - this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.joinableFactoryWrapper = joinableFactoryWrapper; this.emitter = emitter; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index 9b0064fbcf..b087c91988 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -80,7 +81,7 @@ public class Appenderators emitter, conglomerate, queryProcessingPool, - joinableFactory, + new JoinableFactoryWrapper(joinableFactory), Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index ae6994134b..53783fcbdb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -60,7 +60,6 @@ import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.filter.Filters; -import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; @@ -102,7 +101,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, QueryProcessingPool queryProcessingPool, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -114,7 +113,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate"); this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool"); - this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.joinableFactoryWrapper = joinableFactoryWrapper; this.cache = Preconditions.checkNotNull(cache, "cache"); this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig"); this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index c323010fb0..0390e32b22 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -60,6 +60,7 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.plumber.Sink; @@ -107,7 +108,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>(); private final QueryProcessingPool queryProcessingPool; - private final JoinableFactory joinableFactory; + private final JoinableFactoryWrapper joinableFactoryWrapper; private final WorkerConfig workerConfig; private final Cache cache; private final CacheConfig cacheConfig; @@ -121,7 +122,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager @Inject public UnifiedIndexerAppenderatorsManager( QueryProcessingPool queryProcessingPool, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, WorkerConfig workerConfig, Cache cache, CacheConfig cacheConfig, @@ -132,7 +133,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager ) { this.queryProcessingPool = queryProcessingPool; - this.joinableFactory = joinableFactory; + this.joinableFactoryWrapper = joinableFactoryWrapper; this.workerConfig = workerConfig; this.cache = cache; this.cacheConfig = cacheConfig; @@ -427,7 +428,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager serviceEmitter, queryRunnerFactoryConglomerateProvider.get(), queryProcessingPool, - joinableFactory, + joinableFactoryWrapper, Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index b5c32f4819..efabdcc168 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -65,6 +65,7 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; @@ -171,7 +172,7 @@ public class RealtimePlumber implements Plumber emitter, conglomerate, queryProcessingPool, - joinableFactory, + new JoinableFactoryWrapper(joinableFactory), cache, cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 5c71d84043..8ed289b8bc 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -37,7 +37,6 @@ import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.filter.Filters; -import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.joda.time.Interval; @@ -66,14 +65,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker public LocalQuerySegmentWalker( QueryRunnerFactoryConglomerate conglomerate, SegmentWrangler segmentWrangler, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, QueryScheduler scheduler, ServiceEmitter emitter ) { this.conglomerate = conglomerate; this.segmentWrangler = segmentWrangler; - this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.joinableFactoryWrapper = joinableFactoryWrapper; this.scheduler = scheduler; this.emitter = emitter; } diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 734f65421b..9a31000b46 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -60,7 +60,6 @@ import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.filter.Filters; -import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SetAndVerifyContextQueryRunner; @@ -104,7 +103,7 @@ public class ServerManager implements QuerySegmentWalker Cache cache, CacheConfig cacheConfig, SegmentManager segmentManager, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, ServerConfig serverConfig ) { @@ -118,7 +117,7 @@ public class ServerManager implements QuerySegmentWalker this.cacheConfig = cacheConfig; this.segmentManager = segmentManager; - this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.joinableFactoryWrapper = joinableFactoryWrapper; this.serverConfig = serverConfig; } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 4773581b1b..f77baa4812 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -21,7 +21,6 @@ package org.apache.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; @@ -48,7 +47,7 @@ import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.join.MapJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -336,7 +335,7 @@ public class CachingClusteredClientFunctionalityTest }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index 218cac521b..95fce2060e 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -49,7 +49,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerManagerTest; import org.apache.druid.server.coordination.ServerType; @@ -139,7 +139,7 @@ public class CachingClusteredClientPerfTest Mockito.mock(DruidProcessingConfig.class), ForkJoinPool.commonPool(), queryScheduler, - NoopJoinableFactory.INSTANCE, + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index ac9afb8958..c7f05747fa 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -26,7 +26,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -120,7 +119,7 @@ import org.apache.druid.query.topn.TopNQueryConfig; import org.apache.druid.query.topn.TopNQueryQueryToolChest; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.join.MapJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.coordination.ServerType; @@ -2850,7 +2849,7 @@ public class CachingClusteredClientTest NoQueryLaningStrategy.INSTANCE, new ServerConfig() ), - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index ba7e4da173..d07bfdef34 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.DruidServer; @@ -50,7 +49,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; -import org.apache.druid.segment.join.MapJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -149,7 +148,7 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase ), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); servers = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java index 8dcc4c10dd..b95a00eff9 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -46,7 +46,7 @@ import org.apache.druid.segment.incremental.NoopRowIngestionMeters; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.segment.loading.NoopDataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; @@ -74,7 +74,7 @@ public class UnifiedIndexerAppenderatorsManagerTest extends InitializedNullHandl private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager( DirectQueryProcessingPool.INSTANCE, - NoopJoinableFactory.INSTANCE, + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new WorkerConfig(), MapCache.create(10), new CacheConfig(), diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 8d2713068c..e19dfcf189 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -77,6 +77,7 @@ import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; @@ -1331,6 +1332,7 @@ public class ClientQuerySegmentWalkerTest .put(globalFactory.getClass(), GlobalTableDataSource.class) .build() ); + final JoinableFactoryWrapper joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); class CapturingWalker implements QuerySegmentWalker { @@ -1379,7 +1381,7 @@ public class ClientQuerySegmentWalkerTest .put(ARRAY, makeTimeline(ARRAY, ARRAY_INLINE)) .put(ARRAY_UNKNOWN, makeTimeline(ARRAY_UNKNOWN, ARRAY_INLINE_UNKNOWN)) .build(), - joinableFactory, + joinableFactoryWrapper, conglomerate, schedulerForTest ), @@ -1389,7 +1391,7 @@ public class ClientQuerySegmentWalkerTest QueryStackTests.createLocalQuerySegmentWalker( conglomerate, segmentWrangler, - joinableFactory, + joinableFactoryWrapper, schedulerForTest ), ClusterOrLocal.LOCAL diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 3c27b9ad3f..a53589d120 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -70,6 +70,7 @@ import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.join.InlineJoinableFactory; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; @@ -159,25 +160,25 @@ public class QueryStackTests public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker( Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWraper, QueryRunnerFactoryConglomerate conglomerate, @Nullable QueryScheduler scheduler ) { - return new TestClusterQuerySegmentWalker(timelines, joinableFactory, conglomerate, scheduler); + return new TestClusterQuerySegmentWalker(timelines, joinableFactoryWraper, conglomerate, scheduler); } public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final SegmentWrangler segmentWrangler, - final JoinableFactory joinableFactory, + final JoinableFactoryWrapper joinableFactoryWrapper, final QueryScheduler scheduler ) { return new LocalQuerySegmentWalker( conglomerate, segmentWrangler, - joinableFactory, + joinableFactoryWrapper, scheduler, EMITTER ); diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 8e7a64f44c..673a93be12 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -46,7 +46,6 @@ import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.filter.Filters; -import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -80,13 +79,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker TestClusterQuerySegmentWalker( Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines, - JoinableFactory joinableFactory, + JoinableFactoryWrapper joinableFactoryWrapper, QueryRunnerFactoryConglomerate conglomerate, @Nullable QueryScheduler scheduler ) { this.timelines = timelines; - this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.joinableFactoryWrapper = joinableFactoryWrapper; this.conglomerate = conglomerate; this.scheduler = scheduler; } diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index bc16274aab..b55e22feb8 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -84,7 +84,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.data.Indexed; -import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.TombstoneSegmentizerFactory; @@ -196,7 +196,7 @@ public class ServerManagerTest new LocalCacheProvider().get(), new CacheConfig(), segmentManager, - NoopJoinableFactory.INSTANCE, + JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new ServerConfig() ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestQueryMakerFactory.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestQueryMakerFactory.java index c2fbe5aeee..b5e50236fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/TestQueryMakerFactory.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestQueryMakerFactory.java @@ -34,7 +34,7 @@ public class TestQueryMakerFactory implements QueryMakerFactory private final QueryLifecycleFactory queryLifecycleFactory; private final ObjectMapper jsonMapper; - TestQueryMakerFactory( + public TestQueryMakerFactory( final QueryLifecycleFactory queryLifecycleFactory, final ObjectMapper jsonMapper ) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 54d3aea6f5..2a67706b3b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -95,6 +95,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.table.IndexedTableJoinable; import org.apache.druid.segment.join.table.RowBasedIndexedTable; import org.apache.druid.segment.loading.SegmentLoader; @@ -883,7 +884,7 @@ public class CalciteTests final QueryScheduler scheduler ) { - return createMockWalker(conglomerate, tmpDir, scheduler, null); + return createMockWalker(conglomerate, tmpDir, scheduler, (JoinableFactory) null); } public static SpecificSegmentsQuerySegmentWalker createMockWalker( @@ -892,6 +893,29 @@ public class CalciteTests final QueryScheduler scheduler, final JoinableFactory joinableFactory ) + { + final JoinableFactory joinableFactoryToUse; + if (joinableFactory == null) { + joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup( + INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class) + ); + } else { + joinableFactoryToUse = joinableFactory; + } + return createMockWalker( + conglomerate, + tmpDir, + scheduler, + new JoinableFactoryWrapper(joinableFactoryToUse) + ); + } + + public static SpecificSegmentsQuerySegmentWalker createMockWalker( + final QueryRunnerFactoryConglomerate conglomerate, + final File tmpDir, + final QueryScheduler scheduler, + final JoinableFactoryWrapper joinableFactoryWrapper + ) { final QueryableIndex index1 = IndexBuilder .create() @@ -969,7 +993,7 @@ public class CalciteTests return new SpecificSegmentsQuerySegmentWalker( conglomerate, INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class), - joinableFactory, + joinableFactoryWrapper, scheduler ).add( DataSegment.builder() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 8283b666be..ca312983ff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; @@ -49,7 +50,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -72,6 +72,21 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>(); private final List<Closeable> closeables = new ArrayList<>(); private final List<DataSegment> segments = new ArrayList<>(); + private static final LookupExtractorFactoryContainerProvider LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER = + new LookupExtractorFactoryContainerProvider() + { + @Override + public Set<String> getAllLookupNames() + { + return Collections.emptySet(); + } + + @Override + public Optional<LookupExtractorFactoryContainer> get(String lookupName) + { + return Optional.empty(); + } + }; /** * Create an instance using the provided query runner factory conglomerate and lookup provider. @@ -81,22 +96,14 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C public SpecificSegmentsQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final LookupExtractorFactoryContainerProvider lookupProvider, - @Nullable final JoinableFactory joinableFactory, + final JoinableFactoryWrapper joinableFactoryWrapper, final QueryScheduler scheduler ) { - final JoinableFactory joinableFactoryToUse; - - if (joinableFactory == null) { - joinableFactoryToUse = QueryStackTests.makeJoinableFactoryForLookup(lookupProvider); - } else { - joinableFactoryToUse = joinableFactory; - } - this.walker = QueryStackTests.createClientQuerySegmentWalker( QueryStackTests.createClusterQuerySegmentWalker( timelines, - joinableFactoryToUse, + joinableFactoryWrapper, conglomerate, scheduler ), @@ -108,11 +115,11 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider)) .build() ), - joinableFactoryToUse, + joinableFactoryWrapper, scheduler ), conglomerate, - joinableFactoryToUse, + joinableFactoryWrapper.getJoinableFactory(), new ServerConfig() ); } @@ -125,21 +132,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C { this( conglomerate, - new LookupExtractorFactoryContainerProvider() - { - @Override - public Set<String> getAllLookupNames() - { - return Collections.emptySet(); - } - - @Override - public Optional<LookupExtractorFactoryContainer> get(String lookupName) - { - return Optional.empty(); - } - }, - null, + LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER, + new JoinableFactoryWrapper(QueryStackTests.makeJoinableFactoryForLookup(LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER)), QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org