Yicong-Huang commented on code in PR #3674:
URL: https://github.com/apache/texera/pull/3674#discussion_r2287081146


##########
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/GreedyResourceAllocator.scala:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies
+
+import edu.uci.ics.amber.config.ApplicationConfig
+import edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.tuple.Tuple
+import edu.uci.ics.amber.core.workflow._
+import edu.uci.ics.amber.engine.architecture.scheduling.Region
+import edu.uci.ics.amber.engine.architecture.scheduling.config._
+import edu.uci.ics.amber.util.VirtualIdentityUtils
+import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.SqlServer.withTransaction
+import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, 
WORKFLOW_VERSION}
+
+import java.net.URI
+import scala.collection.mutable
+
+class GreedyResourceAllocator(
+                                physicalPlan: PhysicalPlan,
+                                executionClusterInfo: ExecutionClusterInfo,
+                                workflowSettings: WorkflowSettings,
+                                workflowContext: WorkflowContext
+                              ) extends ResourceAllocator {
+
+  // a map of a physical link to the partition info of the upstream/downstream 
of this link
+  private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, 
PartitionInfo]()
+
+  private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, 
OperatorConfig]()
+  private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]()
+
+  /**
+   * Allocates resources for a given region and its operators.
+   *
+   * This method calculates and assigns worker configurations for each operator
+   * in the region.
+   * For the operators that are parallelizable, it respects the
+   * suggested worker number if provided.
+   * Non-parallelizable operators are assigned a single worker.
+   * For parallelizable operators without suggestions, the slowest operator
+   * is assigned with one more worker.
+   * Automatic adjustment will not result the total number of workers exceeds
+   * configured core to worker ratio * configured cpu cores.
+   * @param region The region for which to allocate resources.
+   * @return A tuple containing:
+   *         1) A new Region instance with new resource configuration.
+   *         2) An estimated cost of the workflow with the new resource 
configuration,
+   *         represented as a Double value (currently set to 0, but will be
+   *         updated in the future).
+   */
+  def allocate(
+                region: Region
+              ): (Region, Double) = {
+    val statsOpt = 
getOperatorExecutionStats(this.workflowContext.workflowId.id)
+    val maxWorkerUpperBound = executionClusterInfo.availableNumberOfCores * 
executionClusterInfo.coreToWorkerRatio
+    val operatorList = region.getOperators
+
+    val opToOperatorConfigMapping: Map[PhysicalOpIdentity, OperatorConfig] = 
statsOpt match {
+
+      case Some(stats) =>
+        val opToWorkerList = mutable.HashMap[PhysicalOpIdentity, 
List[WorkerConfig]]()
+
+        val greedyCandidates = operatorList.filter(op =>
+          op.parallelizable && op.suggestedWorkerNum.isEmpty && 
stats.contains(op.id.logicalOpId.id)
+        )
+
+        val slowestOpIdStrOpt =
+          greedyCandidates
+            .flatMap(op => stats.get(op.id.logicalOpId.id).map { case (rt, _) 
=> op.id.logicalOpId.id -> rt })
+            .maxByOption(_._2)
+            .map(_._1)
+
+        val basicWorkerCounts: Map[PhysicalOpIdentity, Int] = operatorList.map 
{ op =>
+          val opIdStr = op.id.logicalOpId.id
+          val baseCount =
+            if (!op.parallelizable) 1
+            else if (op.suggestedWorkerNum.isDefined) op.suggestedWorkerNum.get
+            else if (stats.contains(opIdStr)) stats(opIdStr)._2
+            else ApplicationConfig.numWorkerPerOperatorByDefault
+          op.id -> baseCount
+        }.toMap
+
+        val currentTotal = basicWorkerCounts.values.sum
+
+        val addingAllowed = slowestOpIdStrOpt.isDefined && currentTotal + 1 <= 
maxWorkerUpperBound
+
+        operatorList.foreach { op =>
+          val opIdStr = op.id.logicalOpId.id
+          val basicWorkerCount = basicWorkerCounts(op.id)
+          val updatedWorkerCount =
+            if (addingAllowed && slowestOpIdStrOpt.contains(opIdStr)) 
basicWorkerCount + 1 else basicWorkerCount
+          val workers = (0 until updatedWorkerCount).map { idx =>
+            WorkerConfig(
+              VirtualIdentityUtils.createWorkerIdentity(op.workflowId, op.id, 
idx)
+            )
+          }.toList
+          opToWorkerList(op.id) = workers
+        }
+
+        opToWorkerList.map {
+          case (opId, workerList) => opId -> OperatorConfig(workerList)
+        }.toMap
+
+      case None =>
+        region.getOperators
+          .map(op => op.id -> 
OperatorConfig(WorkerConfig.generateDefaultWorkerConfigs(op)))
+          .toMap
+    }
+
+    operatorConfigs ++= opToOperatorConfigMapping
+
+    val updatedLinkPartitionInfos = propagatePartitionRequirement(region, 
physicalPlan, operatorConfigs.toMap, linkPartitionInfos.toMap)
+
+    linkPartitionInfos ++= updatedLinkPartitionInfos
+
+    val linkToLinkConfigMapping =
+      getLinkConfigs(region, operatorConfigs.toMap, linkPartitionInfos.toMap, 
workflowSettings)
+
+    linkConfigs ++= linkToLinkConfigMapping
+
+    val portConfigs: Map[GlobalPortIdentity, PortConfig] = 
getPortConfigs(region, operatorConfigs.toMap, workflowSettings)
+
+    val resourceConfig = ResourceConfig(
+      opToOperatorConfigMapping,
+      linkToLinkConfigMapping,
+      portConfigs
+    )
+
+    (region.copy(resourceConfig = Some(resourceConfig)), 0)
+  }
+
+  private def getOperatorExecutionStats(wid: Long): Option[Map[String, 
(Double, Int)]] = {
+    val uriOpt: Option[String] =
+      withTransaction(SqlServer.getInstance().createDSLContext()) { context =>
+        val row = context
+          .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+          .from(WORKFLOW_EXECUTIONS)
+          .join(WORKFLOW_VERSION)
+          .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
+          .where(
+            WORKFLOW_VERSION.WID.eq(wid.toInt)
+              .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)) // 成功状态
+          )
+          .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
+          .limit(1)
+          .fetchOne()
+
+        Option(row).map(_.get(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI))
+      }
+
+    uriOpt.flatMap { uriStr =>
+      if (uriStr == null || uriStr.trim.isEmpty) {
+        None
+      } else {
+        val stats = readStatsFromUri(uriStr)
+        if (stats.isEmpty) None else Some(stats)
+      }
+    }
+  }
+
+
+  private def readStatsFromUri(uriStr: String): Map[String, (Double, Int)] = {

Review Comment:
   this method is not clear at all. what stats? what is the uri pointing to? 
please clarify by renaming and add comments.



##########
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/GreedyResourceAllocator.scala:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies
+
+import edu.uci.ics.amber.config.ApplicationConfig
+import edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.tuple.Tuple
+import edu.uci.ics.amber.core.workflow._
+import edu.uci.ics.amber.engine.architecture.scheduling.Region
+import edu.uci.ics.amber.engine.architecture.scheduling.config._
+import edu.uci.ics.amber.util.VirtualIdentityUtils
+import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.SqlServer.withTransaction
+import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, 
WORKFLOW_VERSION}
+
+import java.net.URI
+import scala.collection.mutable
+
+class GreedyResourceAllocator(
+                                physicalPlan: PhysicalPlan,
+                                executionClusterInfo: ExecutionClusterInfo,
+                                workflowSettings: WorkflowSettings,
+                                workflowContext: WorkflowContext
+                              ) extends ResourceAllocator {
+
+  // a map of a physical link to the partition info of the upstream/downstream 
of this link
+  private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, 
PartitionInfo]()
+
+  private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, 
OperatorConfig]()
+  private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]()
+
+  /**
+   * Allocates resources for a given region and its operators.
+   *
+   * This method calculates and assigns worker configurations for each operator
+   * in the region.
+   * For the operators that are parallelizable, it respects the
+   * suggested worker number if provided.
+   * Non-parallelizable operators are assigned a single worker.
+   * For parallelizable operators without suggestions, the slowest operator
+   * is assigned with one more worker.
+   * Automatic adjustment will not result the total number of workers exceeds
+   * configured core to worker ratio * configured cpu cores.
+   * @param region The region for which to allocate resources.
+   * @return A tuple containing:
+   *         1) A new Region instance with new resource configuration.
+   *         2) An estimated cost of the workflow with the new resource 
configuration,
+   *         represented as a Double value (currently set to 0, but will be
+   *         updated in the future).
+   */
+  def allocate(
+                region: Region
+              ): (Region, Double) = {
+    val statsOpt = 
getOperatorExecutionStats(this.workflowContext.workflowId.id)
+    val maxWorkerUpperBound = executionClusterInfo.availableNumberOfCores * 
executionClusterInfo.coreToWorkerRatio
+    val operatorList = region.getOperators
+
+    val opToOperatorConfigMapping: Map[PhysicalOpIdentity, OperatorConfig] = 
statsOpt match {
+
+      case Some(stats) =>
+        val opToWorkerList = mutable.HashMap[PhysicalOpIdentity, 
List[WorkerConfig]]()
+
+        val greedyCandidates = operatorList.filter(op =>
+          op.parallelizable && op.suggestedWorkerNum.isEmpty && 
stats.contains(op.id.logicalOpId.id)
+        )
+
+        val slowestOpIdStrOpt =
+          greedyCandidates
+            .flatMap(op => stats.get(op.id.logicalOpId.id).map { case (rt, _) 
=> op.id.logicalOpId.id -> rt })
+            .maxByOption(_._2)
+            .map(_._1)
+
+        val basicWorkerCounts: Map[PhysicalOpIdentity, Int] = operatorList.map 
{ op =>
+          val opIdStr = op.id.logicalOpId.id
+          val baseCount =
+            if (!op.parallelizable) 1
+            else if (op.suggestedWorkerNum.isDefined) op.suggestedWorkerNum.get
+            else if (stats.contains(opIdStr)) stats(opIdStr)._2
+            else ApplicationConfig.numWorkerPerOperatorByDefault
+          op.id -> baseCount
+        }.toMap
+
+        val currentTotal = basicWorkerCounts.values.sum
+
+        val addingAllowed = slowestOpIdStrOpt.isDefined && currentTotal + 1 <= 
maxWorkerUpperBound
+
+        operatorList.foreach { op =>
+          val opIdStr = op.id.logicalOpId.id
+          val basicWorkerCount = basicWorkerCounts(op.id)
+          val updatedWorkerCount =
+            if (addingAllowed && slowestOpIdStrOpt.contains(opIdStr)) 
basicWorkerCount + 1 else basicWorkerCount
+          val workers = (0 until updatedWorkerCount).map { idx =>
+            WorkerConfig(
+              VirtualIdentityUtils.createWorkerIdentity(op.workflowId, op.id, 
idx)
+            )
+          }.toList
+          opToWorkerList(op.id) = workers
+        }
+
+        opToWorkerList.map {
+          case (opId, workerList) => opId -> OperatorConfig(workerList)
+        }.toMap
+
+      case None =>
+        region.getOperators
+          .map(op => op.id -> 
OperatorConfig(WorkerConfig.generateDefaultWorkerConfigs(op)))
+          .toMap
+    }
+
+    operatorConfigs ++= opToOperatorConfigMapping
+
+    val updatedLinkPartitionInfos = propagatePartitionRequirement(region, 
physicalPlan, operatorConfigs.toMap, linkPartitionInfos.toMap)
+
+    linkPartitionInfos ++= updatedLinkPartitionInfos
+
+    val linkToLinkConfigMapping =
+      getLinkConfigs(region, operatorConfigs.toMap, linkPartitionInfos.toMap, 
workflowSettings)
+
+    linkConfigs ++= linkToLinkConfigMapping
+
+    val portConfigs: Map[GlobalPortIdentity, PortConfig] = 
getPortConfigs(region, operatorConfigs.toMap, workflowSettings)
+
+    val resourceConfig = ResourceConfig(
+      opToOperatorConfigMapping,
+      linkToLinkConfigMapping,
+      portConfigs
+    )
+
+    (region.copy(resourceConfig = Some(resourceConfig)), 0)
+  }
+
+  private def getOperatorExecutionStats(wid: Long): Option[Map[String, 
(Double, Int)]] = {
+    val uriOpt: Option[String] =
+      withTransaction(SqlServer.getInstance().createDSLContext()) { context =>
+        val row = context
+          .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+          .from(WORKFLOW_EXECUTIONS)
+          .join(WORKFLOW_VERSION)
+          .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
+          .where(
+            WORKFLOW_VERSION.WID.eq(wid.toInt)
+              .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)) // 成功状态
+          )
+          .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
+          .limit(1)
+          .fetchOne()
+
+        Option(row).map(_.get(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI))
+      }
+
+    uriOpt.flatMap { uriStr =>
+      if (uriStr == null || uriStr.trim.isEmpty) {
+        None
+      } else {
+        val stats = readStatsFromUri(uriStr)
+        if (stats.isEmpty) None else Some(stats)
+      }
+    }
+  }
+
+
+  private def readStatsFromUri(uriStr: String): Map[String, (Double, Int)] = {
+    val uri = new URI(uriStr)
+    val document = DocumentFactory.openDocument(uri)
+
+    document._1.get().foldLeft(Map.empty[String, (Double, Int)]) { (acc, 
tuple) =>
+      val record = tuple.asInstanceOf[Tuple]

Review Comment:
   what is a `record`? please give meaningful naming.



##########
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala:
##########
@@ -19,126 +19,29 @@
 
 package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies
 
-import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity
 import edu.uci.ics.amber.core.workflow._
 import edu.uci.ics.amber.engine.architecture.scheduling.Region
 import 
edu.uci.ics.amber.engine.architecture.scheduling.config.ChannelConfig.generateChannelConfigs
 import 
edu.uci.ics.amber.engine.architecture.scheduling.config.LinkConfig.toPartitioning
-import 
edu.uci.ics.amber.engine.architecture.scheduling.config.WorkerConfig.generateWorkerConfigs
+import 
edu.uci.ics.amber.engine.architecture.scheduling.config.WorkerConfig.generateDefaultWorkerConfigs
 import edu.uci.ics.amber.engine.architecture.scheduling.config._
-import 
edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings.Partitioning
 import 
edu.uci.ics.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage
 
 import java.net.URI
 import scala.collection.mutable
 
 trait ResourceAllocator {
-  def allocate(region: Region): (Region, Double)
-}
-
-class DefaultResourceAllocator(
-    physicalPlan: PhysicalPlan,
-    executionClusterInfo: ExecutionClusterInfo,
-    workflowSettings: WorkflowSettings
-) extends ResourceAllocator {
-
-  // a map of a physical link to the partition info of the upstream/downstream 
of this link
-  private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, 
PartitionInfo]()
-
-  private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, 
OperatorConfig]()
-  private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]()
 
   /**
-    * Allocates resources for a given region and its operators.
+    * Allocate resources for the given region (operator/link/port).
+    * Returns the region with a new ResourceConfig and an estimated cost.
+    * Different ResourceAllocator implementations may apply different methods;
+    * this one applies the default allocation method.
     *
-    * This method calculates and assigns worker configurations for each 
operator
-    * in the region. For the operators that are parallelizable, it respects the
-    * suggested worker number if provided. Otherwise, it falls back to a 
default
-    * value. Non-parallelizable operators are assigned a single worker.
-    *
-    * @param region The region for which to allocate resources.
-    * @return A tuple containing:
-    *         1) A new Region instance with new resource configuration.
-    *         2) An estimated cost of the workflow with the new resource 
configuration,
-    *         represented as a Double value (currently set to 0, but will be
-    *         updated in the future).
+    * @param region Region to allocate.
+    * @return (updated Region, estimated cost)

Review Comment:
   per comments in #3660, we hope to only return resourceConfig instead of the 
updated region. 



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala:
##########
@@ -60,10 +61,11 @@ class JavaUDFOpDesc extends LogicalOp {
   @JsonPropertyDescription("Input your code here")
   var code: String = ""
 
-  @JsonProperty(required = true, defaultValue = "1")
-  @JsonSchemaTitle("Worker count")
-  @JsonPropertyDescription("Specify how many parallel workers to lunch")
-  var workers: Int = Int.box(1)
+  @JsonProperty(required = true, defaultValue = "true")
+  @JsonSchemaTitle("Parallelizable?")
+  @JsonPropertyDescription("Default: True")
+  @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""")
+  val parallelizable: Boolean = Boolean.box(true)

Review Comment:
   I am a bit against this three-step design. Why do we ask users to click a 
check box (`parallelizible`), then click another one (`advanced`), then provide 
a number? This is way too complicated. Can we simplify it?



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala:
##########
@@ -109,23 +122,40 @@ class JavaUDFOpDesc extends LogicalOp {
       Map(operatorInfo.outputPorts.head.id -> outputSchema)
     }
 
-    if (workers > 1)
-      PhysicalOp
-        .oneToOnePhysicalOp(
-          workflowId,
-          executionId,
-          operatorIdentifier,
-          OpExecWithCode(code, "java")
-        )
-        .withDerivePartition(_ => UnknownPartition())
-        .withInputPorts(operatorInfo.inputPorts)
-        .withOutputPorts(operatorInfo.outputPorts)
-        .withPartitionRequirement(partitionRequirement)
-        .withIsOneToManyOp(true)
-        .withParallelizable(true)
-        .withSuggestedWorkerNum(workers)
-        .withPropagateSchema(SchemaPropagationFunc(propagateSchema))
-    else
+    if (parallelizable) {
+      if (advanced) {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "java")
+          )
+          .withDerivePartition(_ => UnknownPartition())
+          .withInputPorts(operatorInfo.inputPorts)
+          .withOutputPorts(operatorInfo.outputPorts)
+          .withPartitionRequirement(partitionRequirement)
+          .withIsOneToManyOp(true)
+          .withParallelizable(true)
+          .withSuggestedWorkerNum(workers)
+          .withPropagateSchema(SchemaPropagationFunc(propagateSchema))
+      } else {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "java")
+          )
+          .withDerivePartition(_ => UnknownPartition())
+          .withInputPorts(operatorInfo.inputPorts)
+          .withOutputPorts(operatorInfo.outputPorts)

Review Comment:
   see pythonUDFSourceOpDescV2 for example.



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala:
##########
@@ -78,21 +80,41 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp {
   )
   var outputColumns: List[Attribute] = List()
 
+  @JsonProperty(required = true, defaultValue = "false")
+  @JsonSchemaTitle("Advanced Setting")
+  @JsonDeserialize(contentAs = classOf[java.lang.Boolean])
+  @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""")
+  var advanced: Boolean = Boolean.box(false)
+
+  @JsonProperty(required = true, defaultValue = "1")
+  @JsonSchemaTitle("Worker count")
+  @JsonPropertyDescription("Specify how many parallel workers to launch")
+  var workers: Int = Int.box(1)
+
   override def getPhysicalOp(
       workflowId: WorkflowIdentity,
       executionId: ExecutionIdentity
   ): PhysicalOp = {
     Preconditions.checkArgument(workers >= 1, "Need at least 1 worker.", 
Array())
-    val physicalOp = if (workers > 1) {
-      PhysicalOp
-        .oneToOnePhysicalOp(
-          workflowId,
-          executionId,
-          operatorIdentifier,
-          OpExecWithCode(code, "python")
-        )
-        .withParallelizable(true)
-        .withSuggestedWorkerNum(workers)
+    val physicalOp = if (parallelizable) {
+      if (advanced) {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "python")
+          )
+          .withSuggestedWorkerNum(workers)
+      } else {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "python")
+          )
+      }

Review Comment:
   ditto.



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFOpDesc.scala:
##########
@@ -112,16 +125,25 @@ class RUDFOpDesc extends LogicalOp {
     }
 
     val r_operator_type = if (useTupleAPI) "r-tuple" else "r-table"
-    if (workers > 1) {
-      PhysicalOp
-        .oneToOnePhysicalOp(
-          workflowId,
-          executionId,
-          operatorIdentifier,
-          OpExecWithCode(code, r_operator_type)
-        )
-        .withParallelizable(true)
-        .withSuggestedWorkerNum(workers)
+    if (parallelizable) {
+      if (advanced) {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, r_operator_type)
+          )
+      } else {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, r_operator_type)
+          )
+          .withSuggestedWorkerNum(workers)
+      }

Review Comment:
   ditto.



##########
core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala:
##########
@@ -152,7 +55,13 @@ class DefaultResourceAllocator(
     * The link A->HJ will be propagated in the first region. The link B->HJ 
will be propagated in the second region.
     * The output partition info of HJ will be derived after both links are 
propagated, which is in the second region.
     */
