[ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12399:
-----------------------------------
    Labels: pull-request-available  (was: )

> FilterableTableSource does not use filters on job run
> -----------------------------------------------------
>
>                 Key: FLINK-12399
>                 URL: https://issues.apache.org/jira/browse/FLINK-12399
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.8.0
>            Reporter: Josh Bradt
>            Assignee: Rong Rong
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
> there appears to be a bug where a job that uses a custom
> FilterableTableSource does not keep the filters that were pushed down into
> the table source. More specifically, the table source does receive filters
> via applyPredicate, and a new table source with those filters is returned,
> but the final job graph appears to use the original table source, which does
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source
> is as follows:
> {code:java}
> public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
>     private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
>     private final Filter[] filters;
>     private final FilterConverter converter = new FilterConverter();
>     public CustomTableSource() {
>         this(null);
>     }
>     private CustomTableSource(Filter[] filters) {
>         this.filters = filters;
>     }
>     @Override
>     public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
>         if (filters == null) {
>             LOG.info("==== No filters defined ====");
>         } else {
>             LOG.info("==== Found filters ====");
>             for (Filter filter : filters) {
>                 LOG.info("FILTER: {}", filter);
>             }
>         }
>         return execEnv.fromCollection(allModels());
>     }
>     @Override
>     public TableSource<Model> applyPredicate(List<Expression> predicates) {
>         LOG.info("Applying predicates");
>         List<Filter> acceptedFilters = new ArrayList<>();
>         for (final Expression predicate : predicates) {
>             converter.convert(predicate).ifPresent(acceptedFilters::add);
>         }
>         return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
>     }
>     @Override
>     public boolean isFilterPushedDown() {
>         return filters != null;
>     }
>     @Override
>     public TypeInformation<Model> getReturnType() {
>         return TypeInformation.of(Model.class);
>     }
>     @Override
>     public TableSchema getTableSchema() {
>         return TableSchema.fromTypeInfo(getReturnType());
>     }
>     private List<Model> allModels() {
>         List<Model> models = new ArrayList<>();
>         models.add(new Model(1, 2, 3, 4));
>         models.add(new Model(10, 11, 12, 13));
>         models.add(new Model(20, 21, 22, 23));
>         return models;
>     }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource - Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
> {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.
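
The symptom described above can be modeled without Flink. The following is only a minimal sketch under stated assumptions: SketchSource, logLine, and the "a > 5" predicate string are hypothetical names made up for illustration, and nothing here calls Flink's planner or reproduces its actual behavior. It shows why the reported log line would appear if the job were built from the original instance rather than from the copy returned by applyPredicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FilterPushDownSketch {

    /** Hypothetical stand-in for the ticket's CustomTableSource; it does not use Flink. */
    static final class SketchSource {
        // null means no predicates were pushed down, mirroring the ticket's filters field.
        private final List<String> filters;

        SketchSource() {
            this(null);
        }

        private SketchSource(List<String> filters) {
            this.filters = filters;
        }

        /** Returns a new source that carries the predicates; this instance stays unchanged. */
        SketchSource applyPredicate(List<String> predicates) {
            return new SketchSource(Collections.unmodifiableList(new ArrayList<>(predicates)));
        }

        boolean isFilterPushedDown() {
            return filters != null;
        }

        /** Hypothetical helper: the message the ticket's getDataSet would log for this instance. */
        String logLine() {
            return filters == null
                    ? "==== No filters defined ===="
                    : "==== Found filters ==== " + filters;
        }
    }

    public static void main(String[] args) {
        SketchSource original = new SketchSource();
        SketchSource filtered = original.applyPredicate(Arrays.asList("a > 5"));

        // A job built from the returned copy sees the filters.
        System.out.println(filtered.logLine());
        // A job built from the original instance reproduces the reported log line.
        System.out.println(original.logLine());
    }
}
```

Because applyPredicate returns a copy and leaves the receiver untouched, any component that keeps a reference to the original source loses the pushed-down filters. That matches the log output in the report, though the ticket text itself does not establish where in the planner the original instance is retained.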



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
