Hi Jark,

Even after implementing both, I don't see the watermark being pushed into
the table source in the logical plan, and it still prevents predicate
pushdown from running.
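
To make the problem concrete, here is a toy model of how I understand the
operand matching. It is plain Java with no Flink or Calcite classes; `Node`
and `matchesFilterOverScan` are made-up names for illustration, not the
planner's API. It only shows that a rule requiring the scan as the filter's
direct input cannot match while the assigner sits in between:

```java
import java.util.Arrays;
import java.util.List;

public class FilterPushdownMatchSketch {

    /** Toy stand-in for a plan node; NOT Calcite's RelNode. */
    static final class Node {
        final String type;
        final List<Node> inputs;

        Node(String type, Node... inputs) {
            this.type = type;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /**
     * Hypothetical operand check: matches only when the filter's direct
     * input is the table scan, mirroring the behavior described in this thread.
     */
    static boolean matchesFilterOverScan(Node node) {
        return "LogicalFilter".equals(node.type)
                && node.inputs.size() == 1
                && "LogicalTableScan".equals(node.inputs.get(0).type);
    }

    public static void main(String[] args) {
        Node scan = new Node("LogicalTableScan");

        // Plan shape from this thread: the assigner separates filter and scan.
        Node blocked = new Node("LogicalFilter",
                new Node("LogicalWatermarkAssigner", scan));

        // Plan shape the rule expects: the scan is the filter's direct input.
        Node direct = new Node("LogicalFilter", scan);

        System.out.println(matchesFilterOverScan(blocked)); // false
        System.out.println(matchesFilterOverScan(direct));  // true
    }
}
```

So unless the assigner is merged into the scan before the filter rule runs,
the filter rule has nothing to match on.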

On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:

> Hi Yuval,
>
> That's correct: you will always get a LogicalWatermarkAssigner if you
> assigned a watermark.
> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
> will be pushed into the TableSource, and then you can push the Filter
> into the source if the source implements SupportsFilterPushDown.
>
> Best,
> Jark
>
> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi Timo,
>> After investigating this further, this is actually not related to
>> implementing SupportsWatermarkPushDown.
>>
>> Once I create a TableSchema for my custom source's RowData, and assign it
>> a watermark (see my example in the original mail), the plan will always
>> include a LogicalWatermarkAssigner. This assigner, which sits between the
>> LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
>> applying the optimization, since the rule requires LogicalTableScan to be
>> a direct child of LogicalFilter. Since I have
>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>> work.
>>
>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Yuval,
>>>
>>> sorry that nobody replied earlier. Somehow your email fell through the
>>> cracks.
>>>
>>> If I understand you correctly, you would like to implement a table
>>> source that implements both `SupportsWatermarkPushDown` and
>>> `SupportsFilterPushDown`?
>>>
>>> The current behavior might be on purpose. Filters and Watermarks are not
>>> very compatible. Filtering would also mean that records (from which
>>> watermarks could be generated) are skipped. If the filter is very
>>> strict, we would not generate any new watermarks and the pipeline would
>>> stop making progress in time.
>>>
>>> Watermark push down is only necessary if per-partition watermarks are
>>> required. Otherwise the watermarks are generated in a subsequent
>>> operator after the source. So you can still use rowtime without
>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>
>>> I will loop in Shengkai, who worked on this topic recently.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>> > Bumping this up again, would appreciate any help if anyone is familiar
>>> > with the blink planner.
>>> >
>>> > Thanks,
>>> > Yuval.
>>> >
>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com
>>> > <mailto:yuva...@gmail.com>> wrote:
>>> >
>>> >     Hi Jark,
>>> >     Would appreciate your help with this.
>>> >
>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan
>>> >     <ro...@apache.org <mailto:ro...@apache.org>> wrote:
>>> >
>>> >         Hi Yuval,
>>> >
>>> >         I'm not familiar with the Blink planner, but Jark can
>>> >         probably help.
>>> >
>>> >         Regards,
>>> >         Roman
>>> >
>>> >
>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>> >         <yuva...@gmail.com <mailto:yuva...@gmail.com>> wrote:
>>> >
>>> >             Update: When I don't set the watermark explicitly on the
>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>> >             my ScanTableSource, which does make sense. But now the
>>> >             question is what should be done? This feels a bit
>>> >             unintuitive.
>>> >
>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>>> >             <yuva...@gmail.com <mailto:yuva...@gmail.com>> wrote:
>>> >
>>> >                 Hi,
>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>> >
>>> >                 I have the following logical plan:
>>> >
>>> >
>>> >                 LogicalSink(table=[default_catalog.default_database.table],
>>> >                 fields=[bar, baz, hello_world, a, b])
>>> >                 +- LogicalProject(value=[$2],
>>> >                 bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>>> >                 baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>>> >                 hello_world=[null:VARCHAR(2147483647) CHARACTER SET
>>> >                 "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
>>> >                 "UTF-16LE"], b=[EMPTY_MAP()])
>>> >                     +- LogicalFilter(condition=[AND(=($4,
>>> >                 _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>> >                        +- LogicalWatermarkAssigner(rowtime=[bar],
>>> >                 watermark=[$0])
>>> >                           +- LogicalTableScan(table=[[default_catalog,
>>> >                 default_database, foo]])
>>> >
>>> >                 I have a custom source which creates a TableSchema
>>> >                 based on an external table. When I create the schema, I
>>> >                 push the watermark definition to the schema:
>>> >
>>> >                 image.png
>>> >
>>> >                 When the HepPlanner starts the optimization phase and
>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>> >                 matches on the LogicalFilter in the definition. But
>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>> >                 "SOME", it attempts to do a full match on the child
>>> >                 nodes. The rule is defined like so:
>>> >
>>> >                 image.png
>>> >
>>> >                 The child match fails since the immediate child of the
>>> >                 filter is a "LogicalWatermarkAssigner", and not the
>>> >                 "LogicalTableScan", which is the grandchild:
>>> >
>>> >                 image.png
>>> >
>>> >                 Is this the desired behavior? Should I create the
>>> >                 TableSchema without the rowtime attribute and use
>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>> >                 dynamically from the source record?
>>> >
>>> >                 --
>>> >                 Best Regards,
>>> >                 Yuval Itzchakov.
>>> >
>>> >
>>> >
>>> >             --
>>> >             Best Regards,
>>> >             Yuval Itzchakov.
>>> >
>>> >
>>> >
>>> >     --
>>> >     Best Regards,
>>> >     Yuval Itzchakov.
>>> >
>>>
>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
