Thank you, Shengkai,
That does explain what I'm seeing.

Jark / Shengkai, is there any workaround to get Flink to work with both watermark
push down and predicate push down until this is resolved?

On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Yuval, Jark, Timo.
>
> Currently the watermark push down happens in the logical rewrite phase, but
> the filter push down happens in an earlier phase, which means the planner
> first tries the filter push down and only then the watermark push
> down.
>
> I think we need either a rule that transposes the filter and the watermark
> assigner, or an extension of the filter push down rule that also matches the
> pattern where the watermark assigner is the parent of the table scan.
>
> Best,
> Shengkai
>
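A toy Java sketch of the ordering problem described above. It uses no Flink or Calcite APIs: `PlanNode`, `pushFilter`, `pushWatermark` and `transpose` are hypothetical stand-ins that only mimic the operand shapes (each push-down needs the scan as the direct child of the node being pushed), and each step rewrites any matching subtree, roughly as a HepPlanner pass would. It assumes the filter push-down runs before the watermark push-down, as stated above.

```java
import java.util.Objects;

// Toy model of rule ordering; these are NOT Flink/Calcite classes.
final class PushDownOrderDemo {

    // Hypothetical single-input plan node: "Filter", "WatermarkAssigner", "TableScan[...]".
    static final class PlanNode {
        final String kind;
        final PlanNode input; // null for a leaf (the scan)

        PlanNode(String kind, PlanNode input) {
            this.kind = Objects.requireNonNull(kind);
            this.input = input;
        }

        boolean isScan() {
            return kind.startsWith("TableScan");
        }

        @Override
        public String toString() {
            return input == null ? kind : kind + " -> " + input;
        }
    }

    // Records what was pushed into the scan: TableScan -> TableScan[a] -> TableScan[a,b].
    static String withPushed(String scanKind, String what) {
        return scanKind.endsWith("]")
                ? scanKind.substring(0, scanKind.length() - 1) + "," + what + "]"
                : scanKind + "[" + what + "]";
    }

    // Filter push-down only fires when the scan is the filter's DIRECT child.
    static PlanNode pushFilter(PlanNode n) {
        if (n == null) return null;
        if (n.kind.equals("Filter") && n.input != null && n.input.isScan()) {
            return new PlanNode(withPushed(n.input.kind, "filter"), null);
        }
        return new PlanNode(n.kind, pushFilter(n.input));
    }

    // Watermark push-down only fires when the scan is the assigner's DIRECT child.
    static PlanNode pushWatermark(PlanNode n) {
        if (n == null) return null;
        if (n.kind.equals("WatermarkAssigner") && n.input != null && n.input.isScan()) {
            return new PlanNode(withPushed(n.input.kind, "watermark"), null);
        }
        return new PlanNode(n.kind, pushWatermark(n.input));
    }

    // Hypothetical transpose: Filter -> WatermarkAssigner -> X becomes WatermarkAssigner -> Filter -> X.
    static PlanNode transpose(PlanNode n) {
        if (n == null) return null;
        if (n.kind.equals("Filter") && n.input != null && n.input.kind.equals("WatermarkAssigner")) {
            return new PlanNode("WatermarkAssigner", new PlanNode("Filter", transpose(n.input.input)));
        }
        return new PlanNode(n.kind, transpose(n.input));
    }

    // Shape of the plan in this thread: LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan.
    static PlanNode originalPlan() {
        return new PlanNode("Filter", new PlanNode("WatermarkAssigner", new PlanNode("TableScan", null)));
    }

    public static void main(String[] args) {
        // Filter push-down first: it cannot see through the assigner.
        System.out.println(pushWatermark(pushFilter(originalPlan())));            // Filter -> TableScan[watermark]
        // A transpose step in front lets both push-downs reach the scan.
        System.out.println(pushWatermark(pushFilter(transpose(originalPlan())))); // TableScan[filter,watermark]
        // Watermark push-down first also works.
        System.out.println(pushFilter(pushWatermark(originalPlan())));            // TableScan[watermark,filter]
    }
}
```

Moving the filter below the assigner also changes which records the watermark is computed from, so whether such a transpose is semantically safe is a separate question from whether the rules can match.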
> Yuval Itzchakov <yuva...@gmail.com> wrote on Mon, Mar 8, 2021 at 12:13 AM:
>
>> Hi Jark,
>>
>> Even after implementing both, I don't see the watermark being pushed into
>> the table source in the logical plan, and the watermark assigner still
>> prevents the predicate pushdown from running.
>>
>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Yuval,
>>>
>>> That's correct: you will always get a LogicalWatermarkAssigner if you
>>> assign a watermark. If you implement SupportsWatermarkPushDown, the
>>> LogicalWatermarkAssigner will be pushed into the TableSource, and then the
>>> Filter can be pushed into the source if the source implements
>>> SupportsFilterPushDown.
>>>
>>> Best,
>>> Jark
>>>
>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>
>>>> Hi Timo,
>>>> After investigating this further, this is actually unrelated to
>>>> implementing SupportsWatermarkPushDown.
>>>>
>>>> Once I create a TableSchema for my custom source's RowData and assign
>>>> it a watermark (see my example in the original mail), the plan will always
>>>> include a LogicalWatermarkAssigner. This assigner sits between the
>>>> LogicalTableScan and the LogicalFilter and prevents the HepPlanner from
>>>> applying the optimization, since the rule requires the LogicalTableScan to
>>>> be a direct child of the LogicalFilter. Since I have
>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>> work.
>>>>
>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>>
>>>>> Hi Yuval,
>>>>>
>>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>>> cracks.
>>>>>
>>>>> If I understand you correctly, you would like to implement a table
>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>> `SupportsFilterPushDown`?
>>>>>
>>>>> The current behavior might be intentional. Filters and watermarks are not
>>>>> very compatible: filtering also means that records (from which
>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>> strict, we would not generate any new watermarks and the pipeline would
>>>>> stop making progress in time.
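A toy Java simulation of the effect described above, assuming a watermark computed as the largest event timestamp seen minus a fixed out-of-orderness bound, fed only by records that pass the filter. `Event` and `watermarkAfter` are hypothetical names; this is not Flink's `WatermarkGenerator` API.

```java
import java.util.List;
import java.util.function.Predicate;

// Toy simulation only; Event and watermarkAfter are hypothetical, not Flink APIs.
final class FilteredWatermarkDemo {

    static final class Event {
        final long timestamp; // event time in ms
        final String name;    // the field the filter looks at

        Event(long timestamp, String name) {
            this.timestamp = timestamp;
            this.name = name;
        }
    }

    // Watermark = max timestamp among records that PASS the filter, minus a fixed bound.
    // Returns Long.MIN_VALUE (no progress at all) if no record passes.
    static long watermarkAfter(List<Event> events, Predicate<Event> filter, long maxOutOfOrderness) {
        long maxSeen = Long.MIN_VALUE;
        for (Event e : events) {
            if (filter.test(e)) {
                maxSeen = Math.max(maxSeen, e.timestamp);
            }
        }
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000, "bar"), new Event(2_000, "qux"), new Event(3_000, "qux"));
        System.out.println(watermarkAfter(events, e -> true, 500));                 // 2500
        System.out.println(watermarkAfter(events, e -> e.name.equals("bar"), 500)); // 500
        System.out.println(watermarkAfter(events, e -> e.name.equals("baz"), 500)); // Long.MIN_VALUE: stuck
    }
}
```

With a filter that drops every record, the watermark never leaves its initial value, which is the stalled pipeline described above.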
>>>>>
>>>>> Watermark push down is only necessary if per-partition watermarks are
>>>>> required. Otherwise the watermarks are generated in a subsequent
>>>>> operator after the source. So you can still use rowtime without
>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
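A toy Java simulation of why per-partition watermarks can matter. It assumes one source instance reads partition A completely before partition B, and it counts a record as late when its timestamp is below the current watermark (a simplification); all names are hypothetical and no Flink APIs are used.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation only; Rec and the two methods are hypothetical, not Flink APIs.
final class PerPartitionWatermarkDemo {

    static final class Rec {
        final String partition;
        final long timestamp;

        Rec(String partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    // One watermark generated AFTER the source over the interleaved stream (max seen so far).
    static int lateWithWatermarkAfterSource(List<Rec> readOrder) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Rec r : readOrder) {
            if (r.timestamp < watermark) late++;
            watermark = Math.max(watermark, r.timestamp);
        }
        return late;
    }

    // Per-partition watermarks inside the source; the emitted watermark is their minimum.
    static int lateWithPerPartitionWatermarks(List<Rec> readOrder, List<String> partitions) {
        Map<String, Long> maxPerPartition = new HashMap<>();
        for (String p : partitions) maxPerPartition.put(p, Long.MIN_VALUE);
        int late = 0;
        for (Rec r : readOrder) {
            long watermark = maxPerPartition.values().stream()
                    .mapToLong(Long::longValue).min().getAsLong();
            if (r.timestamp < watermark) late++;
            maxPerPartition.merge(r.partition, r.timestamp, Long::max);
        }
        return late;
    }

    public static void main(String[] args) {
        // The source happens to read partition A completely before partition B.
        List<Rec> readOrder = List.of(
                new Rec("A", 100), new Rec("A", 110), new Rec("A", 120),
                new Rec("B", 10), new Rec("B", 20), new Rec("B", 30));
        System.out.println(lateWithWatermarkAfterSource(readOrder));                      // 3
        System.out.println(lateWithPerPartitionWatermarks(readOrder, List.of("A", "B"))); // 0
    }
}
```

A single watermark generated after the source follows partition A and marks all three B records late, while the minimum over per-partition watermarks marks none late. The trade-off is that the minimum only advances once every partition has produced data.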
>>>>>
>>>>> I will loop in Shengkai, who worked on this topic recently.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>> > Bumping this up again; I would appreciate any help if anyone is
>>>>> > familiar with the Blink planner.
>>>>> >
>>>>> > Thanks,
>>>>> > Yuval.
>>>>> >
>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >     Hi Jark,
>>>>> >     Would appreciate your help with this.
>>>>> >
>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan
>>>>> >     <ro...@apache.org> wrote:
>>>>> >
>>>>> >         Hi Yuval,
>>>>> >
>>>>> >         I'm not familiar with the Blink planner, but Jark can probably help.
>>>>> >
>>>>> >         Regards,
>>>>> >         Roman
>>>>> >
>>>>> >
>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>>>> >         <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>> >             my ScanTableSource, which does make sense. But then what
>>>>> >             should I do instead? This feels a bit unintuitive.
>>>>> >
>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>>>>> >             <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >                 Hi,
>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>> >
>>>>> >                 I have the following logical plan:
>>>>> >
>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>> >
>>>>> >                 I have a custom source which creates a TableSchema based
>>>>> >                 on an external table. When I create the schema, I push
>>>>> >                 the watermark definition to the schema:
>>>>> >
>>>>> >                 image.png
>>>>> >
>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>> >                 matches on the LogicalFilter in the definition. But then,
>>>>> >                 since the RelOptRuleOperandChildPolicy is set to "SOME",
>>>>> >                 it attempts to do a full match on the child nodes. The
>>>>> >                 rule is defined as follows:
>>>>> >
>>>>> >                 image.png
>>>>> >
>>>>> >                 The match on the child operand fails, since the immediate
>>>>> >                 child of the filter is a "LogicalWatermarkAssigner" and
>>>>> >                 not the "LogicalTableScan", which is the grandchild:
>>>>> >
>>>>> >                 image.png
>>>>> >
>>>>> >                 Is this the desired behavior? Should I create the
>>>>> >                 TableSchema without the rowtime attribute and use
>>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>>> >                 dynamically from the source record?
>>>>> >
>>>>> >                 --
>>>>> >                 Best Regards,
>>>>> >                 Yuval Itzchakov.
>>>>> >
>>>>> >
>>>>> >
>>>>> >             --
>>>>> >             Best Regards,
>>>>> >             Yuval Itzchakov.
>>>>> >
>>>>> >
>>>>> >
>>>>> >     --
>>>>> >     Best Regards,
>>>>> >     Yuval Itzchakov.
>>>>> >
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>

-- 
Best Regards,
Yuval Itzchakov.
