tgravescs commented on code in PR #42352:
URL: https://github.com/apache/spark/pull/42352#discussion_r1546387115
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -669,6 +728,27 @@ private[spark] class ExecutorAllocationManager(
   private val stageAttemptToExecutorPlacementHints =
     new mutable.HashMap[StageAttempt, (Int, Map[String, Int], Int)]

+  // to track total no. of tasks in each stage of a micro-batch (streaming use case)
+  // this will help in requesting resources by counting pending tasks in job,
+  // rather than counting pending tasks in a stage.
+  private val jobStagesToNumTasks = new mutable.HashMap[Int, Int]

Review Comment:
   The name is a bit vague to me... what are jobStages? Are they stage attempts or just stage IDs?

##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -601,6 +601,12 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val DYN_ALLOCATION_STREAMING_ENABLED =
+    ConfigBuilder("spark.dynamicAllocation.streaming.enabled")

Review Comment:
   How does this interact with spark.dynamicAllocation.enabled? I think it needs to be clear to the user and documented well. You also need to update the configuration.md documentation to show this publicly.

##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -340,6 +366,39 @@ private[spark] class ExecutorAllocationManager(
     }
   }

+  /**
+   * Maximum number of executors to be removed per dra evaluation.
+   * This function executes logic only with `spark.dynamicAllocation.streaming.enabled` flag enabled

Review Comment:
   This should have a better description... if streaming DRA is not enabled it does nothing; put some details about the algorithm here, or reference the class description that explains it, since this seems to be the main functionality.

##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -644,6 +656,11 @@ package object config {
     .checkValue(_ >= 0L, "Timeout must be >= 0.")
     .createWithDefault(60)

+  private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.executorDeallocationTimeout")

Review Comment:
   Same here: if it only applies to streaming, put that in the name.

##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -185,6 +195,12 @@ private[spark] class ExecutorAllocationManager(
    * If not, throw an appropriate exception.
    */
   private def validateSettings(): Unit = {
+
+    if ( streamingDRAFeatureEnabled && !Utils.isDynamicAllocationEnabled(conf) ) {

Review Comment:
   Nits: there are a bunch of formatting issues here, with extra spaces, indentation, and other things wrong; please fix those.

##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -630,6 +636,12 @@ package object config {
     .doubleConf
     .createWithDefault(1.0)

+  private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_RATIO =
+    ConfigBuilder("spark.dynamicAllocation.executorDeallocationRatio")

Review Comment:
   If this only applies to streaming DRA, we should put streaming in the name. Add docs.

##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -133,6 +133,16 @@ private[spark] class ExecutorAllocationManager(
   private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

+  private val streamingDRAFeatureEnabled = conf.get(DYN_ALLOCATION_STREAMING_ENABLED)

Review Comment:
   It would be nice to update the description of this class to include the streaming details and how the algorithm works.
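On the naming and documentation asks for the new config entries, a minimal sketch of one possible shape, with "streaming" scoped into the key and a `.doc()` string that spells out the dependency on spark.dynamicAllocation.enabled. The key names, doc text, and value range here are suggestions only, not what the PR actually uses:

```scala
// Sketch for core/src/main/scala/org/apache/spark/internal/config/package.scala.
// Renamed keys and doc strings are illustrative.
private[spark] val DYN_ALLOCATION_STREAMING_ENABLED =
  ConfigBuilder("spark.dynamicAllocation.streaming.enabled")
    .doc("Whether to size executor requests from the pending tasks of the whole " +
      "micro-batch job instead of a single stage. Only takes effect when " +
      "spark.dynamicAllocation.enabled is also true.")
    .booleanConf
    .createWithDefault(false)

private[spark] val DYN_ALLOCATION_STREAMING_EXECUTOR_DEALLOCATION_RATIO =
  ConfigBuilder("spark.dynamicAllocation.streaming.executorDeallocationRatio")
    .doc("Fraction of removable executors to actually release per evaluation cycle. " +
      "Only used when spark.dynamicAllocation.streaming.enabled is true.")
    .doubleConf
    .checkValue(r => r > 0.0 && r <= 1.0, "Ratio must be in (0, 1].")
    .createWithDefault(1.0)
```

Whatever names land, each public entry would also need a matching row in docs/configuration.md, per the comments above.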
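For the validateSettings() nits and the question about how the streaming flag interacts with spark.dynamicAllocation.enabled, the cleaned-up guard could look roughly like this. The exception message and the cross-reference to Spark's existing DYN_ALLOCATION_ENABLED key are illustrative, not taken from the PR:

```scala
// Sketch only: assumes the PR's streamingDRAFeatureEnabled field and `conf` from
// the surrounding ExecutorAllocationManager, plus org.apache.spark.util.Utils.
private def validateSettings(): Unit = {
  // No extra spaces inside the parentheses; standard two-space indentation.
  if (streamingDRAFeatureEnabled && !Utils.isDynamicAllocationEnabled(conf)) {
    throw new SparkException(
      s"${DYN_ALLOCATION_STREAMING_ENABLED.key} requires " +
        s"${DYN_ALLOCATION_ENABLED.key} to also be enabled.")
  }
  // ... existing validation continues here ...
}
```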
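And for the jobStagesToNumTasks naming question, if the map is in fact keyed by plain stage IDs rather than stage attempts, a rename along these lines would answer the question in the name itself (hypothetical, pending the author's confirmation of the keying):

```scala
// Hypothetical rename: keyed by stageId, tracking task counts per stage of the
// current micro-batch job so pending tasks can be counted job-wide.
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
```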