Hi Timo,
After investigating this further, I found that this is actually not related
to implementing SupportsWatermarkPushDown.

Once I create a TableSchema for my custom source's RowData and assign it a
watermark (see my example in the original mail), the plan always includes a
LogicalWatermarkAssigner. This assigner sits between the LogicalFilter and
the LogicalTableScan, which prevents the HepPlanner from applying the filter
push-down optimization, because the rule requires the LogicalTableScan to be
a direct child of the LogicalFilter. Since my plan has
LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, the rule never
matches.
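To make the failure concrete, here is a toy Java model of the operand
matching. It is a simplification, not Calcite's or Flink's actual API: the
`Node` record and the `matchesFilterOverScan` and `anyMatch` methods are
hypothetical. It mimics a rule whose operands are a LogicalFilter over a
LogicalTableScan and checks whether that pattern can fire on each plan shape:

```java
import java.util.List;

// Hypothetical, simplified model of a logical plan; NOT Calcite's RelNode API.
public class OperandMatchSketch {

    // A plan node: its operator name plus its input nodes.
    record Node(String kind, List<Node> inputs) {
        static Node of(String kind, Node... inputs) {
            return new Node(kind, List.of(inputs));
        }
    }

    // Mimics a rule with operands Filter -> TableScan: the scan must be the
    // single, *direct* input of the filter for the rule to fire.
    static boolean matchesFilterOverScan(Node node) {
        return node.kind().equals("LogicalFilter")
                && node.inputs().size() == 1
                && node.inputs().get(0).kind().equals("LogicalTableScan");
    }

    // Walks the whole tree and reports whether any node matches the pattern.
    static boolean anyMatch(Node node) {
        if (matchesFilterOverScan(node)) {
            return true;
        }
        for (Node input : node.inputs()) {
            if (anyMatch(input)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Shape produced when the TableSchema declares a watermark.
        Node withWatermark = Node.of("LogicalFilter",
                Node.of("LogicalWatermarkAssigner",
                        Node.of("LogicalTableScan")));
        // Shape the push-down rule expects.
        Node withoutWatermark = Node.of("LogicalFilter",
                Node.of("LogicalTableScan"));

        System.out.println("with watermark assigner: " + anyMatch(withWatermark));
        System.out.println("without watermark assigner: " + anyMatch(withoutWatermark));
    }
}
```

With the watermark assigner in between, the pattern finds no match (prints
false); with the filter directly over the scan, it matches (prints true). The
real planner rule performs further checks that this sketch ignores.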

On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:

> Hi Yuval,
>
> sorry that nobody replied earlier. Somehow your email fell through the
> cracks.
>
> If I understand you correctly, you would like to implement a table
> source that implements both `SupportsWatermarkPushDown` and
> `SupportsFilterPushDown`?
>
> The current behavior might be on purpose. Filters and Watermarks are not
> very compatible. Filtering would also mean that records (from which
> watermarks could be generated) are skipped. If the filter is very
> strict, we would not generate any new watermarks and the pipeline would
> stop making progress in time.
>
> Watermark push-down is only necessary if per-partition watermarks are
> required. Otherwise, the watermarks are generated in a subsequent
> operator after the source. So you can still use rowtime without
> implementing `SupportsWatermarkPushDown` in your custom source.
>
> I will loop in Shengkai, who worked on this topic recently.
>
> Regards,
> Timo
>
>
> On 04.03.21 18:52, Yuval Itzchakov wrote:
> > Bumping this up again; I would appreciate any help from anyone familiar
> > with the Blink planner.
> >
> > Thanks,
> > Yuval.
> >
> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
> >
> >     Hi Jark,
> >     Would appreciate your help with this.
> >
> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan
> >     <ro...@apache.org> wrote:
> >
> >         Hi Yuval,
> >
> >         I'm not familiar with the Blink planner, but Jark can probably
> >         help.
> >
> >         Regards,
> >         Roman
> >
> >
> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
> >         <yuva...@gmail.com> wrote:
> >
> >             Update: When I don't set the watermark explicitly on the
> >             TableSchema, `applyWatermarkStrategy` never gets called on
> >             my ScanTableSource, which does make sense. But now the
> >             question is what should be done? This feels a bit
> >             unintuitive.
> >
> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
> >             <yuva...@gmail.com> wrote:
> >
> >                 Hi,
> >                 Flink 1.12.1, Blink Planner, Scala 2.12
> >
> >                 I have the following logical plan:
> >
> >
> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
> >
> >                 I have a custom source which creates a TableSchema based
> >                 on an external table. When I create the schema, I push
> >                 the watermark definition to the schema:
> >
> >                 [image.png: attached screenshot, not preserved]
> >
> >                 When the HepPlanner starts the optimization phase and
> >                 reaches the "PushFilterIntoTableSourceScanRule", it
> >                 matches on the LogicalFilter in the plan. But then,
> >                 since the RelOptRuleOperandChildPolicy is set to "SOME",
> >                 it attempts to do a full match on the child nodes. The
> >                 rule is defined as follows:
> >
> >                 [image.png: attached screenshot, not preserved]
> >
> >                 The child match fails because the immediate child of the
> >                 filter is a "LogicalWatermarkAssigner", not the
> >                 "LogicalTableScan", which is the grandchild:
> >
> >                 [image.png: attached screenshot, not preserved]
> >
> >                 Is this the desired behavior? Should I create the
> >                 TableSchema without the rowtime attribute and use
> >                 "SupportsWatermarkPushDown" to generate the watermark
> >                 dynamically from the source record?
> >
> >                 --
> >                 Best Regards,
> >                 Yuval Itzchakov.
> >
> >
> >
> >             --
> >             Best Regards,
> >             Yuval Itzchakov.
> >
> >
> >
> >     --
> >     Best Regards,
> >     Yuval Itzchakov.
> >
>
>

-- 
Best Regards,
Yuval Itzchakov.
