kfaraz commented on code in PR #18745:
URL: https://github.com/apache/druid/pull/18745#discussion_r2602627218
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1339,7 +1357,7 @@ public void tryInit()
}
initRetryCounter++;
log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]",
supervisorId)
- .emit();
+ .emit();
Review Comment:
Not needed? This looks like a whitespace-only change.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -190,16 +190,22 @@ private Runnable computeAndCollectLag()
};
}
- /**
- * This method determines whether to do scale actions based on collected lag points.
- * Current algorithm of scale is simple:
- * First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
- * Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action.
- * Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered.
- *
- * @param lags the lag metrics of Stream(Kafka/Kinesis)
- * @return Integer. target number of tasksCount, -1 means skip scale action.
- */
+ /**
+ * This method determines whether to do scale actions based on collected lag points.
+ * The current algorithm of scale is straightforward:
+ * <ul>
+ * <li>First, compute the proportion of lag points higher/lower than <code>scaleOutThreshold</code>/<code>scaleInThreshold</code>,
Review Comment:
You can also use the `{@code}` Javadoc tag, as it is less verbose.
```suggestion
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
```
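The scaling algorithm described in the Javadoc above can be sketched as follows; the Javadoc in the sketch also uses `{@code}` in place of `<code>` as suggested. This is a hypothetical, simplified illustration, not Druid's `LagBasedAutoScaler` code: the class `LagScaleSketch`, the inclusive comparisons, and the `scaleOutStep`/`scaleInStep`/`taskCountMin`/`taskCountMax` parameters used to derive a target count are assumptions.

```java
import java.util.List;

/**
 * Hypothetical, simplified sketch of the lag-based scaling decision described in the
 * {@code LagBasedAutoScaler} Javadoc. Not Druid's implementation.
 */
final class LagScaleSketch
{
  private LagScaleSketch()
  {
  }

  /**
   * Returns the desired task count, or {@code -1} to skip the scale action.
   * <ul>
   * <li>Compute the fraction of lag points {@code >= scaleOutThreshold} and {@code <= scaleInThreshold}.</li>
   * <li>Compare them with {@code triggerScaleOutFractionThreshold} and {@code triggerScaleInFractionThreshold}.</li>
   * <li>Scale out is checked first, so it has priority over scale in.</li>
   * </ul>
   */
  static int computeDesiredTaskCount(
      List<Long> lags,
      long scaleOutThreshold,
      long scaleInThreshold,
      double triggerScaleOutFractionThreshold,
      double triggerScaleInFractionThreshold,
      int currentTaskCount,
      int scaleOutStep,   // assumption: illustrative step sizes and bounds
      int scaleInStep,
      int taskCountMin,
      int taskCountMax
  )
  {
    if (lags.isEmpty()) {
      return -1;
    }
    int beyond = 0;
    int within = 0;
    for (long lag : lags) {
      if (lag >= scaleOutThreshold) {
        beyond++;
      }
      if (lag <= scaleInThreshold) {
        within++;
      }
    }
    double beyondFraction = (double) beyond / lags.size();
    double withinFraction = (double) within / lags.size();

    // Scale out wins when both conditions hold.
    if (beyondFraction >= triggerScaleOutFractionThreshold) {
      int target = Math.min(currentTaskCount + scaleOutStep, taskCountMax);
      return target == currentTaskCount ? -1 : target;
    }
    if (withinFraction >= triggerScaleInFractionThreshold) {
      int target = Math.max(currentTaskCount - scaleInStep, taskCountMin);
      return target == currentTaskCount ? -1 : target;
    }
    return -1;
  }
}
```

Checking scale out before scale in is what gives it priority: if a burst of high lag and a run of idle points both cross their trigger fractions, the sketch adds tasks rather than removing them.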
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1161,7 +1179,7 @@ public void stop(boolean stopGracefully)
catch (Exception e) {
stateManager.recordThrowableEvent(e);
log.makeAlert(e, "Exception stopping [%s]", supervisorId)
- .emit();
+ .emit();
Review Comment:
Not needed? This looks like a whitespace-only change.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -255,6 +263,32 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
}
}
+ /**
+ * Writes the <code>taskCountStart</code> value from old config, if not specificed in new config.
+ *
+ * @param existingSpec the existing supervisor specification to merge configuration values from
+ */
+ @Override
+ public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
+ {
+ AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
+ // Either if autoscaler is absent or taskCountStart is specified - just return.
+ if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
+ return;
+ }
+
+ // TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement.
+ if (existingSpec instanceof SeekableStreamSupervisorSpec) {
+ // Note: for some reason, sources are available only for bytecode version 11.
Review Comment:
Please remove this.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -255,6 +263,32 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
}
}
+ /**
+ * Writes the <code>taskCountStart</code> value from old config, if not specificed in new config.
+ *
+ * @param existingSpec the existing supervisor specification to merge configuration values from
+ */
+ @Override
+ public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
+ {
+ AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
+ // Either if autoscaler is absent or taskCountStart is specified - just return.
+ if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
+ return;
+ }
+
+ // TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement.
Review Comment:
```suggestion
// Use switch expression with pattern matching when we move to Java 21 as minimum requirement.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -255,6 +263,32 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
}
}
+ /**
+ * Writes the <code>taskCountStart</code> value from old config, if not specificed in new config.
+ *
+ * @param existingSpec the existing supervisor specification to merge configuration values from
+ */
+ @Override
+ public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
+ {
+ AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
+ // Either if autoscaler is absent or taskCountStart is specified - just return.
+ if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
+ return;
+ }
+
+ // TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement.
+ if (existingSpec instanceof SeekableStreamSupervisorSpec) {
+ // Note: for some reason, sources are available only for bytecode version 11.
+ //noinspection PatternVariableCanBeUsed
+ var spec = (SeekableStreamSupervisorSpec) existingSpec;
Review Comment:
```suggestion
SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec;
```
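A minimal sketch of the carry-forward logic in `mergeSpecConfigs`, written with the explicit cast suggested above instead of `var`. The types `AutoScalerConfigSketch`, `SupervisorSpecSketch`, and `StreamSpecSketch` are hypothetical stand-ins for `AutoScalerConfig`, `SupervisorSpec`, and `SeekableStreamSupervisorSpec`, and the method uses the `merge` name proposed later in this review. It assumes the value carried forward is the existing spec's `taskCountStart` and that the config can be updated in place through a hypothetical setter; the excerpt does not show how the PR actually writes the value.

```java
/** Hypothetical stand-in for AutoScalerConfig; only the field used here. */
final class AutoScalerConfigSketch
{
  private Integer taskCountStart; // null means "not specified"

  AutoScalerConfigSketch(Integer taskCountStart)
  {
    this.taskCountStart = taskCountStart;
  }

  Integer getTaskCountStart()
  {
    return taskCountStart;
  }

  // Hypothetical setter; the real config may be immutable.
  void setTaskCountStart(Integer taskCountStart)
  {
    this.taskCountStart = taskCountStart;
  }
}

/** Hypothetical stand-in for SupervisorSpec. */
interface SupervisorSpecSketch
{
  /** Updates this spec by merging values from {@code existingSpec}. The default does nothing. */
  default void merge(SupervisorSpecSketch existingSpec)
  {
  }
}

/** Hypothetical stand-in for SeekableStreamSupervisorSpec. */
final class StreamSpecSketch implements SupervisorSpecSketch
{
  private final AutoScalerConfigSketch autoScalerConfig; // may be null

  StreamSpecSketch(AutoScalerConfigSketch autoScalerConfig)
  {
    this.autoScalerConfig = autoScalerConfig;
  }

  AutoScalerConfigSketch getAutoScalerConfig()
  {
    return autoScalerConfig;
  }

  @Override
  public void merge(SupervisorSpecSketch existingSpec)
  {
    // Nothing to carry forward if autoscaling is off or taskCountStart was set explicitly.
    if (autoScalerConfig == null || autoScalerConfig.getTaskCountStart() != null) {
      return;
    }
    // Type check plus explicit cast; on Java 21+ this could become a switch with a
    // type pattern such as: case StreamSpecSketch spec -> ...
    if (existingSpec instanceof StreamSpecSketch) {
      StreamSpecSketch spec = (StreamSpecSketch) existingSpec;
      AutoScalerConfigSketch existingConfig = spec.getAutoScalerConfig();
      if (existingConfig != null && existingConfig.getTaskCountStart() != null) {
        autoScalerConfig.setTaskCountStart(existingConfig.getTaskCountStart());
      }
    }
  }
}
```

A resubmission path would call `proposedSpec.merge(existingSpec)` before the new spec takes effect, presumably so that resubmitting a spec without `taskCountStart` keeps the previously stored value; that call site is likewise an assumption. Keeping the no-op as the interface default means spec types that have nothing to merge need no changes.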
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -113,4 +114,15 @@ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcep
{
// The default implementation does not do any validation checks.
}
+
+ /**
+ * Perform any merging of spec configurations needed after deserialization.
Review Comment:
```suggestion
* Updates this supervisor spec by merging values from the given {@code existingSpec}.
* This method may be used to carry forward existing spec values when a supervisor is being resubmitted.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -556,44 +566,52 @@ public String getType()
private boolean changeTaskCount(int desiredActiveTaskCount)
throws InterruptedException, ExecutionException
{
+ if (autoScalerConfig == null) {
+ log.warn("autoScalerConfig is 'null' but dynamic allocation notice is submitted, how can it be ?");
+ return false;
+ }
int currentActiveTaskCount;
Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
currentActiveTaskCount = activeTaskGroups.size();
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
return false;
- } else {
- log.info(
- "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].",
- currentActiveTaskCount,
- desiredActiveTaskCount,
- supervisorId,
- dataSource
- );
- final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
- gracefulShutdownInternal();
- changeTaskCountInIOConfig(desiredActiveTaskCount);
- clearAllocationInfo();
- emitter.emit(ServiceMetricEvent.builder()
- .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
- .setDimension(DruidMetrics.DATASOURCE, dataSource)
- .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
- .setDimensionIfNotNull(
- DruidMetrics.TAGS,
- spec.getContextValue(DruidMetrics.TAGS)
- )
- .setMetric(
- AUTOSCALER_SCALING_TIME_METRIC,
- scaleActionStopwatch.millisElapsed()
- ));
- log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
- return true;
}
+ log.info(
+ "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].",
+ currentActiveTaskCount,
+ desiredActiveTaskCount,
+ supervisorId,
+ dataSource
+ );
+ final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
+ gracefulShutdownInternal();
+ changeTaskCountInAutoScalerConfig(desiredActiveTaskCount);
+ clearAllocationInfo();
+ emitter.emit(ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+ .setDimensionIfNotNull(
+ DruidMetrics.TAGS,
+ spec.getContextValue(DruidMetrics.TAGS)
+ )
+ .setMetric(
+ AUTOSCALER_SCALING_TIME_METRIC,
+ scaleActionStopwatch.millisElapsed()
+ ));
+ log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
+ return true;
Review Comment:
I guess this is just moving the code out of the `else` block. Can we revert
this change for the time being?
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java:
##########
@@ -113,4 +114,15 @@ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcep
{
// The default implementation does not do any validation checks.
}
+
+ /**
+ * Perform any merging of spec configurations needed after deserialization.
+ *
+ * @param existingSpec used spec to merge values from
+ * @throws DruidException if the spec update is not allowed
+ */
+ default void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
Review Comment:
```suggestion
default void merge(@NotNull SupervisorSpec existingSpec)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.