Hi Jark,

Even after implementing both, I don't see the watermark being pushed into the table source in the logical plan, which prevents predicate pushdown from running.

On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:

> Hi Yuval,
>
> That's correct: you will always get a LogicalWatermarkAssigner if you
> assign a watermark.
> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
> will be pushed into the TableSource, and then the Filter can be pushed
> into the source as well, provided the source implements
> SupportsFilterPushDown.
>
> Best,
> Jark
>
> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi Timo,
>> After investigating this further, this is actually unrelated to
>> implementing SupportsWatermarkPushDown.
>>
>> Once I create a TableSchema for my custom source's RowData and assign it
>> a watermark (see my example in the original mail), the plan will always
>> include a LogicalWatermarkAssigner. This assigner, which sits between the
>> LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
>> applying the optimization, because the rule requires the LogicalTableScan
>> to be a direct child of the LogicalFilter. Since I have
>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>> work.
>>
>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Yuval,
>>>
>>> sorry that nobody replied earlier. Somehow your email fell through the
>>> cracks.
>>>
>>> If I understand you correctly, you would like to implement a table
>>> source that implements both `SupportsWatermarkPushDown` and
>>> `SupportsFilterPushDown`?
>>>
>>> The current behavior might be on purpose. Filters and watermarks are not
>>> very compatible. Filtering would also mean that records (from which
>>> watermarks could be generated) are skipped. If the filter is very
>>> strict, we would not generate any new watermarks and the pipeline would
>>> stop making progress in time.
>>>
>>> Watermark push down is only necessary if per-partition watermarks are
>>> required. Otherwise the watermarks are generated in a subsequent
>>> operator after the source.
>>> So you can still use rowtime without implementing
>>> `SupportsWatermarkPushDown` in your custom source.
>>>
>>> I will loop in Shengkai, who worked on this topic recently.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>> > Bumping this up again, would appreciate any help if anyone is familiar
>>> > with the Blink planner.
>>> >
>>> > Thanks,
>>> > Yuval.
>>> >
>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>> >
>>> > Hi Jark,
>>> > Would appreciate your help with this.
>>> >
>>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>> >
>>> > Hi Yuval,
>>> >
>>> > I'm not familiar with the Blink planner, but probably Jark can help.
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> >
>>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>> >
>>> > Update: When I don't set the watermark explicitly on the
>>> > TableSchema, `applyWatermarkStrategy` never gets called on
>>> > my ScanTableSource, which does make sense. But now the
>>> > question is: what should be done? This feels a bit unintuitive.
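Timo's remark that a very strict filter can stop the pipeline from making progress in event time can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Flink code: `FilteredWatermarkDemo`, `Event` and `watermarksAfterFilter` are invented names, and the generator simply emits "largest timestamp seen so far minus a fixed delay" after every record that survives the filter, whereas Flink's real watermark generators emit periodically and differ in details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy model, not Flink code: a watermark generator that only
// sees the records that survive a filter placed in front of it.
public class FilteredWatermarkDemo {

    static final class Event {
        final String key;
        final long timestampMillis;

        Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    // After every record that passes the filter, emit
    // (largest timestamp seen so far) - maxDelayMillis.
    // Records rejected by the filter never reach the generator.
    static List<Long> watermarksAfterFilter(
            List<Event> events, Predicate<Event> filter, long maxDelayMillis) {
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (Event e : events) {
            if (!filter.test(e)) {
                continue; // skipped record: its timestamp cannot advance the watermark
            }
            maxTimestamp = Math.max(maxTimestamp, e.timestampMillis);
            watermarks.add(maxTimestamp - maxDelayMillis);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("bar", 1_000L),
                new Event("qux", 2_000L),
                new Event("qux", 3_000L),
                new Event("qux", 4_000L));

        // Lenient filter: every record reaches the generator.
        System.out.println(watermarksAfterFilter(events, e -> true, 500L));
        // prints [500, 1500, 2500, 3500]

        // Strict filter: only the first record passes, so event time stalls.
        System.out.println(watermarksAfterFilter(events, e -> e.key.equals("bar"), 500L));
        // prints [500]
    }
}
```

In the model, the strict filter leaves the watermark stuck at 500 however many later records arrive, which is the "no progress in time" situation Timo describes.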
>>> >
>>> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> > Flink 1.12.1, Blink Planner, Scala 2.12
>>> >
>>> > I have the following logical plan:
>>> >
>>> > LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>> > +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>> >    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>> >       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>> >          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>> >
>>> > I have a custom source which creates a TableSchema based on an
>>> > external table. When I create the schema, I push the watermark
>>> > definition to the schema:
>>> >
>>> > [image not preserved]
>>> >
>>> > When the HepPlanner starts the optimization phase and reaches the
>>> > "PushFilterIntoTableSourceScanRule", it matches on the LogicalFilter
>>> > in the definition. But then, since the RelOptRuleOperandChildPolicy is
>>> > set to "SOME", it attempts a full match on the child nodes, because
>>> > the rule is defined as follows:
>>> >
>>> > [image not preserved]
>>> >
>>> > The child match fails because the immediate child of the filter is a
>>> > "LogicalWatermarkAssigner", not the "LogicalTableScan", which is the
>>> > grandchild:
>>> >
>>> > [image not preserved]
>>> >
>>> > Is this the desired behavior? Should I create the TableSchema without
>>> > the rowtime attribute and use "SupportsWatermarkPushDown" to generate
>>> > the watermark dynamically from the source record?
>>> >
>>> > --
>>> > Best Regards,
>>> > Yuval Itzchakov.
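The matching problem described in the original question can be reproduced with a toy operand matcher. This is a hypothetical Java sketch: `OperandMatchDemo`, `Node`, `Pattern` and `matches` are invented names, not Calcite's `RelOptRuleOperand` API. It is only loosely modelled on the "SOME" child policy (child operands are compared with the node's direct inputs, in order) and on a rule that expects a LogicalTableScan as the direct input of a LogicalFilter, as the mail describes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model, not Calcite's RelOptRuleOperand API: a rule pattern
// matches a plan node when the node types agree and every child pattern
// matches the corresponding *direct* input, in order.
public class OperandMatchDemo {

    static final class Node {
        final String type;
        final List<Node> inputs;

        Node(String type, Node... inputs) {
            this.type = type;
            this.inputs = Arrays.asList(inputs);
        }
    }

    static final class Pattern {
        final String type;
        final List<Pattern> children; // empty = inputs are not inspected

        Pattern(String type, Pattern... children) {
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    static boolean matches(Pattern pattern, Node node) {
        if (!pattern.type.equals(node.type)) {
            return false;
        }
        if (pattern.children.isEmpty()) {
            return true;
        }
        // Child patterns are compared with direct inputs only; there is no
        // search through intermediate nodes such as a watermark assigner.
        if (pattern.children.size() != node.inputs.size()) {
            return false;
        }
        for (int i = 0; i < pattern.children.size(); i++) {
            if (!matches(pattern.children.get(i), node.inputs.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Filter whose direct input must be a table scan.
        Pattern filterOnScan =
                new Pattern("LogicalFilter", new Pattern("LogicalTableScan"));

        Node scan = new Node("LogicalTableScan");
        // Shape from the plan in the mail: Filter -> WatermarkAssigner -> TableScan.
        Node withAssigner =
                new Node("LogicalFilter", new Node("LogicalWatermarkAssigner", scan));
        // Shape once the assigner has been absorbed into the source.
        Node assignerPushedDown = new Node("LogicalFilter", scan);

        System.out.println(matches(filterOnScan, withAssigner));       // prints false
        System.out.println(matches(filterOnScan, assignerPushedDown)); // prints true
    }
}
```

In the model the pattern only matches once the assigner node is gone, which corresponds to Jark's description of implementing SupportsWatermarkPushDown so that the LogicalWatermarkAssigner is pushed into the source before the filter rule fires.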