rdblue commented on a change in pull request #1893:
URL: https://github.com/apache/iceberg/pull/1893#discussion_r557542475



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -122,6 +134,21 @@ public boolean isLimitPushedDown() {
 
   @Override
   public TableSource<RowData> applyLimit(long newLimit) {
-    return new IcebergTableSource(loader, schema, properties, projectedFields, 
true, newLimit);
+    return new IcebergTableSource(loader, schema, properties, projectedFields, 
true, newLimit, filters);
+  }
+
+  @Override
+  public TableSource<RowData> applyPredicate(List<Expression> predicates) {
+    List<org.apache.iceberg.expressions.Expression> expressions = 
Lists.newArrayList();
+    for (Expression predicate : predicates) {
+      FlinkFilters.convert(predicate).ifPresent(expressions::add);
+    }
+
+    return new IcebergTableSource(loader, schema, properties, projectedFields, 
isLimitPushDown, limit, expressions);

Review comment:
       @openinx, is this guaranteed to only be called on a source that has not 
had predicates pushed? This ignores the existing predicates in this source. 
Maybe we should add a precondition to check that assumption.
   
   I think it is safe either way because this doesn't remove any predicates 
from the list. From reading the Javadoc, I think that will result in all 
predicates running in Flink also.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to