Thank you, Shengkai, that does explain what I'm seeing. Jark / Shengkai, is there any workaround to get Flink to work with both watermark push down and predicate pushdown until this is resolved?
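The matching problem Shengkai describes below can be illustrated with a toy model in plain Java. This is a hypothetical sketch. It does not use Calcite or Flink, and `Node`, `matchesFilterOverScan` and `transposeFilterAndWatermark` are invented stand-ins, not planner APIs. A pattern that requires the scan to be the filter's direct input fails on Filter -> WatermarkAssigner -> TableScan. Swapping the filter below the watermark assigner, which is roughly the transpose rule he suggests, lets the pattern match. Whether that swap is semantically safe is exactly Timo's concern about strict filters stalling watermarks.

```java
import java.util.List;

/** Toy plan-tree model; NOT Calcite's RelNode API. All names are hypothetical. */
public class PlanMatchDemo {

    /** A plan node: an operator kind plus its inputs (no conditions or expressions). */
    record Node(String kind, List<Node> inputs) {
        static Node of(String kind, Node... inputs) {
            return new Node(kind, List.of(inputs));
        }
    }

    /** Mimics an operand pattern "Filter whose direct input is a TableScan". */
    static boolean matchesFilterOverScan(Node n) {
        return n.kind().equals("Filter")
                && n.inputs().size() == 1
                && n.inputs().get(0).kind().equals("TableScan");
    }

    /**
     * Hypothetical rewrite: Filter(WatermarkAssigner(x)) -> WatermarkAssigner(Filter(x)).
     * Returns the node unchanged if it does not have that shape.
     */
    static Node transposeFilterAndWatermark(Node n) {
        if (n.kind().equals("Filter")
                && n.inputs().size() == 1
                && n.inputs().get(0).kind().equals("WatermarkAssigner")) {
            Node wm = n.inputs().get(0);
            // Move the filter below the assigner, directly on top of the assigner's input.
            Node filterBelow = new Node("Filter", wm.inputs());
            return Node.of("WatermarkAssigner", filterBelow);
        }
        return n;
    }

    public static void main(String[] args) {
        Node plan = Node.of("Filter", Node.of("WatermarkAssigner", Node.of("TableScan")));
        // The scan is the filter's grandchild, so the pattern does not match.
        System.out.println(matchesFilterOverScan(plan)); // false
        Node rewritten = transposeFilterAndWatermark(plan);
        // After the swap, the filter sits directly on the scan.
        System.out.println(matchesFilterOverScan(rewritten.inputs().get(0))); // true
    }
}
```

The toy nodes carry no filter condition or watermark expression; a real planner rule would have to preserve both when swapping the operators.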
On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:
> Hi Yuval, Jark, Timo,
>
> Currently the watermark push down happens in the logical rewrite phase but
> the filter push down happens in the local phase, which means the planner
> will first check the filter push down and then check the watermark push
> down.
>
> I think we need a rule to transpose the filter and the watermark
> assigner, or to extend the filter push down rule to capture the structure
> where the watermark assigner is the parent of the table scan.
>
> Best,
> Shengkai
>
> On Mon, Mar 8, 2021 at 12:13 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:
>> Hi Jark,
>>
>> Even after implementing both, I don't see the watermark being pushed into
>> the table source in the logical plan, and the assigner prevents predicate
>> pushdown from running.
>>
>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>> Hi Yuval,
>>>
>>> That's correct: you will always get a LogicalWatermarkAssigner if you
>>> assign a watermark.
>>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>>> will be pushed into the TableSource, and then the Filter can be pushed
>>> into the source if the source implements SupportsFilterPushDown.
>>>
>>> Best,
>>> Jark
>>>
>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>> Hi Timo,
>>>> After investigating this further, this is actually not related to
>>>> implementing SupportsWatermarkPushDown.
>>>>
>>>> Once I create a TableSchema for my custom source's RowData and assign
>>>> it a watermark (see my example in the original mail), the plan will always
>>>> include a LogicalWatermarkAssigner. This assigner, which sits between the
>>>> LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
>>>> applying the optimization, since the rule requires LogicalTableScan to be
>>>> a direct child of LogicalFilter. Since I have
>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>> work.
>>>>
>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>>> Hi Yuval,
>>>>>
>>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>>> cracks.
>>>>>
>>>>> If I understand you correctly, you would like to implement a table
>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>> `SupportsFilterPushDown`?
>>>>>
>>>>> The current behavior might be on purpose. Filters and watermarks are
>>>>> not very compatible. Filtering would also mean that records (from which
>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>> strict, we would not generate any new watermarks and the pipeline
>>>>> would stop making progress in time.
>>>>>
>>>>> Watermark push down is only necessary if per-partition watermarks are
>>>>> required. Otherwise the watermarks are generated in a subsequent
>>>>> operator after the source. So you can still use rowtime without
>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>>>
>>>>> I will loop in Shengkai, who worked on this topic recently.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>> > Bumping this up again, would appreciate any help if anyone is
>>>>> > familiar with the Blink planner.
>>>>> >
>>>>> > Thanks,
>>>>> > Yuval.
>>>>> >
>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >     Hi Jark,
>>>>> >     Would appreciate your help with this.
>>>>> >
>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>>> >
>>>>> >         Hi Yuval,
>>>>> >
>>>>> >         I'm not familiar with the Blink planner but probably Jark can help.
>>>>> >
>>>>> >         Regards,
>>>>> >         Roman
>>>>> >
>>>>> >
>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>> >             my ScanTableSource, which does make sense. But now the
>>>>> >             question is: what should be done? This feels a bit unintuitive.
>>>>> >
>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>> >
>>>>> >                 Hi,
>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>> >
>>>>> >                 I have the following logical plan:
>>>>> >
>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>> >
>>>>> >                 I have a custom source which creates a TableSchema based
>>>>> >                 on an external table. When I create the schema, I push
>>>>> >                 the watermark definition to the schema:
>>>>> >
>>>>> >                 [image: image.png]
>>>>> >
>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>> >                 matches on the LogicalFilter in the plan. But then,
>>>>> >                 since the RelOptRuleOperandChildPolicy is set to
>>>>> >                 "SOME", it attempts to do a full match on the child
>>>>> >                 nodes.
>>>>> >                 The rule is defined as follows:
>>>>> >
>>>>> >                 [image: image.png]
>>>>> >
>>>>> >                 The match on the filter's child fails because the
>>>>> >                 immediate child of the filter is a "LogicalWatermarkAssigner"
>>>>> >                 and not the "LogicalTableScan", which is the grandchild:
>>>>> >
>>>>> >                 [image: image.png]
>>>>> >
>>>>> >                 Is this the desired behavior? Should I create the
>>>>> >                 TableSchema without the rowtime attribute and use
>>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>>> >                 dynamically from the source record?
>>>>> >
>>>>> >                 --
>>>>> >                 Best Regards,
>>>>> >                 Yuval Itzchakov.
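Timo's point that a strict filter can stall watermarks can be sketched with a similarly hypothetical model in plain Java. It assumes a single source partition and a generator in the spirit of bounded out-of-orderness (maximum timestamp seen minus a fixed delay). `Event`, `watermark` and the sample data are invented for illustration and are not Flink's `WatermarkGenerator` API. If a filter runs before watermark generation, the generator only sees the surviving records, so its watermark lags or never advances.

```java
import java.util.List;
import java.util.function.Predicate;

/** Toy model of filtering before watermark generation; not Flink's API. Names are hypothetical. */
public class FilteredWatermarkDemo {

    /** A record with an event-time timestamp (ms) and one string field. */
    record Event(long timestamp, String field) {}

    /**
     * Watermark = max timestamp among events that pass `visible`, minus `delayMs`.
     * `visible` stands in for a filter applied before watermark generation.
     * Returns Long.MIN_VALUE if no event was seen (the watermark never advances).
     */
    static long watermark(List<Event> events, Predicate<Event> visible, long delayMs) {
        long maxTs = Long.MIN_VALUE;
        for (Event e : events) {
            if (visible.test(e)) {
                maxTs = Math.max(maxTs, e.timestamp());
            }
        }
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - delayMs;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000, "bar"),
                new Event(5_000, "other"),
                new Event(9_000, "other"));
        // Generator sees every record: watermark reaches 9000 - 1000.
        System.out.println(watermark(events, e -> true, 1_000)); // 8000
        // Strict filter ahead of generation: only the first record survives.
        System.out.println(watermark(events, e -> e.field().equals("bar"), 1_000)); // 0
        // Filter that matches nothing: no watermark progress at all.
        System.out.println(watermark(events, e -> e.field().equals("baz"), 1_000) == Long.MIN_VALUE); // true
    }
}
```

With per-partition generation the same effect applies to each partition separately, which is one reason the thread treats combining the two push downs as a semantic question rather than only a rule-matching bug.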