-  private def propagatePartitionRequirement(region: Region): Unit = {
+  def propagatePartitionRequirement(
+      region: Region,
+      physicalPlan: PhysicalPlan,
+      operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig],
+      seedLinkPartitions: Map[PhysicalLink, PartitionInfo] = Map.empty
+  ): Map[PhysicalLink, PartitionInfo] = {
+    val linkPartitionInfos = mutable.HashMap[PhysicalLink, PartitionInfo]() 
++= seedLinkPartitions

Review Comment:
   why are you saving a copy of the link partitions inside this method? the 
return type is already a map of partition infos. why do you need to pass an 
input map `seedLinkPartitions`?



##########
core/config/src/main/resources/application.conf:
##########
@@ -120,7 +120,7 @@ fault-tolerance {
 }
 
 schedule-generator {
-    max-concurrent-regions = 1
+    max-concurrent-regions = 2

Review Comment:
   why do we change this default value?



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala:
##########
@@ -109,23 +122,40 @@ class JavaUDFOpDesc extends LogicalOp {
       Map(operatorInfo.outputPorts.head.id -> outputSchema)
     }
 
-    if (workers > 1)
-      PhysicalOp
-        .oneToOnePhysicalOp(
-          workflowId,
-          executionId,
-          operatorIdentifier,
-          OpExecWithCode(code, "java")
-        )
-        .withDerivePartition(_ => UnknownPartition())
-        .withInputPorts(operatorInfo.inputPorts)
-        .withOutputPorts(operatorInfo.outputPorts)
-        .withPartitionRequirement(partitionRequirement)
-        .withIsOneToManyOp(true)
-        .withParallelizable(true)
-        .withSuggestedWorkerNum(workers)
-        .withPropagateSchema(SchemaPropagationFunc(propagateSchema))
-    else
+    if (parallelizable) {
+      if (advanced) {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "java")
+          )
+          .withDerivePartition(_ => UnknownPartition())
+          .withInputPorts(operatorInfo.inputPorts)
+          .withOutputPorts(operatorInfo.outputPorts)
+          .withPartitionRequirement(partitionRequirement)
+          .withIsOneToManyOp(true)
+          .withParallelizable(true)
+          .withSuggestedWorkerNum(workers)
+          .withPropagateSchema(SchemaPropagationFunc(propagateSchema))
+      } else {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "java")
+          )
+          .withDerivePartition(_ => UnknownPartition())
+          .withInputPorts(operatorInfo.inputPorts)
+          .withOutputPorts(operatorInfo.outputPorts)

