Shekharrajak commented on code in PR #3519:
URL: https://github.com/apache/datafusion-comet/pull/3519#discussion_r2807300733
##########
spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala:
##########
@@ -154,39 +154,59 @@ object IcebergReflection extends Logging {
/**
* Gets the tasks from a SparkScan.
*
- * The tasks() method is protected in SparkScan, requiring reflection to
access.
+ * SparkBatchQueryScan (via SparkPartitioningAwareScan) has tasks() method.
SparkStagedScan (via
+ * SparkScan) has taskGroups() - we extract tasks from groups.
*/
def getTasks(scan: Any): Option[java.util.List[_]] = {
- try {
- val tasksMethod = scan.getClass.getSuperclass
- .getDeclaredMethod("tasks")
- tasksMethod.setAccessible(true)
- Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
- } catch {
- case e: Exception =>
- logError(
- s"Iceberg reflection failure: Failed to get tasks from SparkScan:
${e.getMessage}")
- None
+ // Try tasks() first (SparkPartitioningAwareScan hierarchy)
+ findMethodInHierarchy(scan.getClass, "tasks").flatMap { tasksMethod =>
+ try {
+ return Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
+ } catch {
+ case _: Exception => None
+ }
+ }
+
+ // Fall back to taskGroups() (SparkScan hierarchy - used by
SparkStagedScan)
+ findMethodInHierarchy(scan.getClass, "taskGroups").flatMap {
taskGroupsMethod =>
+ try {
+ val taskGroups =
taskGroupsMethod.invoke(scan).asInstanceOf[java.util.List[_]]
+ // Extract individual tasks from each ScanTaskGroup
+ val allTasks = new java.util.ArrayList[Any]()
+ val iter = taskGroups.iterator()
+ while (iter.hasNext) {
+ val group = iter.next()
+ val tasksMethod = group.getClass.getMethod("tasks")
+ val groupTasks =
tasksMethod.invoke(group).asInstanceOf[java.lang.Iterable[_]]
+ groupTasks.forEach(task => allTasks.add(task))
+ }
+ Some(allTasks.asInstanceOf[java.util.List[_]])
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Iceberg reflection failure: Failed to get tasks from SparkScan:
${e.getMessage}")
+ None
+ }
}
}
/**
* Gets the filter expressions from a SparkScan.
*
- * The filterExpressions() method is protected in SparkScan.
+ * The filterExpressions() method is protected in SparkScan. Uses
findMethodInHierarchy to
+ * support both SparkBatchQueryScan and SparkStagedScan.
*/
def getFilterExpressions(scan: Any): Option[java.util.List[_]] = {
- try {
- val filterExpressionsMethod = scan.getClass.getSuperclass.getSuperclass
- .getDeclaredMethod("filterExpressions")
- filterExpressionsMethod.setAccessible(true)
-
Some(filterExpressionsMethod.invoke(scan).asInstanceOf[java.util.List[_]])
- } catch {
- case e: Exception =>
- logError(
- "Iceberg reflection failure: Failed to get filter expressions from
SparkScan: " +
- s"${e.getMessage}")
- None
+ findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap {
filterExpressionsMethod =>
Review Comment:
Previously we assumed a fixed Iceberg class hierarchy; findMethodInHierarchy
walks up the class tree instead, which is a better approach.
For compaction to work, we need to extract FileScanTask objects from the
scan. Different Iceberg scan types expose tasks differently:
- SparkBatchQueryScan -> tasks() method
- SparkStagedScan -> taskGroups() method (returns groups; the tasks must be
  extracted from each group)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]