This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0920a935591 fix: Precedence of taskCount, taskCountStart,
taskCountMin. (#19417)
0920a935591 is described below
commit 0920a93559116bfd401618d56854b1c0d5a31f14
Author: Gian Merlino <[email protected]>
AuthorDate: Wed May 6 09:49:07 2026 -0700
fix: Precedence of taskCount, taskCountStart, taskCountMin. (#19417)
PR #18745 included a discussion of desired behavior of taskCount,
taskCountStart, and taskCountMin, but this has not been fully
implemented. This patch updates logic to align with the intent.
---
docs/ingestion/supervisor.md | 2 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 47 +++++++-
.../kafka/supervisor/KafkaSupervisorSpecTest.java | 43 +++++++
.../overlord/supervisor/SupervisorManager.java | 4 +-
.../SeekableStreamSupervisorIOConfig.java | 34 ++++--
.../supervisor/SeekableStreamSupervisorSpec.java | 46 ++++----
.../supervisor/SupervisorResourceTest.java | 97 ++++++----------
.../SeekableStreamSupervisorIOConfigTest.java | 73 ++++++------
.../SeekableStreamSupervisorSpecTest.java | 124 ++++++++++++---------
.../SeekableStreamSupervisorStateTest.java | 12 +-
.../SeekableStreamSupervisorTestBase.java | 20 +++-
.../overlord/supervisor/SupervisorSpec.java | 6 +-
12 files changed, 307 insertions(+), 201 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 4e59f0fa542..19665242081 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -78,7 +78,7 @@ The following table outlines the configuration properties for
`autoScalerConfig`
|`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid
disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or
equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka
partitions or Kinesis shards, Druid sets the maximum number of reading tasks to
the number of Kafka partitions or Kinesis shards and ignores
`taskCountMax`.|Yes||
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the
autoscaler, Druid computes the initial number of tasks to launch by checking
the configs in the following order: `taskCountStart`, then `taskCount` (in
`ioConfig`), then `taskCountMin`.|Yes||
-|`taskCountStart`|Optional config to specify the number of ingestion tasks to
start with. When you enable the autoscaler, Druid computes the initial number
of tasks to launch by checking the configs in the following order:
`taskCountStart`, then `taskCount` (in `ioConfig`), then
`taskCountMin`.|No|`taskCount` or `taskCountMin`|
+|`taskCountStart`|Optional config to specify the number of ingestion tasks to
start with. If `taskCountStart` is provided on POST of a supervisor, it takes
priority and the `taskCount` is reset to `taskCountStart` at that
time.|No|`taskCount` or `taskCountMin`|
|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions,
specified as an ISO-8601 duration string. Falls back to
`minTriggerScaleActionFrequencyMillis` if not set.|No||
|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions,
specified as an ISO-8601 duration string. Falls back to
`minTriggerScaleActionFrequencyMillis` if not set.|No||
|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay`
and `minScaleDownDelay` instead. Minimum time interval in milliseconds between
scale actions, used as the fallback when the Duration-based fields are not
set.|No|600000|
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 289d3c989f3..264c37f9913 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -381,7 +381,7 @@ public class KafkaSupervisorIOConfigTest
false,
null
);
- Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
+ Assert.assertEquals(1, kafkaSupervisorIOConfig.getTaskCount());
Assert.assertThrows(
"taskCountMin <= taskCountStart <= taskCountMax",
@@ -400,6 +400,51 @@ public class KafkaSupervisorIOConfigTest
);
}
+ @Test
+ public void testTaskCountStartFallbackAndExplicitFlag()
+ {
+ final Map<String, Object> autoScalerConfig = ImmutableMap.of(
+ "enableTaskAutoScaler", true,
+ "taskCountMin", 1,
+ "taskCountMax", 10,
+ "taskCountStart", 5
+ );
+
+ Assert.assertEquals(7, makeIOConfig(7, autoScalerConfig).getTaskCount());
+ Assert.assertTrue(makeIOConfig(7, autoScalerConfig).isTaskCountExplicit());
+
+ Assert.assertEquals(5, makeIOConfig(null,
autoScalerConfig).getTaskCount());
+ Assert.assertFalse(makeIOConfig(null,
autoScalerConfig).isTaskCountExplicit());
+ }
+
+ private KafkaSupervisorIOConfig makeIOConfig(Integer taskCount, Map<String,
Object> autoScalerConfig)
+ {
+ return new KafkaSupervisorIOConfig(
+ "test",
+ null,
+ null,
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ ImmutableMap.of("bootstrap.servers", "localhost:8082"),
+ mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ LagAggregator.DEFAULT,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null
+ );
+ }
+
@Test
public void testIdleConfigSerde() throws JsonProcessingException
{
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 4f6088a8868..8879ff6d975 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -30,7 +30,10 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -422,6 +425,46 @@ public class KafkaSupervisorSpecTest
Assert.assertFalse(runningSpec.isSuspended());
}
+ @Test
+ public void testTaskCountSerdeRoundTrip() throws IOException
+ {
+ // A persisted taskCount must survive a serialize/deserialize round-trip
even when
+ // autoScalerConfig.taskCountStart is set.
+ final CostBasedAutoScalerConfig autoScalerConfig =
+ CostBasedAutoScalerConfig.builder()
+ .enableTaskAutoScaler(true)
+ .taskCountMin(1)
+ .taskCountMax(100)
+ .taskCountStart(25)
+ .build();
+
+ final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers",
"localhost:9092"))
+ .withTaskCount(25)
+ .withAutoScalerConfig(autoScalerConfig)
+ .withLagAggregator(LagAggregator.DEFAULT)
+ )
+ .build("testDs", "metrics");
+
+ // Mutate taskCount the same way
SeekableStreamSupervisor.changeTaskCountInIOConfig does,
+ // and verify that the mutation is picked up by serialization.
+ spec.getIoConfig().setTaskCount(50);
+ final byte[] payload = mapper.writeValueAsBytes(spec);
+ final KafkaSupervisorSpec roundTripped =
+ (KafkaSupervisorSpec) mapper.readValue(payload, SupervisorSpec.class);
+ Assert.assertEquals(50, roundTripped.getIoConfig().getTaskCount());
+ Assert.assertTrue(roundTripped.getIoConfig().isTaskCountExplicit());
+ }
+
@Test
public void test_validateSpecUpdateTo()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 51abb814a6d..52f3cba7fc1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -208,9 +208,7 @@ public class SupervisorManager implements
SupervisorStatsProvider
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
SupervisorSpec existingSpec =
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
- if (existingSpec != null) {
- spec.merge(existingSpec);
- }
+ spec.merge(existingSpec);
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
return shouldUpdateSpec;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 421a885b294..cf7b27a6f0c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -19,12 +19,14 @@
package org.apache.druid.indexing.seekablestream.supervisor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
@@ -41,7 +43,14 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable
private final InputFormat inputFormat; // nullable for backward compatibility
private final Integer replicas;
- private Integer taskCount;
+ private int taskCount;
+ /**
+ * Whether {@link #taskCount} was explicitly provided to the constructor.
After a serde round-trip,
+ * this will always be true, because the constructor creates an explicit
taskCount. Its purpose is
+ * to allow {@link SeekableStreamSupervisorSpec#merge(SupervisorSpec)} to
tell if a user-submitted
+ * spec had an explicit taskCount or not.
+ */
+ private final boolean taskCountExplicit;
private final Duration taskDuration;
private final Duration startDelay;
private final Duration period;
@@ -90,16 +99,19 @@ public abstract class SeekableStreamSupervisorIOConfig
this.autoScalerConfig = autoScalerConfig;
boolean isAutoScalerAvailable = autoScalerConfig != null;
this.autoScalerEnabled = isAutoScalerAvailable &&
autoScalerConfig.getEnableTaskAutoScaler();
- if (autoScalerEnabled) {
- // Priority: taskCountStart > taskCount > taskCountMin
+ this.taskCountExplicit = taskCount != null;
+ if (taskCount != null) {
+ // Always retain taskCount when deserializing. Note: taskCountStart
takes precedence over taskCount
+ // in SeekableStreamSupervisorSpec#merge, to ensure that when a
supervisor is explicitly POSTed, taskCount
+ // is reset to taskCountStart.
+ this.taskCount = taskCount;
+ } else if (autoScalerEnabled) {
this.taskCount = Configs.valueOrDefault(
autoScalerConfig.getTaskCountStart(),
- Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
+ autoScalerConfig.getTaskCountMin()
);
- } else if (isAutoScalerAvailable) {
- this.taskCount = Configs.valueOrDefault(taskCount,
autoScalerConfig.getTaskCountMin());
} else {
- this.taskCount = Configs.valueOrDefault(taskCount, 1);
+ this.taskCount = 1;
}
Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
"stopTaskCount must be greater than 0");
@@ -204,7 +216,7 @@ public abstract class SeekableStreamSupervisorIOConfig
}
@JsonProperty
- public Integer getTaskCount()
+ public int getTaskCount()
{
return taskCount;
}
@@ -214,6 +226,12 @@ public abstract class SeekableStreamSupervisorIOConfig
this.taskCount = taskCount;
}
+ @JsonIgnore
+ public boolean isTaskCountExplicit()
+ {
+ return taskCountExplicit;
+ }
+
@JsonProperty
public Duration getTaskDuration()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index f21e073f6c4..fefa24a5f3f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -43,7 +43,6 @@ import
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
@@ -261,30 +260,37 @@ public abstract class SeekableStreamSupervisorSpec
implements SupervisorSpec
}
}
+ /**
+ * Updates {@link SeekableStreamSupervisorIOConfig#getTaskCount()} on this
user-submitted spec
+ * to the desired value. The rules applied are:
+ *
+ * <ol>
+ * <li>If {@code taskCountStart} is set on this user-submitted spec, use
it.</li>
+ * <li>Otherwise, if {@code taskCount} is set on this user-submitted spec,
use it.</li>
+ * <li>Otherwise, use the existing spec's {@code taskCount}.</li>
+ * </ol>
+ */
@Override
- public void merge(@NotNull SupervisorSpec existingSpec)
+ public void merge(@Nullable SupervisorSpec existingSpec)
{
- AutoScalerConfig thisAutoScalerConfig =
this.getIoConfig().getAutoScalerConfig();
- // Either if autoscaler is absent or taskCountStart is specified - just
return.
- if (thisAutoScalerConfig == null ||
thisAutoScalerConfig.getTaskCountStart() != null) {
+ // Use this spec's taskCountStart if set.
+ final AutoScalerConfig thisAutoScalerConfig =
getIoConfig().getAutoScalerConfig();
+ if (thisAutoScalerConfig != null
+ && thisAutoScalerConfig.getEnableTaskAutoScaler()
+ && thisAutoScalerConfig.getTaskCountStart() != null) {
+ getIoConfig().setTaskCount(thisAutoScalerConfig.getTaskCountStart());
+ return;
+ }
+
+ // Use this spec's taskCount if set.
+ if (getIoConfig().isTaskCountExplicit()) {
return;
}
- // Use a switch expression with pattern matching when we move to Java 21
as a minimum requirement.
- if (existingSpec instanceof SeekableStreamSupervisorSpec) {
- SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec)
existingSpec;
- AutoScalerConfig autoScalerConfig =
spec.getIoConfig().getAutoScalerConfig();
- if (autoScalerConfig == null) {
- return;
- }
- // provided `taskCountStart` > provided `taskCount` > existing
`taskCount` > provided `taskCountMin`.
- int taskCount = thisAutoScalerConfig.getTaskCountMin();
- if (this.getIoConfig().getTaskCount() != null) {
- taskCount = this.getIoConfig().getTaskCount();
- } else if (spec.getIoConfig().getTaskCount() != null) {
- taskCount = spec.getIoConfig().getTaskCount();
- }
- this.getIoConfig().setTaskCount(taskCount);
+ // Use the existing spec's taskCount. If it isn't there, we'll fall back
to this spec's taskCount. Because there's
+ // no taskCountStart (and taskCount hasn't been explicitly set) this
spec's taskCount will be taskCountMin or 1.
+ if (existingSpec instanceof SeekableStreamSupervisorSpec
existingSeekableStreamSpec) {
+
getIoConfig().setTaskCount(existingSeekableStreamSpec.getIoConfig().getTaskCount());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index bb7581ee874..4ccf4659994 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -37,7 +37,7 @@ import
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMeta
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
-import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTestBase;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -1394,36 +1394,44 @@ public class SupervisorResourceTest extends
EasyMockSupport
}
@Test
- public void
testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed()
+ public void testSpecPostMergeUsesExistingTaskCountWhenNewSpecHasNone()
{
- // New spec has no taskCount -> should use existing taskCount (5)
- TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
- TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(null, 2, 5);
+ // New spec has no taskCount -> should carry forward existing taskCount
(5).
+ final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+ final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(null, 2);
newSpec.merge(existingSpec);
- EasyMock.verify(newSpec.getIoConfig());
+
+ Assert.assertEquals(5, newSpec.getIoConfig().getTaskCount());
}
@Test
public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount()
{
- // New spec has taskCount=3 -> should use provided taskCount over existing
(5)
- TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
- TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(3, 2, 3);
+ // New spec has taskCount=3 -> should keep provided taskCount over
existing (5).
+ final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1);
+ final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(3, 2);
newSpec.merge(existingSpec);
- EasyMock.verify(newSpec.getIoConfig());
+
+ Assert.assertEquals(3, newSpec.getIoConfig().getTaskCount());
}
@Test
- public void testSpecPostMergeFallsBackToProvidedTaskCountMin()
+ public void
testSpecPostMergeCarriesForwardEvenWhenExistingHasOnlyTaskCountMin()
{
- // Neither has taskCount -> should fall back to taskCountMin (4)
- TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1);
- TestSeekableStreamSupervisorSpec newSpec =
createTestSpecWithExpectedMerge(null, 4, 4);
+ // existingSpec has taskCount = 1, newSpec has no taskCount and
taskCountMin = 4
+ // -> carry forward existing taskCount, keep it at 1. We expect the
autoscaler
+ // to set the taskCount to the new min when it runs.
+ final TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null,
1);
+ final TestSeekableStreamSupervisorSpec newSpec = createTestSpec(null, 4);
+
+ Assert.assertEquals(1, existingSpec.getIoConfig().getTaskCount());
+ Assert.assertEquals(4, newSpec.getIoConfig().getTaskCount());
newSpec.merge(existingSpec);
- EasyMock.verify(newSpec.getIoConfig());
+
+ Assert.assertEquals(1, newSpec.getIoConfig().getTaskCount());
}
@Test
@@ -1450,60 +1458,19 @@ public class SupervisorResourceTest extends
EasyMockSupport
private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount,
int taskCountMin)
{
- HashMap<String, Object> autoScalerConfig = new HashMap<>();
- autoScalerConfig.put("enableTaskAutoScaler", true);
- autoScalerConfig.put("taskCountMax", 10);
- autoScalerConfig.put("taskCountMin", taskCountMin);
-
- SeekableStreamSupervisorIOConfig ioConfig =
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
- EasyMock.expect(ioConfig.getAutoScalerConfig())
- .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig,
AutoScalerConfig.class))
- .anyTimes();
- EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
- EasyMock.replay(ioConfig);
-
- DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
-
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
- EasyMock.replay(dataSchema);
-
- SeekableStreamSupervisorIngestionSpec ingestionSchema =
- EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
-
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
-
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
- EasyMock.replay(ingestionSchema);
-
- return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
- }
-
- private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge(
- Integer taskCount,
- int taskCountMin,
- int expectedTaskCount
- )
- {
- HashMap<String, Object> autoScalerConfig = new HashMap<>();
- autoScalerConfig.put("enableTaskAutoScaler", true);
- autoScalerConfig.put("taskCountMax", 10);
- autoScalerConfig.put("taskCountMin", taskCountMin);
-
- SeekableStreamSupervisorIOConfig ioConfig =
EasyMock.createMock(SeekableStreamSupervisorIOConfig.class);
- EasyMock.expect(ioConfig.getAutoScalerConfig())
- .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig,
AutoScalerConfig.class))
- .anyTimes();
- EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes();
- ioConfig.setTaskCount(expectedTaskCount);
- EasyMock.expectLastCall().once();
- EasyMock.replay(ioConfig);
+ final SeekableStreamSupervisorIOConfig ioConfig =
SeekableStreamSupervisorTestBase.createIOConfig(
+ taskCount,
+
SeekableStreamSupervisorTestBase.lagBasedAutoScalerConfig(taskCountMin, 10,
null)
+ );
- DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
+ final DataSchema dataSchema = EasyMock.createMock(DataSchema.class);
EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes();
EasyMock.replay(dataSchema);
- SeekableStreamSupervisorIngestionSpec ingestionSchema =
- EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class);
-
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes();
-
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
- EasyMock.replay(ingestionSchema);
+ final SeekableStreamSupervisorIngestionSpec ingestionSchema =
+ new SeekableStreamSupervisorIngestionSpec(dataSchema, ioConfig, null)
+ {
+ };
return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index 3974b9bebca..09f8bfde484 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -77,7 +77,7 @@ public class SeekableStreamSupervisorIOConfigTest
Assert.assertEquals("stream", config.getStream());
Assert.assertEquals(inputFormat, config.getInputFormat());
Assert.assertEquals(Integer.valueOf(1), config.getReplicas());
- Assert.assertEquals(Integer.valueOf(1), config.getTaskCount());
+ Assert.assertEquals(1, config.getTaskCount());
Assert.assertEquals(Duration.standardHours(1), config.getTaskDuration());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
@@ -94,47 +94,45 @@ public class SeekableStreamSupervisorIOConfigTest
}
@Test
- public void testAutoScalerEnabledPreservesTaskCountWhenNonNull()
+ public void testTaskCountResolutionInConstructor()
{
- LagAggregator lagAggregator = mock(LagAggregator.class);
+ // Constructor priority is "explicit taskCount > taskCountStart >
taskCountMin" so that a
+ // previously autoscaled taskCount survives a Jackson round-trip through
the metadata store.
- // autoScalerEnabled = true
- AutoScalerConfig autoScalerConfig = mock(AutoScalerConfig.class);
- when(autoScalerConfig.getEnableTaskAutoScaler()).thenReturn(true);
- when(autoScalerConfig.getTaskCountStart()).thenReturn(5);
- when(autoScalerConfig.getTaskCountMin()).thenReturn(3);
+ // taskCount=10 + taskCountStart=5 -> taskCount wins, isExplicit=true.
+ assertTaskCount(10, autoScaler(5, 3), 10, true);
- SeekableStreamSupervisorIOConfig configAuto = new
SeekableStreamSupervisorIOConfig(
- "stream",
- null,
- 2,
- 10, // (taskCount should be ignored)
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- autoScalerConfig,
- lagAggregator,
- null,
- null,
- null,
- null
- )
- {
- };
+ // taskCount=null + taskCountStart=5 -> taskCountStart, isExplicit=false.
+ assertTaskCount(null, autoScaler(5, 3), 5, false);
- Assert.assertEquals(Integer.valueOf(5), configAuto.getTaskCount()); //
taskCountStart
+ // taskCount=null + no taskCountStart -> taskCountMin, isExplicit=false.
+ assertTaskCount(null, autoScaler(null, 3), 3, false);
- // autoScalerEnabled = false
- SeekableStreamSupervisorIOConfig configNoAuto = new
SeekableStreamSupervisorIOConfig(
+ // taskCount=10, no autoscaler -> taskCount, isExplicit=true.
+ assertTaskCount(10, null, 10, true);
+ }
+
+ private static AutoScalerConfig autoScaler(@Nullable Integer taskCountStart,
int taskCountMin)
+ {
+ final AutoScalerConfig config = mock(AutoScalerConfig.class);
+ when(config.getEnableTaskAutoScaler()).thenReturn(true);
+ when(config.getTaskCountStart()).thenReturn(taskCountStart);
+ when(config.getTaskCountMin()).thenReturn(taskCountMin);
+ return config;
+ }
+
+ private static void assertTaskCount(
+ @Nullable Integer taskCount,
+ @Nullable AutoScalerConfig autoScalerConfig,
+ int expectedTaskCount,
+ boolean expectedExplicit
+ )
+ {
+ final SeekableStreamSupervisorIOConfig config = new
SeekableStreamSupervisorIOConfig(
"stream",
null,
2,
- 10,
- null,
+ taskCount,
null,
null,
null,
@@ -142,7 +140,8 @@ public class SeekableStreamSupervisorIOConfigTest
null,
null,
null,
- lagAggregator,
+ autoScalerConfig,
+ mock(LagAggregator.class),
null,
null,
null,
@@ -150,8 +149,8 @@ public class SeekableStreamSupervisorIOConfigTest
)
{
};
-
- Assert.assertEquals(Integer.valueOf(10), configNoAuto.getTaskCount());
+ Assert.assertEquals(expectedTaskCount, config.getTaskCount());
+ Assert.assertEquals(expectedExplicit, config.isTaskCountExplicit());
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index bf3d5f9d71e..8d1f755cec2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -52,6 +52,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -1219,7 +1221,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
supervisor.runInternal();
Thread.sleep(1000); // ensure a dynamic allocation notice completes
- Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount());
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -1337,71 +1339,83 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
}
@Test
- public void testMergeSpecConfigs()
+ public void testMerge_withExistingSpec()
{
+ // Resolution rules on user POST when an existing spec is in the DB:
+ // 1. new spec's taskCountStart wins (over both explicit taskCount and existing).
+ // 2. else new spec's explicit taskCount wins.
+ // 3. else carry forward existing.taskCount (so autoscaler progress is not lost).
mockIngestionSchema();
- // Given
- // Create existing spec with autoscaler config and taskCount set to 5
- HashMap<String, Object> existingAutoScalerConfig = new HashMap<>();
- existingAutoScalerConfig.put("enableTaskAutoScaler", true);
- existingAutoScalerConfig.put("taskCountMax", 8);
- existingAutoScalerConfig.put("taskCountMin", 1);
+ // existing(taskCount=5, autoscaler) + new(no taskCount, no start) -> carry forward 5.
+ assertMergeResult(spec(5, 1, 8, null), spec(null, 1, 8, null), 5);
- SeekableStreamSupervisorIOConfig existingIoConfig =
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
- EasyMock.expect(existingIoConfig.getAutoScalerConfig())
- .andReturn(mapper.convertValue(existingAutoScalerConfig,
AutoScalerConfig.class))
- .anyTimes();
- EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes();
- EasyMock.replay(existingIoConfig);
-
- SeekableStreamSupervisorIngestionSpec existingIngestionSchema =
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
-
EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes();
-
EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
- EasyMock.expect(existingIngestionSchema.getTuningConfig())
- .andReturn(seekableStreamSupervisorTuningConfig)
- .anyTimes();
- EasyMock.replay(existingIngestionSchema);
+ // existing(5) + new(taskCount=7) -> keep 7.
+ assertMergeResult(spec(5, 1, 8, null), spec(7, 1, 8, null), 7);
- TestSeekableStreamSupervisorSpec existingSpec =
buildDefaultSupervisorSpecWithIngestionSchema(
- "id123",
- existingIngestionSchema
- );
+ // existing(5) + new(taskCountStart=3) -> 3.
+ assertMergeResult(spec(5, 1, 8, null), spec(null, 1, 8, 3), 3);
- // Create new spec with autoscaler config that has taskCountStart not set (null) and no taskCount set
- HashMap<String, Object> newAutoScalerConfig = new HashMap<>();
- newAutoScalerConfig.put("enableTaskAutoScaler", true);
- newAutoScalerConfig.put("taskCountMax", 8);
- newAutoScalerConfig.put("taskCountMin", 1);
+ // existing(5) + new(taskCount=7, taskCountStart=3) -> 3 (start beats explicit).
+ assertMergeResult(spec(5, 1, 8, null), spec(7, 1, 8, 3), 3);
- SeekableStreamSupervisorIOConfig newIoConfig =
EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
- EasyMock.expect(newIoConfig.getAutoScalerConfig())
- .andReturn(mapper.convertValue(newAutoScalerConfig,
AutoScalerConfig.class))
- .anyTimes();
- EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes();
- newIoConfig.setTaskCount(5);
- EasyMock.expectLastCall().once();
- EasyMock.replay(newIoConfig);
-
- SeekableStreamSupervisorIngestionSpec newIngestionSchema =
EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
-
EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes();
-
EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
-
EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
- EasyMock.replay(newIngestionSchema);
-
- TestSeekableStreamSupervisorSpec newSpec =
buildDefaultSupervisorSpecWithIngestionSchema(
- "id124",
- newIngestionSchema
- );
+ // No autoscaler on new spec -> merge is a no-op; new spec's taskCount stands.
+ assertMergeResult(spec(5, 1, 8, null), buildSpecWithIoConfig("new", createIOConfig(7, null)), 7);
+
+ // existing already has an explicit taskCount=6 (e.g. metadata-store round-trip). The
+ // *new* spec's isTaskCountExplicit must drive carry-forward, not the existing's.
+ assertMergeResult(spec(6, 1, 8, 3), spec(null, 1, 8, 3), 3);
+ assertMergeResult(spec(6, 1, 8, 3), spec(null, 1, 8, null), 6);
+ }
+
+ @Test
+ public void testMerge_withNullExistingSpec_appliesTaskCountStartOnFirstPost()
+ {
+ // First-POST coverage. SupervisorManager calls merge(existingSpec) unconditionally,
+ // including when there is no prior spec (existingSpec == null). The constructor prefers
+ // an explicit taskCount over taskCountStart, so merge() must re-apply the
+ // "taskCountStart wins on user POST" rule even on the very first submission.
+ mockIngestionSchema();
- // Before merge, taskCountStart should be null
-
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
+ assertMergeResult(null, spec(7, 1, 8, 3), 3); // start beats explicit
+ assertMergeResult(null, spec(null, 1, 8, 3), 3); // start applied
+ assertMergeResult(null, spec(7, 1, 8, null), 7); // explicit kept
+ assertMergeResult(null, spec(null, 2, 8, null), 2); // taskCountMin from constructor
+ }
- // When - merge should copy taskCount from existing spec since new spec has no taskCount
+ private void assertMergeResult(
+ @Nullable TestSeekableStreamSupervisorSpec existingSpec,
+ TestSeekableStreamSupervisorSpec newSpec,
+ int expectedTaskCount
+ )
+ {
newSpec.merge(existingSpec);
+ Assert.assertEquals(expectedTaskCount, newSpec.getIoConfig().getTaskCount());
+ }
+
+ private TestSeekableStreamSupervisorSpec spec(
+ @Nullable Integer taskCount,
+ int taskCountMin,
+ int taskCountMax,
+ @Nullable Integer taskCountStart
+ )
+ {
+ return buildSpecWithIoConfig(
+ "id",
+ createIOConfig(taskCount, lagBasedAutoScalerConfig(taskCountMin, taskCountMax, taskCountStart))
+ );
+ }
- // Then - verify setTaskCount was called (EasyMock will verify the mock expectations)
- EasyMock.verify(newIoConfig);
+ private TestSeekableStreamSupervisorSpec buildSpecWithIoConfig(
+ String id,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ final SeekableStreamSupervisorIngestionSpec ingestionSchema =
+ new SeekableStreamSupervisorIngestionSpec(dataSchema, ioConfig, seekableStreamSupervisorTuningConfig)
+ {
+ };
+ return buildDefaultSupervisorSpecWithIngestionSchema(id, ingestionSchema);
}
private TestSeekableStreamSupervisorSpec buildDefaultSupervisorSpecWithIngestionSchema(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index cb2f2b451c1..4266a002b84 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3725,7 +3725,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
// minScaleUpDelay = 0 means any scale-up is immediately allowed.
supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {},
scalingEmitter);
- Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
final List<ServiceMetricEvent> events =
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3744,11 +3744,11 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
// First scale-up succeeds and stamps the last-scale timestamp.
supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {},
scalingEmitter);
- Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
// Second scale-up is within the 1h minScaleUpDelay window and must be blocked.
supervisor.handleDynamicAllocationTasksNotice(() -> 7, () -> {},
scalingEmitter);
- Assert.assertEquals("Second scale-up must not take effect", 5, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals("Second scale-up must not take effect", 5, supervisor.getIoConfig().getTaskCount());
final List<ServiceMetricEvent> events =
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3771,7 +3771,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
// minScaleDownDelay = 0 means any scale-down is immediately allowed.
supervisor.handleDynamicAllocationTasksNotice(() -> 2, () -> {},
scalingEmitter);
- Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount());
final List<ServiceMetricEvent> events =
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
@@ -3790,11 +3790,11 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
// First scale-down succeeds and stamps the last-scale timestamp.
supervisor.handleDynamicAllocationTasksNotice(() -> 3, () -> {},
scalingEmitter);
- Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount());
// Second scale-down is within the 1h minScaleDownDelay window and must be blocked.
supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {},
scalingEmitter);
- Assert.assertEquals("Second scale-down must not take effect", 3, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertEquals("Second scale-down must not take effect", 3, supervisor.getIoConfig().getTaskCount());
final List<ServiceMetricEvent> events =
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 35d063e88e1..82abda059a1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -46,7 +46,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -66,6 +66,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -518,7 +519,10 @@ public abstract class SeekableStreamSupervisorTestBase
.build();
}
- protected SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, CostBasedAutoScalerConfig autoScalerConfig)
+ public static SeekableStreamSupervisorIOConfig createIOConfig(
+ Integer taskCount,
+ AutoScalerConfig autoScalerConfig
+ )
{
return new SeekableStreamSupervisorIOConfig(
STREAM,
@@ -542,4 +546,16 @@ public abstract class SeekableStreamSupervisorTestBase
{
};
}
+
+ public static AutoScalerConfig lagBasedAutoScalerConfig(int taskCountMin, int taskCountMax, Integer taskCountStart)
+ {
+ final HashMap<String, Object> config = new HashMap<>();
+ config.put("enableTaskAutoScaler", true);
+ config.put("taskCountMin", taskCountMin);
+ config.put("taskCountMax", taskCountMax);
+ if (taskCountStart != null) {
+ config.put("taskCountStart", taskCountStart);
+ }
+ return OBJECT_MAPPER.convertValue(config, AutoScalerConfig.class);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index bf50c7cf48c..67575057ceb 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -27,7 +27,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
@@ -130,12 +130,12 @@ public interface SupervisorSpec
}
/**
- * Updates this supervisor spec by merging values from the given {@code existingSpec}.
+ * Updates this user-submitted supervisor spec by merging values from the given {@code existingSpec}.
* This method may be used to carry forward existing spec values when a supervisor is being resubmitted.
*
* @param existingSpec used spec to merge values from
*/
- default void merge(@NotNull SupervisorSpec existingSpec)
+ default void merge(@Nullable SupervisorSpec existingSpec)
{
// No-op by default
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]