rdblue commented on code in PR #6622:
URL: https://github.com/apache/iceberg/pull/6622#discussion_r1115101580
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -153,6 +172,152 @@ public Filter[] pushedFilters() {
return pushedFilters;
}
+  @Override
+  public boolean pushAggregation(Aggregation aggregation) {
+    if (!canPushDownAggregation(aggregation)) {
+      return false;
+    }
+
+    AggregateEvaluator aggregateEvaluator;
+    List<BoundAggregate<?, ?>> expressions =
+        Lists.newArrayListWithExpectedSize(aggregation.aggregateExpressions().length);
+
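+    // Convert each Spark AggregateFunc to an Iceberg expression and bind it to the table
+    // schema; if any aggregate can't be converted or bound, give up on the pushdown.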
+    for (AggregateFunc aggregateFunc : aggregation.aggregateExpressions()) {
+      try {
+        Expression expr = SparkAggregates.convert(aggregateFunc);
+        if (expr == null) {
+          LOG.info(
+              "Skipping aggregate pushdown: AggregateFunc {} can't be converted to an Iceberg expression",
+              aggregateFunc);
+          return false;
+        }
+
+        Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
+        expressions.add((BoundAggregate<?, ?>) bound);
+      } catch (UnsupportedOperationException e) {
+        LOG.info(
+            "Skipping aggregate pushdown: AggregateFunc {} can't be converted to an Iceberg expression",
+            aggregateFunc,
+            e);
+        return false;
+      } catch (IllegalArgumentException e) {
+        LOG.info("Skipping aggregate pushdown: bind failed for AggregateFunc {}", aggregateFunc, e);
+        return false;
+      }
+    }
+
+    aggregateEvaluator = AggregateEvaluator.create(expressions);
+
+    if (!metricsModeSupportsAggregatePushDown(aggregateEvaluator.aggregates())) {
+      return false;
+    }
+
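+    // Plan a metadata-only scan that loads column stats; the aggregates are computed from
+    // per-file metrics (lower/upper bounds and counts) without reading any data files.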
+    TableScan scan = table.newScan().withColStats();
+    Snapshot snapshot = readSnapshot();
+    if (snapshot == null) {
+      LOG.info("Skipping aggregate pushdown: table snapshot is null");
+      return false;
+    }
+    scan = scan.useSnapshot(snapshot.snapshotId());
+    scan = configureSplitPlanning(scan);
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
+      List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
+      for (FileScanTask task : tasks) {
+        if (!task.deletes().isEmpty()) {
+          LOG.info("Skipping aggregate pushdown: detected row-level deletes");
+          return false;
+        }
+
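+        // Fold this file's column metrics into the running aggregate result.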
+        aggregateEvaluator.update(task.file());
+      }
+    } catch (IOException e) {
+      LOG.info("Skipping aggregate pushdown: planning file scan tasks failed", e);
+      return false;
+    }
+
+    if (!aggregateEvaluator.allAggregatorsValid()) {
+      return false;
+    }
+
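+    // Expose the aggregate result as a single InternalRow so Spark can answer the query
+    // from this pre-computed row instead of scanning data files.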
+    pushedAggregateSchema =
+        SparkSchemaUtil.convert(new Schema(aggregateEvaluator.resultType().fields()));
+    InternalRow[] pushedAggregateRows = new InternalRow[1];
+    StructLike structLike = aggregateEvaluator.result();
+    pushedAggregateRows[0] =
+        new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
+    localScan = new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows);
+
+    return true;
+  }
+
+  private boolean canPushDownAggregation(Aggregation aggregation) {
+    if (!(table instanceof BaseTable)) {
+      return false;
+    }
+
+    if (!readConf.aggregatePushDownEnabled()) {
+      return false;
+    }
+
+    // If the group by expression matches the partition columns, the statistics information can
+    // still be used to calculate min/max/count; that case will be enabled in a later phase.
+    // TODO: enable aggregate push down for partition col group by expression
+    if (aggregation.groupByExpressions().length > 0) {
+      LOG.info("Skipping aggregate pushdown: group by aggregation push down is not supported");
+      return false;
+    }
+
+    // TODO: enable aggregate push down for partition filter
+    if (filterExpressions != null) {
Review Comment:
I think that we could easily enable this if you wanted to.
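   For example, a check along these lines could gate the pushdown instead of rejecting every filtered scan. This is only a rough sketch, assuming identity partitioning and using `Binder.boundReferences` to collect the field ids the pushed filters touch; `filtersOnlyReferencePartitionColumns` is a hypothetical helper, not code from this PR:
   ```java
   // Hypothetical helper: allow aggregate pushdown when every pushed filter references
   // only identity-partition source columns. For such filters a data file either matches
   // entirely or not at all, so per-file metrics stay exact for the matching files.
   private boolean filtersOnlyReferencePartitionColumns() {
     Set<Integer> partitionSourceIds =
         table.spec().fields().stream()
             .filter(field -> field.transform().isIdentity())
             .map(PartitionField::sourceId)
             .collect(Collectors.toSet());

     // Field ids referenced by the pushed filter expressions.
     Set<Integer> referencedIds =
         Binder.boundReferences(schema.asStruct(), filterExpressions, caseSensitive);

     return partitionSourceIds.containsAll(referencedIds);
   }
   ```
   The scan would also need the filter applied (for example via `TableScan#filter`) so that files from non-matching partitions are pruned before the evaluator sees them.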