This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4aedee9edf8 Improve lag-based autoscaler config persistence (#18745)
4aedee9edf8 is described below
commit 4aedee9edf8c7a9d16d964a77f55ce44691a7509
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Fri Dec 12 11:03:10 2025 +0200
Improve lag-based autoscaler config persistence (#18745)
Changes:
* Add `SupervisorSpec.merge()` to merge task count from the existing running
supervisor's spec
* Fix up priority of `taskCount` vs `taskCountStart` vs `taskCountMin`
---
.../overlord/supervisor/SupervisorManager.java | 28 ++--
.../supervisor/SeekableStreamSupervisor.java | 24 ++-
.../supervisor/SeekableStreamSupervisorSpec.java | 45 ++++-
.../supervisor/autoscaler/LagBasedAutoScaler.java | 28 ++--
.../autoscaler/LagBasedAutoScalerConfig.java | 4 +-
.../supervisor/SupervisorResourceTest.java | 181 ++++++++++++++++++++-
.../SeekableStreamSupervisorSpecTest.java | 142 +++++++++++++---
.../overlord/supervisor/SupervisorSpec.java | 12 ++
8 files changed, 404 insertions(+), 60 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 168b956afd2..21d2a626501 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -174,7 +174,10 @@ public class SupervisorManager
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
- possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
+ SupervisorSpec existingSpec =
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
+ if (existingSpec != null) {
+ spec.merge(existingSpec);
+ }
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
return shouldUpdateSpec;
}
@@ -183,6 +186,7 @@ public class SupervisorManager
/**
* Checks whether the submitted SupervisorSpec differs from the current spec
in SupervisorManager's supervisor list.
* This is used in SupervisorResource specPost to determine whether the
Supervisor needs to be restarted
+ *
* @param spec The spec submitted
* @return boolean - true only if the spec has been modified, false otherwise
*/
@@ -221,7 +225,7 @@ public class SupervisorManager
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
- return possiblyStopAndRemoveSupervisorInternal(id, true);
+ return possiblyStopAndRemoveSupervisorInternal(id, true) != null;
}
}
@@ -299,7 +303,8 @@ public class SupervisorManager
log.info("SupervisorManager stopped.");
}
- public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id,
@Nullable Integer limit) throws IllegalArgumentException
+ public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id,
@Nullable Integer limit)
+ throws IllegalArgumentException
{
return metadataSupervisorManager.getAllForId(id, limit);
}
@@ -429,13 +434,14 @@ public class SupervisorManager
* Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
* starting, stopping, suspending and resuming supervisors.
*
- * @return true if a supervisor was stopped, false if there was no
supervisor with this id
+ * @return the spec of the existing supervisor if one existed and was stopped,
or null if there was no supervisor with this id
*/
- private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean
writeTombstone)
+ @Nullable
+ private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id,
boolean writeTombstone)
{
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
- if (pair == null) {
- return false;
+ if (pair == null || pair.rhs == null || pair.lhs == null) {
+ return null;
}
if (writeTombstone) {
@@ -447,13 +453,13 @@ public class SupervisorManager
pair.lhs.stop(true);
supervisors.remove(id);
- SupervisorTaskAutoScaler autoscler = autoscalers.get(id);
- if (autoscler != null) {
- autoscler.stop();
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.stop();
autoscalers.remove(id);
}
- return true;
+ return pair.rhs;
}
/**
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0f2839e7fba..f69dc52159b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -135,8 +135,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * this class is the parent class of both the Kafka and Kinesis supervisor.
All the main run loop
- * logic are similar enough so they're grouped together into this class.
+ * This class is the parent class of both the Kafka and Kinesis supervisor.
All the main run loop
+ * logic is similar enough, so they're grouped together into this class.
* <p>
* Supervisor responsible for managing the SeekableStreamIndexTasks
(Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a
* {@link SeekableStreamSupervisorSpec} which includes the stream name (topic
/ stream) and configuration as well as an ingestion spec which will
@@ -541,10 +541,20 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
/**
* This method determines how to do scale actions based on collected lag
points.
- * If scale action is triggered :
- * First of all, call gracefulShutdownInternal() which will change the state
of current datasource ingest tasks from reading to publishing.
- * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in
the next 'RunNotice'.
- * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and
sync it to MetadataStorage.
+ * If scale action is triggered:
+ * <ul>
+ * <li>First, call <code>gracefulShutdownInternal()</code> which will change
the state of current datasource ingest tasks from reading to publishing.
+ * <li>Secondly, clear all the stateful data structures:
+ * <ul>
+ * <li><code>activelyReadingTaskGroups</code>,
+ * <li><code>partitionGroups</code>,
+ * <li><code>partitionOffsets</code>,
+ * <li><code>pendingCompletionTaskGroups</code>,
+ * <li><code>partitionIds</code>.
+ * </ul>
+ * These structures will be rebuilt in the next 'RunNotice'.
+ * <li>Finally, change the <code>taskCount</code> in
<code>SeekableStreamSupervisorIOConfig</code> and sync it to
<code>MetadataStorage</code>.
+ * </ul>
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting the
supervisor.
*
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
@@ -916,7 +926,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
- // snapshots latest sequences from stream to be verified in next run cycle
of inactive stream check
+ // snapshots latest sequences from the stream to be verified in the next run
cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType>
previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
private final IdleConfig idleConfig;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 967652673f7..f21e073f6c4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -43,16 +43,18 @@ import
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{
- protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
"Update of the input source stream from [%s] to [%s] is not supported for a
running supervisor."
- + "%nTo
perform the update safely, follow these steps:"
- + "%n(1)
Suspend this supervisor, reset its offsets and then terminate it. "
- + "%n(2)
Create a new supervisor with the new input source stream."
- + "%nNote
that doing the reset can cause data duplication or loss if any topic used in
the old supervisor is included in the new one too.";
+ protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
+ "Update of the input source stream from [%s] to [%s] is not supported
for a running supervisor."
+ + "%nTo perform the update safely, follow these steps:"
+ + "%n(1) Suspend this supervisor, reset its offsets and then terminate
it. "
+ + "%n(2) Create a new supervisor with the new input source stream."
+ + "%nNote that doing the reset can cause data duplication or loss if any
topic used in the old supervisor is included in the new one too.";
private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
SeekableStreamSupervisorIngestionSpec ingestionSchema
@@ -183,6 +185,7 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
/**
* An autoScaler instance will be returned depending on the
autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled
then NoopTaskAutoScaler will be returned.
+ *
* @param supervisor
* @return autoScaler
*/
@@ -232,6 +235,7 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
* <li>You cannot migrate between types of supervisors.</li>
* <li>You cannot change the input source stream of a running
supervisor.</li>
* </ul>
+ *
* @param proposedSpec the proposed supervisor spec
* @throws DruidException if the proposed spec update is not allowed
*/
@@ -240,7 +244,9 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
{
if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
throw InvalidInput.exception(
- "Cannot update supervisor spec from type[%s] to type[%s]",
getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
+ "Cannot update supervisor spec from type[%s] to type[%s]",
+ getClass().getSimpleName(),
+ proposedSpec.getClass().getSimpleName()
);
}
SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec)
proposedSpec;
@@ -255,6 +261,33 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
}
}
+ @Override
+ public void merge(@NotNull SupervisorSpec existingSpec)
+ {
+ AutoScalerConfig thisAutoScalerConfig =
this.getIoConfig().getAutoScalerConfig();
+ // If the autoscaler is absent or taskCountStart is specified, keep the
provided spec unchanged.
+ if (thisAutoScalerConfig == null ||
thisAutoScalerConfig.getTaskCountStart() != null) {
+ return;
+ }
+
+ // Use a switch expression with pattern matching when we move to Java 21
as a minimum requirement.
+ if (existingSpec instanceof SeekableStreamSupervisorSpec) {
+ SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec)
existingSpec;
+ AutoScalerConfig autoScalerConfig =
spec.getIoConfig().getAutoScalerConfig();
+ if (autoScalerConfig == null) {
+ return;
+ }
+ // Precedence (highest first): provided `taskCountStart` > provided `taskCount` >
existing `taskCount` > provided `taskCountMin`.
+ int taskCount = thisAutoScalerConfig.getTaskCountMin();
+ if (this.getIoConfig().getTaskCount() != null) {
+ taskCount = this.getIoConfig().getTaskCount();
+ } else if (spec.getIoConfig().getTaskCount() != null) {
+ taskCount = spec.getIoConfig().getTaskCount();
+ }
+ this.getIoConfig().setTaskCount(taskCount);
+ }
+ }
+
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean
suspend);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index b1446c3d4c1..142193ae63f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -124,8 +124,10 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
);
log.info(
"LagBasedAutoScaler will collect lag every [%d] millis and will keep
up to [%d] data points for the last [%d] millis for dataSource [%s]",
- lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
lagMetricsQueue.maxSize(),
- lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ lagMetricsQueue.maxSize(),
+ lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
+ dataSource
);
}
@@ -192,19 +194,25 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
/**
* This method determines whether to do scale actions based on collected lag
points.
- * Current algorithm of scale is simple:
- * First of all, compute the proportion of lag points higher/lower than
scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
- * Secondly, compare scaleOutThreshold/scaleInThreshold with
triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale
out action has higher priority than scale in action.
- * Finaly, if scaleOutThreshold/scaleInThreshold is higher than
triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in
action would be triggered.
+ * The current algorithm of scale is straightforward:
+ * <ul>
+ * <li>First, compute the proportion of lag points higher/lower than {@code
scaleOutThreshold/scaleInThreshold},
+ * yielding the scale-out/scale-in proportions.
+ * <li>Secondly, compare the scale-out/scale-in proportions with
+ * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
+ * <ul><li>Note: a scale-out action has higher priority than a scale-in
action.</ul>
+ * <li>Finally, if the scale-out/scale-in proportion is higher than
+ * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold},
a scale-out/scale-in action is triggered.
+ * </ul>
*
- * @param lags the lag metrics of Stream(Kafka/Kinesis)
- * @return Integer. target number of tasksCount, -1 means skip scale action.
+ * @param lags the lag metrics of the stream (Kafka/Kinesis)
+ * @return the target task count, or -1 to skip the scale action.
*/
private int computeDesiredTaskCount(List<Long> lags)
{
- // if supervisor is not suspended, ensure required tasks are running
+ // if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
- log.debug("Computing desired task count for [%s], based on following lags
: [%s]", dataSource, lags);
+ log.debug("Computing the desired task count for [%s], based on following
lags : [%s]", dataSource, lags);
int beyond = 0;
int within = 0;
int metricsCount = lags.size();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index b4a9b0e8891..ad036dd0e10 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -103,8 +103,8 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
- this.minTriggerScaleActionFrequencyMillis =
minTriggerScaleActionFrequencyMillis
- != null ? minTriggerScaleActionFrequencyMillis : 600000;
+ this.minTriggerScaleActionFrequencyMillis =
+ minTriggerScaleActionFrequencyMillis != null ?
minTriggerScaleActionFrequencyMillis : 600000;
Preconditions.checkArgument(
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 &&
stopTaskCountRatio <= 1.0),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 538fd4a9e78..0737722d341 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -27,12 +27,23 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -54,6 +65,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.Collections;
@@ -61,6 +73,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
@RunWith(EasyMockRunner.class)
@@ -1083,7 +1096,9 @@ public class SupervisorResourceTest extends
EasyMockSupport
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1",
null)).andReturn(versions1).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2",
null)).andReturn(versions2).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3",
null)).andReturn(versions3).times(1);
- EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4",
null)).andReturn(Collections.emptyList()).times(1);
+ EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null))
+ .andReturn(Collections.emptyList())
+ .times(1);
setupMockRequestForUser("notdruid");
replayAll();
@@ -1183,12 +1198,18 @@ public class SupervisorResourceTest extends
EasyMockSupport
// Test with limit=0 (should return 400 Bad Request)
response = supervisorResource.specGetHistory(request, "id1", 0);
Assert.assertEquals(400, response.getStatus());
- Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than
zero if set (count was 0)"), response.getEntity());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Count must be greater than zero if set
(count was 0)"),
+ response.getEntity()
+ );
// Test with negative limit (should return 400 Bad Request)
response = supervisorResource.specGetHistory(request, "id1", -1);
Assert.assertEquals(400, response.getStatus());
- Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than
zero if set (count was -1)"), response.getEntity());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Count must be greater than zero if set
(count was -1)"),
+ response.getEntity()
+ );
// Test with limit larger than available history
response = supervisorResource.specGetHistory(request, "id1", 100);
@@ -1320,6 +1341,99 @@ public class SupervisorResourceTest extends
EasyMockSupport
Assert.assertEquals(spec, specRoundTrip);
}
+ @Test
+ public void
testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed()
+ {
+ // New spec has no taskCount -> should use existing taskCount (5)
+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+ TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(null, 2, 5);
+
+ newSpec.merge(existingSpec);
+ EasyMock.verify(newSpec.getIoConfig());
+ }
+
+ @Test
+ public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount()
+ {
+ // New spec has taskCount=3 -> should use provided taskCount over existing
(5)
+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+ TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(3, 2, 3);
+
+ newSpec.merge(existingSpec);
+ EasyMock.verify(newSpec.getIoConfig());
+ }
+
+ @Test
+ public void testSpecPostMergeFallsBackToProvidedTaskCountMin()
+ {
+ // Neither has taskCount -> should fall back to taskCountMin (4)
+ TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1);
+ TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(null, 4, 4);
+
+ newSpec.merge(existingSpec);
+ EasyMock.verify(newSpec.getIoConfig());
+ }
+
+ private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount,
int taskCountMin)
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("taskCountMax", 10);
+ autoScalerConfig.put("taskCountMin", taskCountMin);
+
+ SeekableStreamSupervisorIOConfig ioConfig =
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
+ EasyMock.expect(ioConfig.getAutoScalerConfig())
+ .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
+ EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
+ EasyMock.replay(ioConfig);
+
+ DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
+ EasyMock.replay(dataSchema);
+
+ SeekableStreamSupervisorIngestionSpec ingestionSchema =
+ EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
+ }
+
+ private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge(
+ Integer taskCount,
+ int taskCountMin,
+ int expectedTaskCount
+ )
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("taskCountMax", 10);
+ autoScalerConfig.put("taskCountMin", taskCountMin);
+
+ SeekableStreamSupervisorIOConfig ioConfig =
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
+ EasyMock.expect(ioConfig.getAutoScalerConfig())
+ .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
+ EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
+ ioConfig.setTaskCount(expectedTaskCount);
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(ioConfig);
+
+ DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
+ EasyMock.replay(dataSchema);
+
+ SeekableStreamSupervisorIngestionSpec ingestionSchema =
+ EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
+ }
+
private void setupMockRequest()
{
setupMockRequestForUser("druid");
@@ -1445,10 +1559,10 @@ public class SupervisorResourceTest extends
EasyMockSupport
if (getId() != null ? !getId().equals(that.getId()) : that.getId() !=
null) {
return false;
}
- if (supervisor != null ? !supervisor.equals(that.supervisor) :
that.supervisor != null) {
+ if (!Objects.equals(supervisor, that.supervisor)) {
return false;
}
- if (datasources != null ? !datasources.equals(that.datasources) :
that.datasources != null) {
+ if (!Objects.equals(datasources, that.datasources)) {
return false;
}
return isSuspended() == that.isSuspended();
@@ -1464,4 +1578,61 @@ public class SupervisorResourceTest extends
EasyMockSupport
return result;
}
}
+
+ static class TestSeekableStreamSupervisorSpec extends
SeekableStreamSupervisorSpec
+ {
+ public TestSeekableStreamSupervisorSpec(
+ @Nullable String id,
+ SeekableStreamSupervisorIngestionSpec ingestionSchema
+ )
+ {
+ super(
+ id,
+ ingestionSchema,
+ null,
+ false,
+ EasyMock.createMock(TaskStorage.class),
+ EasyMock.createMock(TaskMaster.class),
+ EasyMock.createMock(IndexerMetadataStorageCoordinator.class),
+ EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class),
+ OBJECT_MAPPER,
+ EasyMock.createMock(ServiceEmitter.class),
+ EasyMock.createMock(DruidMonitorSchedulerConfig.class),
+ EasyMock.createMock(RowIngestionMetersFactory.class),
+ EasyMock.createMock(SupervisorStateManagerConfig.class)
+ );
+ }
+
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "test";
+ }
+
+ @Override
+ public String getSource()
+ {
+ return "test-stream";
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return null;
+ }
+
+ @JsonIgnore
+ @Nonnull
+ @Override
+ public Set<ResourceAction> getInputSourceResources() throws
UnsupportedOperationException
+ {
+ return Collections.singleton(new ResourceAction(new Resource("test",
ResourceType.EXTERNAL), Action.READ));
+ }
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e96e3887318..58ffb8438e0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -566,14 +566,16 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
Exception e = null;
try {
- AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of(
- "enableTaskAutoScaler",
- "true",
- "taskCountMax",
- "1",
- "taskCountMin",
- "4"
- ), AutoScalerConfig.class);
+ AutoScalerConfig autoScalerError = mapper.convertValue(
+ ImmutableMap.of(
+ "enableTaskAutoScaler",
+ "true",
+ "taskCountMax",
+ "1",
+ "taskCountMin",
+ "4"
+ ), AutoScalerConfig.class
+ );
}
catch (RuntimeException ex) {
e = ex;
@@ -685,16 +687,18 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
- .andReturn(mapper.convertValue(ImmutableMap.of(
- "lagCollectionIntervalMillis",
- "1",
- "enableTaskAutoScaler",
- true,
- "taskCountMax",
- "4",
- "taskCountMin",
- "1"
- ), AutoScalerConfig.class))
+ .andReturn(mapper.convertValue(
+ ImmutableMap.of(
+ "lagCollectionIntervalMillis",
+ "1",
+ "enableTaskAutoScaler",
+ true,
+ "taskCountMax",
+ "4",
+ "taskCountMin",
+ "1"
+ ), AutoScalerConfig.class
+ ))
.anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
@@ -775,6 +779,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
supervisor.start();
autoScaler.start();
supervisor.runInternal();
+
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1000);
@@ -1003,9 +1008,11 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
supervisor.start();
autoScaler.start();
supervisor.runInternal();
+
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1000);
+
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
@@ -1050,11 +1057,14 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
// enable autoscaler so that taskcount config will be ignored and init
value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
+
supervisor.start();
autoScaler.start();
supervisor.runInternal();
+
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountBeforeScaleOut);
+
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
@@ -1102,15 +1112,18 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
emitter
);
- // enable autoscaler so that taskcount config will be ignored and init
value of taskCount will use taskCountMin.
+ // enable autoscaler so that taskcount config will be ignored and the init
value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
+
+ // When
supervisor.start();
autoScaler.start();
supervisor.runInternal();
Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
Thread.sleep(2000);
+ // Then
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
@@ -1580,6 +1593,97 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
}
+ @Test
+ public void testMergeSpecConfigs()
+ {
+ mockIngestionSchema();
+
+ // Given
+ // An existing spec with the autoscaler enabled (taskCountMin=1, taskCountMax=8) whose ioConfig reports taskCount=5
+ HashMap<String, Object> existingAutoScalerConfig = new HashMap<>();
+ existingAutoScalerConfig.put("enableTaskAutoScaler", true);
+ existingAutoScalerConfig.put("taskCountMax", 8);
+ existingAutoScalerConfig.put("taskCountMin", 1);
+
+ SeekableStreamSupervisorIOConfig existingIoConfig =
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+ EasyMock.expect(existingIoConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(existingAutoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
+ EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes();
+ EasyMock.replay(existingIoConfig);
+
+ SeekableStreamSupervisorIngestionSpec existingIngestionSchema =
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+
EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes();
+
EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(existingIngestionSchema.getTuningConfig())
+ .andReturn(seekableStreamSupervisorTuningConfig)
+ .anyTimes();
+ EasyMock.replay(existingIngestionSchema);
+
+ TestSeekableStreamSupervisorSpec existingSpec =
buildDefaultSupervisorSpecWithIngestionSchema(
+ "id123",
+ existingIngestionSchema
+ );
+
+ // A new spec with the same autoscaler bounds but no taskCountStart and no taskCount (getTaskCount() returns null)
+ HashMap<String, Object> newAutoScalerConfig = new HashMap<>();
+ newAutoScalerConfig.put("enableTaskAutoScaler", true);
+ newAutoScalerConfig.put("taskCountMax", 8);
+ newAutoScalerConfig.put("taskCountMin", 1);
+
+ SeekableStreamSupervisorIOConfig newIoConfig =
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+ EasyMock.expect(newIoConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(newAutoScalerConfig,
AutoScalerConfig.class))
+ .anyTimes();
+ EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes();
+ newIoConfig.setTaskCount(5);
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(newIoConfig);
+
+ SeekableStreamSupervisorIngestionSpec newIngestionSchema =
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+
EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes();
+
EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(newIngestionSchema);
+
+ TestSeekableStreamSupervisorSpec newSpec =
buildDefaultSupervisorSpecWithIngestionSchema(
+ "id124",
+ newIngestionSchema
+ );
+
+ // Precondition: the new spec's autoscaler config has no taskCountStart
+
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
+
+ // When: merging should carry taskCount=5 over from the existing spec, since the new spec has none
+ newSpec.merge(existingSpec);
+
+ // Then: verify() fails unless setTaskCount(5) was invoked exactly once on newIoConfig
+ EasyMock.verify(newIoConfig);
+ }
+
+ private TestSeekableStreamSupervisorSpec
buildDefaultSupervisorSpecWithIngestionSchema(
+ String id,
+ SeekableStreamSupervisorIngestionSpec ingestionSchema
+ )
+ {
+ return new TestSeekableStreamSupervisorSpec( // only the schema and id vary; all other args are this test class's shared fields
+ ingestionSchema,
+ null, // NOTE(review): presumably the optional context map — confirm against the constructor
+ false, // NOTE(review): presumably the "suspended" flag — confirm against the constructor
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4, // NOTE(review): presumably the supervisor instance this test spec hands out — confirm
+ id
+ );
+ }
+
private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 9ff217d5404..377223308b0 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
+import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Set;
@@ -113,4 +114,15 @@ public interface SupervisorSpec
{
// The default implementation does not do any validation checks.
}
+
+ /**
+ * Updates this supervisor spec in place by merging values from the given {@code existingSpec}.
+ * Intended for carrying existing spec values forward when a supervisor is being resubmitted.
+ *
+ * @param existingSpec the spec previously submitted for the same supervisor, to merge values from
+ */
+ default void merge(@NotNull SupervisorSpec existingSpec)
+ {
+ // No-op by default. NOTE(review): @NotNull here is javax.validation (Bean Validation); the already-imported javax.annotation.Nonnull may be the intended hint — confirm.
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]