Review Comment:
   merge the common code. only apply a difference part (i.e.,  
`.withParallelizable(true)`) to different cases



##########
core/config/src/main/resources/application.conf:
##########
@@ -133,6 +133,15 @@ schedule-generator {
     search-timeout-milliseconds = 
${?SCHEDULE_GENERATOR_SEARCH_TIMEOUT_MILLISECONDS}
 }
 
+operator-parallelism {
+    allocator-type = "greedy"
+    allocator-type = ${?OPERATOR_PARALLELISM_ALLOCATOR_TYPE}
+    available-cores = 8
+    available-cores = ${?OPERATOR_PARALLELISM_AVAILABLE_CORES}
+    core-to-worker-ratio = 1.5
+    core-to-worker-ratio =${?OPERATOR_PARALLELISM_CORE_TO_WORKER_RATIO}

Review Comment:
   do you mean cpu core? please make it explicit. 



##########
core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala:
##########
@@ -112,16 +125,25 @@ class PythonUDFOpDescV2 extends LogicalOp {
       Map(operatorInfo.outputPorts.head.id -> outputSchema)
     }
 
-    val physicalOp = if (workers > 1) {
-      PhysicalOp
-        .oneToOnePhysicalOp(
-          workflowId,
-          executionId,
-          operatorIdentifier,
-          OpExecWithCode(code, "python")
-        )
-        .withParallelizable(true)
-        .withSuggestedWorkerNum(workers)
+    val physicalOp = if (parallelizable) {
+      if (advanced) {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "python")
+          )
+          .withSuggestedWorkerNum(workers)
+      } else {
+        PhysicalOp
+          .oneToOnePhysicalOp(
+            workflowId,
+            executionId,
+            operatorIdentifier,
+            OpExecWithCode(code, "python")
+          )
+      }

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to