[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-12399:
-----------------------------------
    Labels: pull-request-available  (was: )

> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
>                 Key: FLINK-12399
>                 URL: https://issues.apache.org/jira/browse/FLINK-12399
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.8.0
>            Reporter: Josh Bradt
>            Assignee: Rong Rong
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
> there appears to be a bug where a job that uses a custom
> FilterableTableSource does not keep the filters that were pushed down into
> the table source. More specifically, the table source does receive filters
> via applyPredicate, and a new table source with those filters is returned,
> but the final job graph appears to use the original table source, which does
> not contain any filters.
>
> I attached a minimal example program to this ticket. The custom table source
> is as follows:
> {code:java}
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
>
>     private final Filter[] filters;
>
>     private final FilterConverter converter = new FilterConverter();
>
>     public CustomTableSource() {
>         this(null);
>     }
>
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
>
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>
>         return execEnv.fromCollection(allModels());
>     }
>
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {
>         LOG.info("Applying predicates");
>
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
>
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
>
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
>
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>
>         return models;
>     }
> }
> {code}
>
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
> {noformat}
> which appears to indicate that although filters are getting pushed down, the
> final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
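
The symptom reported in this issue can be modelled without Flink. The sketch below is only an illustration of the described behaviour, not Flink's planner code and not a diagnosis of the root cause: `SketchSource`, `planDiscardingCopy`, and `planKeepingCopy` are hypothetical names, and filters are reduced to plain strings. It shows that when `applyPredicate` returns a new, filtered instance and the caller goes on using the original one, the original still reports that no filters are defined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FilterPushDownSketch {

    /** Hypothetical stand-in for a filterable table source; not a Flink type. */
    static final class SketchSource {
        // null means that no filters were pushed down, as in the reported example
        private final List<String> filters;

        SketchSource() {
            this(null);
        }

        private SketchSource(List<String> filters) {
            this.filters = filters;
        }

        /** Returns a NEW source that carries the filters; the receiver is unchanged. */
        SketchSource applyPredicate(List<String> predicates) {
            return new SketchSource(new ArrayList<>(predicates));
        }

        boolean isFilterPushedDown() {
            return filters != null;
        }

        String describe() {
            return isFilterPushedDown()
                    ? "Found filters: " + filters
                    : "No filters defined";
        }
    }

    /** Models the reported symptom: the filtered copy is created but then dropped. */
    static String planDiscardingCopy(SketchSource original, List<String> predicates) {
        original.applyPredicate(predicates); // return value ignored
        return original.describe();
    }

    /** Models the expected behaviour: execution continues with the filtered copy. */
    static String planKeepingCopy(SketchSource original, List<String> predicates) {
        SketchSource pushedDown = original.applyPredicate(predicates);
        return pushedDown.describe();
    }

    public static void main(String[] args) {
        List<String> predicates = Collections.singletonList("a > 5");
        System.out.println(planDiscardingCopy(new SketchSource(), predicates));
        System.out.println(planKeepingCopy(new SketchSource(), predicates));
    }
}
```

Run as written, the first call reports that no filters are defined even though `applyPredicate` was invoked, which mirrors the log output quoted in the issue; the second call reports the pushed-down filter.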