Hi Fabian,

Thanks for taking a look. I've filed this ticket: https://issues.apache.org/jira/browse/FLINK-12399

Thanks,
Josh

On Fri, May 3, 2019 at 3:41 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Josh,
>
> The code looks good to me.
> This seems to be a bug then.
> It's strange that it works for ORC.
>
> Would you mind opening a Jira ticket and maybe a simple reproducible code
> example?
>
> Thank you,
> Fabian
>
> On Thu, May 2, 2019 at 6:23 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply. My custom table source does not implement
>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>> correctly since it's nearly identical to what's written in the
>> OrcTableSource. I pasted a slightly simplified version of the
>> implementation below. If you wouldn't mind reading over it, is there
>> anything obviously wrong?
>>
>> public final class CustomerTableSource implements BatchTableSource<Customer>,
>>         FilterableTableSource<Customer> {
>>
>>     // Iterator that gets data from a REST API as POJO instances
>>     private final AppResourceIterator<Customer> resourceIterator;
>>     private final String tableName;
>>     private final Class<Customer> modelClass;
>>     private final AppRequestFilter[] filters;
>>
>>     public CustomerTableSource(
>>             AppResourceIterator<Customer> resourceIterator,
>>             String tableName,
>>             Class<Customer> modelClass) {
>>
>>         this(resourceIterator, tableName, modelClass, null);
>>     }
>>
>>     protected CustomerTableSource(
>>             AppResourceIterator<Customer> resourceIterator,
>>             String tableName,
>>             Class<Customer> modelClass,
>>             AppRequestFilter[] filters) {
>>
>>         this.resourceIterator = resourceIterator;
>>         this.tableName = tableName;
>>         this.modelClass = modelClass;
>>         this.filters = filters;
>>     }
>>
>>     @Override
>>     public TableSource<Customer> applyPredicate(List<Expression> predicates) {
>>         List<Expression> acceptedPredicates = new ArrayList<>();
>>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>>
>>         for (final Expression predicate : predicates) {
>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>                 acceptedFilters.add(filter);
>>                 acceptedPredicates.add(predicate);
>>             });
>>         }
>>
>>         predicates.removeAll(acceptedPredicates);
>>
>>         return new CustomerTableSource(
>>                 resourceIterator.withFilters(acceptedFilters),
>>                 tableName,
>>                 modelClass,
>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>         );
>>     }
>>
>>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>>         // Code for translating an Expression into an AppRequestFilter
>>         // Returns Optional.empty() for predicates we don't want to / can't apply
>>     }
>>
>>     @Override
>>     public boolean isFilterPushedDown() {
>>         return filters != null;
>>     }
>>
>>     @Override
>>     public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>     }
>>
>>     @Override
>>     public TypeInformation<Customer> getReturnType() {
>>         return TypeInformation.of(modelClass);
>>     }
>>
>>     @Override
>>     public TableSchema getTableSchema() {
>>         return TableSchema.fromTypeInfo(getReturnType());
>>     }
>> }
>>
>> Thanks,
>>
>> Josh
>>
>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Josh,
>>>
>>> Does your TableSource also implement ProjectableTableSource?
>>> If yes, you need to make sure that the filter information is also
>>> forwarded if ProjectableTableSource.projectFields() is called after
>>> FilterableTableSource.applyPredicate().
>>> Also make sure to correctly implement
>>> FilterableTableSource.isFilterPushedDown().
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>>> that retrieves data from a REST API and returns it as POJO instances.
>>>> I've implemented FilterableTableSource as described in the docs, returning
>>>> a new instance of my table source containing the predicates that I've
>>>> removed from the list of predicates passed into applyPredicate. However,
>>>> when getDataSet is eventually called, it's called on the instance of the
>>>> table source that was originally registered with the table environment,
>>>> which does not have any filters in it. I've stepped through the code in a
>>>> debugger, and applyPredicate is definitely being called, and it's
>>>> definitely returning new instances of my table source, but they don't seem
>>>> to be used.
>>>>
>>>> I also played with the OrcTableSource, which is the only example of a
>>>> push-down filter implementation I could find, and it doesn't behave this
>>>> way. When I set a breakpoint in getDataSet in that case, it's being called
>>>> on one of the new instances of the table source that contains the accepted
>>>> filters.
>>>>
>>>> Are there any other requirements for implementing push-down filters
>>>> that aren't listed in the docs? Or does anyone have any tips for this?
>>>>
>>>> Thanks,
>>>>
>>>> Josh
>>>>
>>>> --
>>>> Josh Bradt
>>>> Software Engineer
>>>> 225 Franklin St, Boston, MA 02110
>>>> klaviyo.com <https://www.klaviyo.com>
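The following is a stdlib-only Java sketch, not Flink code, of the `applyPredicate` contract discussed in this thread: accepted predicates are removed from the caller's list, and a new, immutable source instance carries the corresponding filters while the original stays unchanged. `FilterableSourceSketch`, `RequestFilter` and the string predicates are hypothetical stand-ins for `CustomerTableSource`, `AppRequestFilter` and Flink's `Expression`. The sketch also gives the source an `explainSource()` description that changes once filters are applied. That part rests on an assumption worth checking against FLINK-12399 and the Flink version in use: the planner seems to tell table sources apart by the string returned from `TableSource.explainSource()`, which `OrcTableSource` overrides to include its predicates, whereas `CustomerTableSource` keeps the default, so the filtered instance may be treated as identical to the registered one and discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for AppRequestFilter: a single field == value
// condition that the REST API could evaluate server-side.
final class RequestFilter {
    final String field;
    final String value;

    RequestFilter(String field, String value) {
        this.field = field;
        this.value = value;
    }

    @Override
    public String toString() {
        return field + "=" + value;
    }
}

// Stdlib-only model of a filterable source. Predicates are plain strings
// such as "email = 'a@b.com'" instead of Flink Expression objects.
final class FilterableSourceSketch {
    private final String tableName;
    private final List<RequestFilter> filters; // null = push-down never attempted

    FilterableSourceSketch(String tableName) {
        this(tableName, null);
    }

    private FilterableSourceSketch(String tableName, List<RequestFilter> filters) {
        this.tableName = tableName;
        this.filters = filters;
    }

    // Mirrors the FilterableTableSource.applyPredicate contract: remove the
    // predicates this source evaluates itself from the caller's list and
    // return a NEW instance; this instance is left unchanged.
    FilterableSourceSketch applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>();
        List<RequestFilter> acceptedFilters = new ArrayList<>();
        for (String predicate : predicates) {
            buildFilter(predicate).ifPresent(f -> {
                acceptedFilters.add(f);
                accepted.add(predicate);
            });
        }
        predicates.removeAll(accepted);
        return new FilterableSourceSketch(tableName, List.copyOf(acceptedFilters));
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    // Assumption under test: the description must differ once filters are
    // applied, so that anything comparing sources by description can tell
    // the filtered instance from the original one.
    String explainSource() {
        return tableName + "[filters=" + filters + "]";
    }

    // Hypothetical translation: only simple "field = 'value'" equalities
    // are accepted; everything else stays with the caller.
    private static Optional<RequestFilter> buildFilter(String predicate) {
        String[] parts = predicate.split("=", 2);
        if (parts.length != 2) {
            return Optional.empty();
        }
        String field = parts[0].trim();
        String value = parts[1].trim();
        if (field.isEmpty() || value.length() < 2
                || !value.startsWith("'") || !value.endsWith("'")) {
            return Optional.empty();
        }
        return Optional.of(new RequestFilter(field, value.substring(1, value.length() - 1)));
    }
}
```

Calling `applyPredicate` with `email = 'a@b.com'` and `age > 30` leaves only `age > 30` in the caller's list, the original instance still reports `isFilterPushedDown() == false`, and the returned instance's `explainSource()` differs from the original's.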