This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5c9d524e898 Add new task distribution strategy that assigns tasks to
workers based on supervisor affinity (#18634)
5c9d524e898 is described below
commit 5c9d524e8980fcb7c169c549be31ef4fd84aceed
Author: PANKAJ KUMAR <[email protected]>
AuthorDate: Mon Oct 27 13:46:15 2025 +0530
Add new task distribution strategy that assigns tasks to workers based on
supervisor affinity (#18634)
Changes:
- Add field `WorkerCategorySpec.CategoryConfig.supervisorIdCategoryAffinity`
- This is a map from supervisor ID to worker category name
- When assigning workers in `WorkerSelectUtils`, check this map first, then
fall back to the datasource-level `categoryAffinity` map
---
.../overlord/setup/WorkerCategorySpec.java | 27 +++-
.../indexing/overlord/setup/WorkerSelectUtils.java | 24 +++-
...onWithCategorySpecWorkerSelectStrategyTest.java | 137 +++++++++++++++++++-
...tyWithCategorySpecWorkerSelectStrategyTest.java | 142 ++++++++++++++++++++-
.../overlord/setup/WorkerCategorySpecTest.java | 2 +-
.../TestSeekableStreamIndexTask.java | 90 +++++++++++++
.../SeekableStreamSupervisorStateTest.java | 106 ++++-----------
7 files changed, 429 insertions(+), 99 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
index 9f58d62d05a..410aad78a91 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
@@ -21,7 +21,9 @@ package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -86,17 +88,19 @@ public class WorkerCategorySpec
public static class CategoryConfig
{
private final String defaultCategory;
- // key: datasource, value: category
private final Map<String, String> categoryAffinity;
+ private final Map<String, String> supervisorIdCategoryAffinity;
@JsonCreator
public CategoryConfig(
@JsonProperty("defaultCategory") String defaultCategory,
- @JsonProperty("categoryAffinity") Map<String, String> categoryAffinity
+ @JsonProperty("categoryAffinity") Map<String, String> categoryAffinity,
+ @JsonProperty("supervisorIdCategoryAffinity") @Nullable Map<String,
String> supervisorIdCategoryAffinity
)
{
this.defaultCategory = defaultCategory;
this.categoryAffinity = categoryAffinity == null ?
Collections.emptyMap() : categoryAffinity;
+ this.supervisorIdCategoryAffinity =
Configs.valueOrDefault(supervisorIdCategoryAffinity, Map.of());
}
@JsonProperty
@@ -105,12 +109,25 @@ public class WorkerCategorySpec
return defaultCategory;
}
+ /**
+ * Returns a map from datasource name to the worker category name to be
used for tasks of that datasource.
+ */
@JsonProperty
public Map<String, String> getCategoryAffinity()
{
return categoryAffinity;
}
+ /**
+ * Returns a map from supervisor ID to worker category name to be used for
tasks of that supervisor.
+ * This takes precedence over {@link #getCategoryAffinity()} when both are
configured.
+ */
+ @JsonProperty
+ public Map<String, String> getSupervisorIdCategoryAffinity()
+ {
+ return supervisorIdCategoryAffinity;
+ }
+
@Override
public boolean equals(final Object o)
{
@@ -122,13 +139,14 @@ public class WorkerCategorySpec
}
final CategoryConfig that = (CategoryConfig) o;
return Objects.equals(defaultCategory, that.defaultCategory) &&
- Objects.equals(categoryAffinity, that.categoryAffinity);
+ Objects.equals(categoryAffinity, that.categoryAffinity) &&
+ Objects.equals(supervisorIdCategoryAffinity,
that.supervisorIdCategoryAffinity);
}
@Override
public int hashCode()
{
- return Objects.hash(defaultCategory, categoryAffinity);
+ return Objects.hash(defaultCategory, categoryAffinity,
supervisorIdCategoryAffinity);
}
@Override
@@ -137,6 +155,7 @@ public class WorkerCategorySpec
return "CategoryConfig{" +
"defaultCategory=" + defaultCategory +
", categoryAffinity=" + categoryAffinity +
+ ", supervisorIdCategoryAffinity=" + supervisorIdCategoryAffinity +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index be4e8426008..e8c75a814bd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -120,10 +121,25 @@ public class WorkerSelectUtils
if (categoryConfig != null) {
final String defaultCategory = categoryConfig.getDefaultCategory();
final Map<String, String> categoryAffinity =
categoryConfig.getCategoryAffinity();
-
- String preferredCategory = categoryAffinity.get(task.getDataSource());
- // If there is no preferred category for the datasource, then using
the defaultCategory. However, the defaultCategory
- // may be null too, so we need to do one more null check (see below).
+ final Map<String, String> supervisorIdCategoryAffinity =
categoryConfig.getSupervisorIdCategoryAffinity();
+
+ String preferredCategory = null;
+
+ // First, check if this task has a supervisorId and if there's a
category affinity for it
+ if (task instanceof SeekableStreamIndexTask) {
+ final String supervisorId = ((SeekableStreamIndexTask<?, ?, ?>)
task).getSupervisorId();
+ if (supervisorId != null) {
+ preferredCategory = supervisorIdCategoryAffinity.get(supervisorId);
+ }
+ }
+
+ // If no supervisor-based category is found, fall back to
datasource-based category affinity
+ if (preferredCategory == null) {
+ preferredCategory = categoryAffinity.get(task.getDataSource());
+ }
+
+ // If there is no preferred category for the supervisorId or
datasource, then use the defaultCategory.
+ // However, the defaultCategory may be null too, so we need to do one
more null check (see below).
preferredCategory = preferredCategory == null ? defaultCategory :
preferredCategory;
if (preferredCategory != null) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
index 6ac04a8920f..7ac8b2b746a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
@@ -20,14 +20,26 @@
package org.apache.druid.indexing.overlord.setup;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashSet;
public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
@@ -80,7 +92,8 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
- ImmutableMap.of("ds1", "c2")
+ ImmutableMap.of("ds1", "c2"),
+ null
)
),
false
@@ -95,7 +108,8 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
- ImmutableMap.of("ds1", "c2")
+ ImmutableMap.of("ds1", "c2"),
+ null
)
),
false
@@ -110,6 +124,7 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c2",
+ null,
null
)
),
@@ -127,6 +142,7 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
+ null,
null,
null
)
@@ -146,7 +162,8 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
- ImmutableMap.of("ds1", "c3")
+ ImmutableMap.of("ds1", "c3"),
+ null
)
),
false
@@ -164,7 +181,8 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
- ImmutableMap.of("ds1", "c3")
+ ImmutableMap.of("ds1", "c3"),
+ null
)
),
true
@@ -174,6 +192,94 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
Assert.assertNull(worker);
}
+ @Test
+ public void testSupervisorIdCategoryAffinity()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c1",
+ ImmutableMap.of("ds1", "c1"),
+ ImmutableMap.of("supervisor1", "c2")
+ )
+ ),
+ false
+ );
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+ new
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c2", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost3", worker.getWorker().getHost());
+ }
+
+ @Test
+ public void testSupervisorIdCategoryAffinityFallbackToDatasource()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c2",
+ ImmutableMap.of("ds1", "c1"),
+ ImmutableMap.of("supervisor2", "c2")
+ )
+ ),
+ false
+ );
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+ new
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c1", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost1", worker.getWorker().getHost());
+ }
+
+ @Test
+ public void testSupervisorIdCategoryAffinityFallbackToDefault()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c2",
+ ImmutableMap.of("ds2", "c1"),
+ ImmutableMap.of("supervisor2", "c1")
+ )
+ ),
+ false
+ );
+
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+ new
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c2", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost3", worker.getWorker().getHost());
+ }
+
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec
workerCategorySpec)
{
final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
@@ -187,4 +293,27 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
return worker;
}
+
+ /**
+ * Helper method to create a test task with supervisor ID for testing
+ */
+ @SuppressWarnings("unchecked")
+ private static Task createTestTask(String id, @Nullable String supervisorId,
String datasource)
+ {
+ return new TestSeekableStreamIndexTask(
+ id,
+ supervisorId,
+ null,
+ DataSchema.builder()
+ .withDataSource(datasource)
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withDimensions(new DimensionsSpec(Collections.emptyList()))
+ .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(),
Collections.emptyList()))
+ .build(),
+ Mockito.mock(SeekableStreamIndexTaskTuningConfig.class),
+ Mockito.mock(SeekableStreamIndexTaskIOConfig.class),
+ null,
+ null
+ );
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
index 880ef743dca..ea4ffb16af5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
@@ -20,14 +20,26 @@
package org.apache.druid.indexing.overlord.setup;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashSet;
public class FillCapacityWithCategorySpecWorkerSelectStrategyTest
@@ -80,7 +92,8 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
- ImmutableMap.of("ds1", "c1")
+ ImmutableMap.of("ds1", "c1"),
+ null
)
),
false
@@ -95,7 +108,8 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
null,
- ImmutableMap.of("ds1", "c1")
+ ImmutableMap.of("ds1", "c1"),
+ null
)
),
false
@@ -110,6 +124,7 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
+ null,
null
)
),
@@ -127,6 +142,7 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
ImmutableMap.of(
"noop",
new WorkerCategorySpec.CategoryConfig(
+ null,
null,
null
)
@@ -146,7 +162,8 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
- ImmutableMap.of("ds1", "c3")
+ ImmutableMap.of("ds1", "c3"),
+ null
)
),
false
@@ -164,7 +181,8 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
"noop",
new WorkerCategorySpec.CategoryConfig(
"c1",
- ImmutableMap.of("ds1", "c3")
+ ImmutableMap.of("ds1", "c3"),
+ null
)
),
true
@@ -174,6 +192,99 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
Assert.assertNull(worker);
}
+ @Test
+ public void testSupervisorIdCategoryAffinity()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c1",
+ ImmutableMap.of("ds1", "c1"),
+ ImmutableMap.of("supervisor1", "c2")
+ )
+ ),
+ false
+ );
+
+ // Create a test task with supervisor ID "supervisor1"
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+ new
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c2", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost3", worker.getWorker().getHost());
+ }
+
+ @Test
+ public void testSupervisorIdCategoryAffinityFallbackToDatasource()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c2",
+ ImmutableMap.of("ds1", "c1"),
+ ImmutableMap.of("supervisor2", "c2")
+ )
+ ),
+ false
+ );
+
+ // Create a test task with supervisor ID "supervisor1" (not in
supervisorIdCategoryAffinity map)
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+ new
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c1", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost1", worker.getWorker().getHost());
+ }
+
+ @Test
+ public void testSupervisorIdCategoryAffinityFallbackToDefault()
+ {
+ final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+ ImmutableMap.of(
+ "test_seekable_stream",
+ new WorkerCategorySpec.CategoryConfig(
+ "c2",
+ ImmutableMap.of("ds2", "c1"),
+ ImmutableMap.of("supervisor2", "c1")
+ )
+ ),
+ false
+ );
+
+ // Create a test task with supervisor ID "supervisor1" and datasource "ds1"
+ final Task taskWithSupervisor = createTestTask("task1", "supervisor1",
"ds1");
+
+ final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+ new
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ WORKERS_FOR_TIER_TESTS,
+ taskWithSupervisor
+ );
+ Assert.assertNotNull(worker);
+ Assert.assertEquals("c2", worker.getWorker().getCategory());
+ Assert.assertEquals("localhost3", worker.getWorker().getHost());
+ }
+
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec
workerCategorySpec)
{
final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
@@ -187,4 +298,27 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategyTest
return worker;
}
+
+ /**
+ * Helper method to create a test task with supervisor ID for testing
+ */
+ @SuppressWarnings("unchecked")
+ private static Task createTestTask(String id, @Nullable String supervisorId,
String datasource)
+ {
+ return new TestSeekableStreamIndexTask(
+ id,
+ supervisorId,
+ null,
+ DataSchema.builder()
+ .withDataSource(datasource)
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withDimensions(new DimensionsSpec(Collections.emptyList()))
+ .withGranularity(new ArbitraryGranularitySpec(new
AllGranularity(), Collections.emptyList()))
+ .build(),
+ Mockito.mock(SeekableStreamIndexTaskTuningConfig.class),
+ Mockito.mock(SeekableStreamIndexTaskIOConfig.class),
+ null,
+ null
+ );
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
index 4277984fc10..9576aa0ffbb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
@@ -58,7 +58,7 @@ public class WorkerCategorySpecTest
Assert.assertTrue(workerCategorySpec.isStrong());
Assert.assertEquals(ImmutableMap.of(
"index_kafka",
- new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1",
"c2"))
+ new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1",
"c2"), null)
), workerCategorySpec.getCategoryMap());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
new file mode 100644
index 00000000000..985aa7da706
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.segment.indexing.DataSchema;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Test implementation of SeekableStreamIndexTask for use in unit tests.
+ */
+public class TestSeekableStreamIndexTask extends
SeekableStreamIndexTask<String, String, ByteEntity>
+{
+ private final SeekableStreamIndexTaskRunner<String, String, ByteEntity>
streamingTaskRunner;
+ private final RecordSupplier<String, String, ByteEntity> recordSupplier;
+
+ public TestSeekableStreamIndexTask(
+ String id,
+ @Nullable String supervisorId,
+ @Nullable TaskResource taskResource,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig,
+ SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+ @Nullable Map<String, Object> context,
+ @Nullable String groupId
+ )
+ {
+ this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig,
context, groupId, null, null);
+ }
+
+ public TestSeekableStreamIndexTask(
+ String id,
+ @Nullable String supervisorId,
+ @Nullable TaskResource taskResource,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig,
+ SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+ @Nullable Map<String, Object> context,
+ @Nullable String groupId,
+ @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity>
streamingTaskRunner,
+ @Nullable RecordSupplier<String, String, ByteEntity> recordSupplier
+ )
+ {
+ super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig,
context, groupId);
+ this.streamingTaskRunner = streamingTaskRunner;
+ this.recordSupplier = recordSupplier;
+ }
+
+ @Nullable
+ @Override
+ protected SeekableStreamIndexTaskRunner<String, String, ByteEntity>
createTaskRunner()
+ {
+ return streamingTaskRunner;
+ }
+
+ @Override
+ protected RecordSupplier<String, String, ByteEntity>
newTaskRecordSupplier(final TaskToolbox toolbox)
+ {
+ return recordSupplier;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "test_seekable_stream";
+ }
+}
+
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index e204a67cae8..5df9edd184d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -39,10 +39,8 @@ import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
-import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -65,6 +63,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -1151,7 +1150,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
taskTuningConfig,
taskIoConfig,
context,
- "0"
+ "0",
+ null,
+ recordSupplier
);
TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
@@ -1162,7 +1163,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
taskTuningConfig,
taskIoConfig,
context,
- "0"
+ "0",
+ null,
+ recordSupplier
);
TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
@@ -1173,7 +1176,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
taskTuningConfig,
taskIoConfig,
context,
- "0"
+ "0",
+ null,
+ recordSupplier
);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -1364,7 +1369,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
ioConfig
),
context,
- "0"
+ "0",
+ null,
+ recordSupplier
);
TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
@@ -1384,7 +1391,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
ioConfig
),
context,
- "1"
+ "1",
+ null,
+ recordSupplier
);
TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
@@ -1404,7 +1413,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
ioConfig
),
context,
- "2"
+ "2",
+ null,
+ recordSupplier
);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -1596,7 +1607,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
),
context,
"0",
- streamingTaskRunner
+ streamingTaskRunner,
+ recordSupplier
);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -2907,78 +2919,6 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
};
}
- private class TestSeekableStreamIndexTask extends
SeekableStreamIndexTask<String, String, ByteEntity>
- {
- private final SeekableStreamIndexTaskRunner<String, String, ByteEntity>
streamingTaskRunner;
-
- public TestSeekableStreamIndexTask(
- String id,
- @Nullable String supervisorId,
- @Nullable TaskResource taskResource,
- DataSchema dataSchema,
- SeekableStreamIndexTaskTuningConfig tuningConfig,
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
- @Nullable Map<String, Object> context,
- @Nullable String groupId
- )
- {
- this(
- id,
- supervisorId,
- taskResource,
- dataSchema,
- tuningConfig,
- ioConfig,
- context,
- groupId,
- null
- );
- }
-
- public TestSeekableStreamIndexTask(
- String id,
- @Nullable String supervisorId,
- @Nullable TaskResource taskResource,
- DataSchema dataSchema,
- SeekableStreamIndexTaskTuningConfig tuningConfig,
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
- @Nullable Map<String, Object> context,
- @Nullable String groupId,
- @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity>
streamingTaskRunner
- )
- {
- super(
- id,
- supervisorId,
- taskResource,
- dataSchema,
- tuningConfig,
- ioConfig,
- context,
- groupId
- );
- this.streamingTaskRunner = streamingTaskRunner;
- }
-
- @Nullable
- @Override
- protected SeekableStreamIndexTaskRunner<String, String, ByteEntity>
createTaskRunner()
- {
- return streamingTaskRunner;
- }
-
- @Override
- protected RecordSupplier<String, String, ByteEntity>
newTaskRecordSupplier(final TaskToolbox toolbox)
- {
- return recordSupplier;
- }
-
- @Override
- public String getType()
- {
- return "test";
- }
- }
private abstract class BaseTestSeekableStreamSupervisor extends
SeekableStreamSupervisor<String, String, ByteEntity>
{
@@ -3069,7 +3009,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
taskTuningConfig,
taskIoConfig,
null,
- null
+ null,
+ null,
+ recordSupplier
));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]