tgravescs commented on code in PR #42352:
URL: https://github.com/apache/spark/pull/42352#discussion_r1546387115


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -669,6 +728,27 @@ private[spark] class ExecutorAllocationManager(
     private val stageAttemptToExecutorPlacementHints =
       new mutable.HashMap[StageAttempt, (Int, Map[String, Int], Int)]
 
+    // to track total no. of tasks in each stage of a micro-batch (streaming 
use case)
+    // this will help in requesting resources by counting pending tasks in job,
+    // rather than counting pending tasks in a stage.
+    private val jobStagesToNumTasks = new mutable.HashMap[Int, Int]

Review Comment:
   name is a bit vague to me... what are jobStages.  Are they stageattempts or 
just stageId?



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -601,6 +601,12 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val DYN_ALLOCATION_STREAMING_ENABLED =
+    ConfigBuilder("spark.dynamicAllocation.streaming.enabled")

Review Comment:
   how does this interact with spark.dynamicAllocation.enabled  ?  I think it 
needs to be clear to the user and docuemnted well.
   
   you also need to update the configuration.md documentation to show this 
publicly



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -340,6 +366,39 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  /**
+   * Maximum number of executors to be removed per dra evaluation.
+   * This function executes logic only with 
`spark.dynamicAllocation.streaming.enabled` flag enabled

Review Comment:
   this should have better description... if not streamingDRA does nothing, put 
some details about the algorithm or reference the class description to explain 
it since this seems to be the main functionality.



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -644,6 +656,11 @@ package object config {
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
       .createWithDefault(60)
 
+  private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.executorDeallocationTimeout")

Review Comment:
   same here, if it only applies to streaming put that in the name



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -185,6 +195,12 @@ private[spark] class ExecutorAllocationManager(
    * If not, throw an appropriate exception.
    */
   private def validateSettings(): Unit = {
+
+    if ( streamingDRAFeatureEnabled && !Utils.isDynamicAllocationEnabled(conf) 
) {

Review Comment:
   nits, there are a bunch of formatting issues here, with extra spaces, and 
indentation and things wrong, please fix those



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -630,6 +636,12 @@ package object config {
       .doubleConf
       .createWithDefault(1.0)
 
+  private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_RATIO =
+    ConfigBuilder("spark.dynamicAllocation.executorDeallocationRatio")

Review Comment:
   If this only applies to streaming dra we should put streaming in the name.. 
add docs



##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -133,6 +133,16 @@ private[spark] class ExecutorAllocationManager(
 
   private val defaultProfileId = 
resourceProfileManager.defaultResourceProfile.id
 
+  private val streamingDRAFeatureEnabled = 
conf.get(DYN_ALLOCATION_STREAMING_ENABLED)

Review Comment:
   it would be nice to update the description of this class to include the 
streaming details and how the algorithm works.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to