This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4a594bb9f6a Use task actions to fetch used segments in MSQ (#15284)
4a594bb9f6a is described below
commit 4a594bb9f6ab89eabda5db654a9d215633340c66
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Dec 1 15:29:33 2023 +0530
Use task actions to fetch used segments in MSQ (#15284)
* Use task actions to fetch used segments in MSQ
* Fix tests
* Fixing tests.
* Revert "Fix tests"
This reverts commit 95ab6494
* Removing conditional check in tests.
* Pulling in latest changes.
---------
Co-authored-by: cryptoe <[email protected]>
---
.../java/org/apache/druid/msq/exec/ControllerImpl.java | 15 +++++++++++++--
.../apache/druid/msq/test/CalciteArraysQueryMSQTest.java | 2 +-
.../druid/msq/test/CalciteSelectJoinQueryMSQTest.java | 2 +-
.../apache/druid/msq/test/CalciteSelectQueryMSQTest.java | 2 +-
.../apache/druid/msq/test/CalciteUnionQueryMSQTest.java | 2 +-
.../test/java/org/apache/druid/msq/test/MSQTestBase.java | 2 +-
.../apache/druid/msq/test/MSQTestControllerContext.java | 15 ---------------
.../apache/druid/msq/test/MSQTestTaskActionClient.java | 16 +++++++++++++++-
8 files changed, 33 insertions(+), 23 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 1da68a423fa..ae9c1122498 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,6 +69,7 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
+import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
@@ -1200,8 +1201,18 @@ public class ControllerImpl implements Controller
}
// Fetch all published, used segments (all non-realtime segments) from
the metadata store.
- final Collection<DataSegment> publishedUsedSegments =
-
FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource,
intervals), true);
+ // If the task is operating with a REPLACE lock,
+ // any segment created after the lock was acquired for its interval will
not be considered.
+ final Collection<DataSegment> publishedUsedSegments;
+ try {
+ publishedUsedSegments = context.taskActionClient().submit(new
RetrieveSegmentsToReplaceAction(
+ dataSource,
+ intervals
+ ));
+ }
+ catch (IOException e) {
+ throw new MSQException(e, UnknownFault.forException(e));
+ }
int realtimeCount = 0;
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
index abefe6a378d..92f8d6f4e79 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
@@ -80,7 +80,7 @@ public class CalciteArraysQueryMSQTest extends
CalciteArraysQueryTest
final MSQTestOverlordServiceClient indexingServiceClient = new
MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
- new MSQTestTaskActionClient(queryJsonMapper),
+ new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
index 114583d31a1..644d0d05451 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
@@ -128,7 +128,7 @@ public class CalciteSelectJoinQueryMSQTest
final MSQTestOverlordServiceClient indexingServiceClient = new
MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
- new MSQTestTaskActionClient(queryJsonMapper),
+ new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
index 974eed48734..504999ae45b 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
@@ -84,7 +84,7 @@ public class CalciteSelectQueryMSQTest extends
CalciteQueryTest
final MSQTestOverlordServiceClient indexingServiceClient = new
MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
- new MSQTestTaskActionClient(queryJsonMapper),
+ new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
index 6ec17687c45..01379e2a93f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
@@ -94,7 +94,7 @@ public class CalciteUnionQueryMSQTest extends
CalciteUnionQueryTest
final MSQTestOverlordServiceClient indexingServiceClient = new
MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
- new MSQTestTaskActionClient(queryJsonMapper),
+ new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index ba78e213ca4..36301a5fe0f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -503,7 +503,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(),
anyString());
- testTaskActionClient = Mockito.spy(new
MSQTestTaskActionClient(objectMapper));
+ testTaskActionClient = Mockito.spy(new
MSQTestTaskActionClient(objectMapper, injector));
indexingServiceClient = new MSQTestOverlordServiceClient(
objectMapper,
injector,
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 85592fd0c53..5ab8932de3e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -50,7 +50,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -104,20 +103,6 @@ public class MSQTestControllerContext implements
ControllerContext
this.injector = injector;
this.taskActionClient = taskActionClient;
coordinatorClient = Mockito.mock(CoordinatorClient.class);
- Mockito.when(coordinatorClient.fetchUsedSegments(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.anyList()
- )
- ).thenAnswer(invocation ->
- Futures.immediateFuture(
-
injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
- .getSegments()
- .stream()
- .filter(dataSegment ->
dataSegment.getDataSource()
-
.equals(invocation.getArguments()[0]))
- .collect(Collectors.toList())
- )
- );
Mockito.when(coordinatorClient.fetchServerViewSegments(
ArgumentMatchers.anyString(),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 31b3272b74f..5192aafccdc 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.LockListAction;
+import
org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
@@ -39,6 +41,7 @@ import
org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -62,12 +65,15 @@ public class MSQTestTaskActionClient implements
TaskActionClient
"foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
);
private final Set<DataSegment> publishedSegments = new HashSet<>();
+ private final Injector injector;
public MSQTestTaskActionClient(
- ObjectMapper mapper
+ ObjectMapper mapper,
+ Injector injector
)
{
this.mapper = mapper;
+ this.injector = injector;
}
@Override
@@ -122,6 +128,14 @@ public class MSQTestTaskActionClient implements
TaskActionClient
.build()
).collect(Collectors.toSet());
}
+ } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
+ String dataSource = ((RetrieveSegmentsToReplaceAction)
taskAction).getDataSource();
+ return (RetType)
injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
+ .getSegments()
+ .stream()
+ .filter(dataSegment ->
dataSegment.getDataSource()
+
.equals(dataSource))
+ .collect(Collectors.toSet());
} else if (taskAction instanceof SegmentTransactionalInsertAction) {
final Set<DataSegment> segments = ((SegmentTransactionalInsertAction)
taskAction).getSegments();
publishedSegments.addAll(segments);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]