RussellSpitzer commented on code in PR #6524:
URL: https://github.com/apache/iceberg/pull/6524#discussion_r1062662228
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java:
##########
@@ -106,41 +106,41 @@ public SparkScanBuilder caseSensitive(boolean
isCaseSensitive) {
@Override
public Filter[] pushFilters(Filter[] filters) {
List<Expression> expressions =
Lists.newArrayListWithExpectedSize(filters.length);
- List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
+ List<Filter> pushableFilters =
Lists.newArrayListWithExpectedSize(filters.length);
+ List<Filter> postScanFilters =
Lists.newArrayListWithExpectedSize(filters.length);
for (Filter filter : filters) {
- Expression expr = null;
try {
- expr = SparkFilters.convert(filter);
- } catch (IllegalArgumentException e) {
- // converting to Iceberg Expression failed, so this expression cannot
be pushed down
- LOG.info(
- "Failed to convert filter to Iceberg expression, skipping push
down for this expression: {}. {}",
- filter,
- e.getMessage());
- }
+ Expression expr = SparkFilters.convert(filter);
- if (expr != null) {
- try {
+ if (expr != null) {
+ // try binding the expression to ensure it can be pushed down
Binder.bind(schema.asStruct(), expr, caseSensitive);
+
expressions.add(expr);
- pushed.add(filter);
- } catch (ValidationException e) {
- // binding to the table schema failed, so this expression cannot be
pushed down
- LOG.info(
- "Failed to bind expression to table schema, skipping push down
for this expression: {}. {}",
- filter,
- e.getMessage());
+ pushableFilters.add(filter);
+ }
+
+ if (expr == null || requiresRecordLevelFiltering(expr)) {
+ postScanFilters.add(filter);
}
+ } catch (Exception e) {
+ LOG.warn("Failed to check if {} can be pushed down: {}", filter,
e.getMessage());
+ postScanFilters.add(filter);
}
}
this.filterExpressions = expressions;
- this.pushedFilters = pushed.toArray(new Filter[0]);
+ this.pushedFilters = pushableFilters.toArray(new Filter[0]);
+
+ // all unsupported filters and filters that require record-level filtering
+ // must be reported back and handled on the Spark side
+ return postScanFilters.toArray(new Filter[0]);
+ }
- // Spark doesn't support residuals per task, so return all filters
- // to get Spark to handle record-level filtering
- return filters;
+ private boolean requiresRecordLevelFiltering(Expression expr) {
+ return table.specs().values().stream()
+ .anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec,
caseSensitive));
Review Comment:
Actually, maybe I like your version better. I guess negating at the low level
matches the function name better than negating the result of `.allMatch`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]