mbutrovich commented on code in PR #2538:
URL: https://github.com/apache/datafusion-comet/pull/2538#discussion_r2429376851
##########
spark/src/main/scala/org/apache/comet/CometExecIterator.scala:
##########
@@ -318,3 +262,74 @@ class CometExecIterator(
nativeLib.logMemoryUsage(s"task_memory_spark_$threadId", sparkTaskMemory)
}
}
+
+object CometExecIterator extends Logging {
+
+ def getMemoryConfig(conf: SparkConf): MemoryConfig = {
+ val numCores = numDriverOrExecutorCores(conf)
+ val coresPerTask = conf.get("spark.task.cpus", "1").toInt
+ // there are different paths for on-heap vs off-heap mode
+ val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
+ if (offHeapMode) {
+ // in off-heap mode, Comet uses unified memory management to share
off-heap memory with Spark
+ val offHeapSize =
ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
+ val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
+ val memoryLimit = (offHeapSize * memoryFraction).toLong
+ val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask /
numCores).toLong
Review Comment:
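We should be able to use `toDouble` instead of `toFloat` here. A `Float` has only a 24-bit significand, so byte counts above 2^24 (about 16 MiB) are not all exactly representable. Once the limit reaches 1 GiB it is rounded to a multiple of 128 bytes, and the granularity grows with larger limits. I'm not too worried about rounding errors or overflow in practice, but better safe than sorry, and we won't see any performance difference.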
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]