Hi Fabian,

Thanks for taking a look. I've filed this ticket:
https://issues.apache.org/jira/browse/FLINK-12399

Thanks,

Josh

On Fri, May 3, 2019 at 3:41 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Josh,
>
> The code looks good to me.
> This seems to be a bug then.
> It's strange that it works for ORC.
>
> Would you mind opening a Jira ticket, ideally with a simple reproducible code
> example?
>
> Thank you,
> Fabian
>
> On Thu, May 2, 2019 at 6:23 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply. My custom table source does not implement
>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>> correctly since it's nearly identical to what's written in the
>> OrcTableSource. I pasted a slightly simplified version of the
>> implementation below. If you wouldn't mind reading over it, is there
>> anything obviously wrong?
>>
>> public final class CustomerTableSource implements BatchTableSource<Customer>,
>>         FilterableTableSource<Customer> {
>>
>>     // Iterator that gets data from a REST API as POJO instances
>>     private final AppResourceIterator<Customer> resourceIterator;
>>     private final String tableName;
>>     private final Class<Customer> modelClass;
>>     private final AppRequestFilter[] filters;
>>
>>     public CustomerTableSource(
>>             AppResourceIterator<Customer> resourceIterator,
>>             String tableName,
>>             Class<Customer> modelClass) {
>>
>>         this(resourceIterator, tableName, modelClass, null);
>>     }
>>
>>     protected CustomerTableSource(
>>             AppResourceIterator<Customer> resourceIterator,
>>             String tableName,
>>             Class<Customer> modelClass,
>>             AppRequestFilter[] filters) {
>>
>>         this.resourceIterator = resourceIterator;
>>         this.tableName = tableName;
>>         this.modelClass = modelClass;
>>         this.filters = filters;
>>     }
>>
>>     @Override
>>     public TableSource<Customer> applyPredicate(List<Expression> predicates) {
>>         List<Expression> acceptedPredicates = new ArrayList<>();
>>         List<AppRequestFilter> acceptedFilters = new ArrayList<>();
>>
>>         for (final Expression predicate : predicates) {
>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>                 acceptedFilters.add(filter);
>>                 acceptedPredicates.add(predicate);
>>             });
>>         }
>>
>>         predicates.removeAll(acceptedPredicates);
>>
>>         return new CustomerTableSource(
>>                 resourceIterator.withFilters(acceptedFilters),
>>                 tableName,
>>                 modelClass,
>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>         );
>>     }
>>
>>     public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
>>         // Code for translating an Expression into an AppRequestFilter (elided).
>>         // Returns Optional.empty() for predicates we don't want to / can't apply.
>>         return Optional.empty(); // placeholder so the simplified version compiles
>>     }
>>
>>     @Override
>>     public boolean isFilterPushedDown() {
>>         return filters != null;
>>     }
>>
>>     @Override
>>     public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>     }
>>
>>     @Override
>>     public TypeInformation<Customer> getReturnType() {
>>         return TypeInformation.of(modelClass);
>>     }
>>
>>     @Override
>>     public TableSchema getTableSchema() {
>>         return TableSchema.fromTypeInfo(getReturnType());
>>     }
>> }
>>
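One difference from OrcTableSource that may be worth checking: as far as I can tell, OrcTableSource overrides explainSource() so that its description includes the pushed-down filter, while CustomerTableSource inherits the default, which does not reflect the filters. If the planner tells table-source scans apart by that description (an assumption here, not confirmed in this thread), the filtered copy returned by applyPredicate() would look identical to the originally registered instance. The sketch below is plain Java with a hypothetical SketchSource class standing in for the real Flink interfaces, and predicates simplified to strings; it only illustrates a description that changes once filters are pushed down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a filterable table source.
// This is NOT Flink's TableSource API; predicates are plain strings here.
final class SketchSource {
    private final String tableName;
    private final List<String> filters; // null until applyPredicate() has run

    SketchSource(String tableName) {
        this(tableName, null);
    }

    private SketchSource(String tableName, List<String> filters) {
        this.tableName = tableName;
        this.filters = filters;
    }

    // Same shape as applyPredicate(): accept what can be pushed down, remove it
    // from the caller's list, and return a NEW instance holding the accepted filters.
    SketchSource applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>();
        for (String predicate : predicates) {
            if (predicate.startsWith("email")) { // hypothetical rule for what the REST API supports
                accepted.add(predicate);
            }
        }
        predicates.removeAll(accepted);
        return new SketchSource(tableName, accepted);
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    // Analogue of overriding explainSource(): the description includes the pushed
    // filters, so the filtered copy can be told apart from the original.
    String explainSource() {
        return "CustomerTableSource[table=" + tableName
                + ", filters=" + (filters == null ? "none" : filters.toString()) + "]";
    }
}

class ExplainSourceDemo {
    public static void main(String[] args) {
        SketchSource original = new SketchSource("customers");
        List<String> predicates =
                new ArrayList<>(Arrays.asList("email = 'a@example.com'", "age > 30"));
        SketchSource filtered = original.applyPredicate(predicates);

        System.out.println(original.explainSource());
        System.out.println(filtered.explainSource());
        System.out.println("Left for Flink to evaluate: " + predicates);
    }
}
```

In the real class, the analogue would be overriding explainSource() so that it also prints the contents of the filters array.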
>>
>> Thanks,
>>
>> Josh
>>
>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Josh,
>>>
>>> Does your TableSource also implement ProjectableTableSource?
>>> If yes, you need to make sure that the filter information is also
>>> forwarded if ProjectableTableSource.projectFields() is called after
>>> FilterableTableSource.applyPredicate().
>>> Also make sure to correctly implement
>>> FilterableTableSource.isFilterPushedDown().
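To make the ProjectableTableSource point concrete: whichever push-down method runs second has to copy over what the first one already applied. The following is a plain-Java sketch with a hypothetical ProjectableSketch class (not Flink's API; filters are simplified to strings and fields to column indexes). projectFields() carries the existing filters into the new instance, and applyPredicate() keeps the existing projection.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a source supporting both filter and projection push-down.
// NOT Flink's API: filters are plain strings and fields are column indexes.
final class ProjectableSketch {
    private final List<String> filters;  // null until applyPredicate() has run
    private final int[] selectedFields;  // null means "all fields"

    ProjectableSketch() {
        this(null, null);
    }

    private ProjectableSketch(List<String> filters, int[] selectedFields) {
        this.filters = filters;
        this.selectedFields = selectedFields;
    }

    // Simplified: receives only the filters already accepted for push-down.
    // Adding filters must keep any projection that was already pushed down.
    ProjectableSketch applyPredicate(List<String> acceptedFilters) {
        return new ProjectableSketch(acceptedFilters, selectedFields);
    }

    // Projecting must keep any filters that were already pushed down;
    // passing null here instead of `filters` would silently drop them.
    ProjectableSketch projectFields(int[] fields) {
        return new ProjectableSketch(filters, fields.clone());
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> getFilters() {
        return filters;
    }

    int[] getSelectedFields() {
        return selectedFields == null ? null : selectedFields.clone();
    }
}

class ProjectionDemo {
    public static void main(String[] args) {
        ProjectableSketch source = new ProjectableSketch()
                .applyPredicate(Arrays.asList("email = 'a@example.com'"))
                .projectFields(new int[] {0, 2});
        System.out.println(source.isFilterPushedDown() + " " + source.getFilters()
                + " " + Arrays.toString(source.getSelectedFields()));
    }
}
```

In this model, if projectFields() built its copy with null filters, isFilterPushedDown() would report false on the final instance and the pushed filters would be lost, even though applyPredicate() had run.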
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <josh.br...@klaviyo.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>>> implemented FilterableTableSource as described in the docs, returning a new
>>>> instance of my table source containing the predicates that I've removed
>>>> from the list of predicates passed into applyPredicate. However, when
>>>> getDataSet is eventually called, it's called on the instance of the table
>>>> source that was originally registered with the table environment, which
>>>> does not have any filters in it. I've stepped through the code in a
>>>> debugger: applyPredicate is definitely being called, and it definitely
>>>> returns new instances of my table source, but those instances never seem
>>>> to be used.
>>>>
>>>> I also played with the OrcTableSource, which is the only example of a
>>>> push-down filter implementation I could find, and it doesn't behave this
>>>> way. When I set a breakpoint in getDataSet in that case, it's being called
>>>> on one of the new instances of the table source that contains the accepted
>>>> filters.
>>>>
>>>> Are there any other requirements for implementing push-down filters
>>>> that aren't listed in the docs? Or does anyone have any tips for this?
>>>>
>>>> Thanks,
>>>>
>>>> Josh
>>>>
>>>> --
>>>> Josh Bradt
>>>> Software Engineer
>>>> 225 Franklin St, Boston, MA 02110
>>>> klaviyo.com <https://www.klaviyo.com>
>>>> [image: Klaviyo Logo]
>>>>
>>>
>>
>>
>

