Re: [DISCUSS] FLIP-356: Support Nested Fields Filter Pushdown

2023-08-27 Thread Venkatakrishnan Sowrirajan
Thanks for all the feedback and discussion everyone. Looks like we have
reached a consensus here.

Just to summarize:

1. Introduce a new *ReferenceExpression* (or *BaseReferenceExpression*)
abstract class which will be extended by both *FieldReferenceExpression*
and *NestedFieldReferenceExpression* (to be introduced as part of this FLIP)
2. No need for a *supportsNestedFilters* check, as the current
*SupportsFilterPushDown* should already ignore unknown expressions (
*NestedFieldReferenceExpression* for example) and return them as
*remainingFilters*.
Maybe this should be clarified explicitly in the Javadoc of
*SupportsFilterPushDown*.
I will file a separate JIRA to fix the documentation.
3. Refactor *SupportsProjectionPushDown* to use *ReferenceExpression* instead
of the existing 2-d arrays, to consolidate and be consistent with other
Supports*PushDown APIs - *outside the scope of this FLIP*
4. Similarly, *SupportsAggregatePushDown* should also be evolved to use
*ReferenceExpression* whenever nested fields support is added - *outside
the scope of this FLIP*
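
To make points 1 and 2 concrete, here is a minimal, self-contained sketch. It is not the FLIP's actual code. The class names come from the summary above, but everything else is an assumption made for illustration: the `path()` method, the simplified `Result` holder, and modelling a filter as a bare reference rather than a full call expression. It shows a source that only understands top-level fields handing nested references back as remaining filters, so no capability flag is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NestedFilterPushDownSketch {

    /** Hypothetical common base class for both kinds of field reference. */
    abstract static class ReferenceExpression {
        abstract String path();
    }

    /** Reference to a top-level column, e.g. "id". */
    static final class FieldReferenceExpression extends ReferenceExpression {
        private final String name;

        FieldReferenceExpression(String name) {
            this.name = name;
        }

        @Override
        String path() {
            return name;
        }
    }

    /** Reference to a nested field, e.g. "user.address.zip". */
    static final class NestedFieldReferenceExpression extends ReferenceExpression {
        private final String[] fieldNames;

        NestedFieldReferenceExpression(String... fieldNames) {
            this.fieldNames = fieldNames.clone();
        }

        @Override
        String path() {
            return String.join(".", fieldNames);
        }
    }

    /** Simplified stand-in for the accepted/remaining result of a pushdown call. */
    static final class Result {
        final List<ReferenceExpression> acceptedFilters;
        final List<ReferenceExpression> remainingFilters;

        Result(List<ReferenceExpression> accepted, List<ReferenceExpression> remaining) {
            this.acceptedFilters = accepted;
            this.remainingFilters = remaining;
        }
    }

    /** A source that knows only top-level fields: anything else is left to the framework. */
    static Result applyFilters(List<ReferenceExpression> filters) {
        List<ReferenceExpression> accepted = new ArrayList<>();
        List<ReferenceExpression> remaining = new ArrayList<>();
        for (ReferenceExpression filter : filters) {
            if (filter instanceof FieldReferenceExpression) {
                accepted.add(filter);
            } else {
                // Unknown expression type (here: a nested reference) is not pushed down.
                remaining.add(filter);
            }
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        Result result = applyFilters(Arrays.asList(
                new FieldReferenceExpression("id"),
                new NestedFieldReferenceExpression("user", "address", "zip")));
        System.out.println("accepted:  " + result.acceptedFilters.get(0).path());
        System.out.println("remaining: " + result.remainingFilters.get(0).path());
    }
}
```

A connector that later learns to handle nested fields would only need to add a branch for *NestedFieldReferenceExpression*; existing connectors keep working unchanged.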

Does this sound good? Please let me know if I have missed anything here. If
there are no concerns, I will start a vote tomorrow. I will also get the
FLIP-356 wiki updated. Thanks everyone once again!

Regards
Venkata krishnan


On Thu, Aug 24, 2023 at 8:19 PM Becket Qin wrote:

> Hi Jark,
>
> > How about having a separate NestedFieldReferenceExpression, and
> > abstracting a common base class "ReferenceExpression" for
> > NestedFieldReferenceExpression and FieldReferenceExpression? This makes
> > unifying expressions in
> > "SupportsProjectionPushdown#applyProjections(List ...)" possible.
>
>
> I'd be fine with this. It at least provides a consistent API style /
> formality.
>
>  Re: Yunhong,
>
> > 3. Finally, I think we need to look at the costs and benefits of unifying
> > the SupportsFilterPushDown and SupportsProjectionPushDown (or others) from
> > the perspective of interface implementers. A stable API can reduce user
> > development and change costs. If the current API can fully meet the
> > functional requirements at the framework level, I personally suggest
> > reducing the impact on connector developers.
> >
>
> I agree that the cost and benefit should be measured. And the measurement
> should be in the long term instead of short term. That is why we always
> need to align on the ideal end state first.
> Meeting functionality requirements is the bare minimum bar for an API.
> Simplicity, intuitiveness, robustness and evolvability are also important.
> In addition, for projects with many APIs, such as Flink, a consistent API
> style is also critical for the user adoption as well as bug avoidance. It
> is very helpful for the community to agree on some API design conventions /
> principles.
> For example, in this particular case, via our discussion, hopefully we sort
> of established the following API design conventions / principles for all
> the Supports*PushDown interfaces.
>
> 1. By default, expressions should be used if applicable instead of other
> representations.
> 2. In general, the pushdown method should not assume all the pushdowns will
> succeed. So the applyX() method should return a boolean or List, to
> handle the cases that some of the pushdowns cannot be fulfilled by the
> implementation.
>
> Establishing such conventions and principles demands careful thinking for
> the aspects I mentioned earlier in addition to the API functionalities.
> This helps lower the bar of understanding, reduces the chance of having
> loose ends in the API, and will benefit all the participants in the project
> over time. I think this is the right way to achieve real API stability.
> Otherwise, we may end up chasing our tails to find ways not to change the
> existing non-ideal APIs.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 25, 2023 at 9:33 AM yh z wrote:
>
> > Hi, Venkat,
> >
> > Thanks for the FLIP. It sounds good to support nested fields filter
> > pushdown. Based on the design of the FLIP and the above options, I would like
> > to make a few suggestions:
> >
> > 1. At present, introducing NestedFieldReferenceExpression looks like a
> > better solution, which can fully meet our requirements while reducing
> > modifications to the base class FieldReferenceExpression. In the long run, I
> > lean towards abstracting a common base class for
> > NestedFieldReferenceExpression and FieldReferenceExpression, as you suggested.
> >
> > 2. Personally, I don't recommend introducing *supportsNestedFilters()* in
> > *SupportsFilterPushDown*. We just need to better document the return value
> > of the method *applyFilters*.
> >
> > 3. Finally, I think we need to look at the costs and benefits of unifying
> > the SupportsFilterPushDown and SupportsProjectionPushDown (or others) from
> > the perspective of interface implementers. A stable API can reduce user
> > development and change costs. If the current API can fully meet the
> > functional requirements at the framework

[jira] [Created] (FLINK-32969) IfCallGen Bug

2023-08-27 Thread Lyn Zhang (Jira)
Lyn Zhang created FLINK-32969:
-

 Summary: IfCallGen Bug
 Key: FLINK-32969
 URL: https://issues.apache.org/jira/browse/FLINK-32969
 Project: Flink
  Issue Type: Bug
Reporter: Lyn Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32968) Update doc for customized catalog listener

2023-08-27 Thread Fang Yong (Jira)
Fang Yong created FLINK-32968:
-

 Summary: Update doc for customized catalog listener
 Key: FLINK-32968
 URL: https://issues.apache.org/jira/browse/FLINK-32968
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.18.0, 1.19.0
Reporter: Fang Yong


Refer to https://issues.apache.org/jira/browse/FLINK-32798 for more details





Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-27 Thread Jing Ge
Hi Xuannan,

Thanks for the proposal. +1 for me.

There is one small thing I am not sure I understand correctly.
Since there will be many different WatermarkStrategies and
WatermarkGenerators, could you please update the FLIP and add a
description of exactly how the watermark lag is calculated? E.g. watermark
lag = A - B, where A is the timestamp of the watermark emitted
downstream and B is (this is the part I am not really sure about after
reading the FLIP).
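
To illustrate the kind of definition I am asking for, here is a rough sketch of one possible reading. It assumes the lag is measured against the current system time, following the "lags behind the current time by more than 1 hour" example quoted below, and that an idle source is treated as non-backlog. The class, method, and parameter names are all hypothetical and not taken from the FLIP, which may define the lag differently.

```java
import java.time.Duration;

class WatermarkLagBacklogSketch {

    /** Assumed definition: lag = current time - timestamp of the emitted watermark. */
    static long watermarkLagMillis(long currentTimeMillis, long watermarkTimestampMillis) {
        return currentTimeMillis - watermarkTimestampMillis;
    }

    /**
     * Evaluated at the moment a watermark is emitted downstream.
     * An idle source is reported as not processing backlog.
     */
    static boolean isProcessingBacklog(
            long currentTimeMillis,
            long watermarkTimestampMillis,
            Duration lagThreshold,
            boolean sourceIdle) {
        if (sourceIdle) {
            return false;
        }
        return watermarkLagMillis(currentTimeMillis, watermarkTimestampMillis)
                > lagThreshold.toMillis();
    }

    public static void main(String[] args) {
        long now = Duration.ofHours(10).toMillis();
        Duration threshold = Duration.ofHours(1);

        long twoHoursBehind = now - Duration.ofHours(2).toMillis();
        long tenMinutesBehind = now - Duration.ofMinutes(10).toMillis();

        System.out.println(isProcessingBacklog(now, twoHoursBehind, threshold, false));   // true
        System.out.println(isProcessingBacklog(now, tenMinutesBehind, threshold, false)); // false
        System.out.println(isProcessingBacklog(now, twoHoursBehind, threshold, true));    // false
    }
}
```

If B is meant to be something other than the current system time, the FLIP should say so explicitly, since the result differs between generators.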

Best regards,
Jing


On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su wrote:

> Hi Jark,
>
> Thanks for the comments.
>
> I agree that the current solution cannot support jobs that cannot define
> watermarks. However, after considering the pending-record-based solution, I
> believe the current solution is superior for the target use case as it is
> more intuitive for users. The backlog status gives users the ability to
> balance between throughput and latency. Making this trade-off decision
> based on the watermark lag is more intuitive from the user's perspective.
> For instance, a user can decide that if the job lags behind the current
> time by more than 1 hour, the result is not usable. In that case, we can
> optimize for throughput when the data lags behind by more than an hour.
> With the pending-record-based solution, it's challenging for users to
> determine when to optimize for throughput and when to prioritize latency.
>
> Regarding the limitations of the watermark-based solution:
>
> 1. The current solution can support jobs with sources that have event
> time. Users can always define a watermark at the source operator, even if
> it's not used by downstream operators, such as streaming join and unbounded
> aggregate.
>
> 2. I don't believe it's accurate to say that the watermark lag will keep
> increasing if no data is generated in Kafka. The watermark lag and backlog
> status are determined at the moment when the watermark is emitted to the
> downstream operator. If no data is emitted from the source, the watermark
> lag and backlog status will not be updated. If the WatermarkStrategy with
> idleness is used, the source becomes non-backlog when it becomes idle.
>
> 3. I think watermark lag is a more intuitive way to determine whether a job
> is processing backlog data. The pending-record-based approach faces a
> similar issue. For example, if the source has 1K pending records, those
> records can span 1 day, 1 hour, or 1 second. If the records span 1
> day, it's probably best to optimize for throughput. If they span 1 hour, it
> depends on the business logic. If they span 1 second, optimizing for
> latency is likely the better choice.
>
> In summary, I believe the watermark-based solution is a superior choice
> for the target use case where watermark/event time can be defined.
> Additionally, I haven't come across a scenario that requires low-latency
> processing and reads from a source that cannot define watermarks. If we
> encounter such a use case, we can create another FLIP to address those
> needs in the future. What do you think?
>
>
> Best,
> Xuannan
>
>
>
> > On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for opening this discussion.
> >
> > This current proposal may work in the mentioned watermark cases.
> > However, it seems this is not a general solution for sources to determine
> > "isProcessingBacklog".
> > From my point of view, there are 3 limitations of the current proposal:
> > 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > for example streaming join and unbounded aggregate. We may still need to
> > figure out solutions for them.
> > 2. Watermark lag cannot be trusted, because it increases without bound if no
> > data is generated in Kafka.
> > But in this case, there is no backlog at all.
> > 3. Watermark lag does not reliably reflect the amount of backlog. If the
> > watermark lag is 1 day or 1 hour or 1 second,
> > there is possibly only 1 pending record there, which means no backlog at
> > all.
> >
> > Therefore, IMO, watermark may not be the ideal metric for determining
> > "isProcessingBacklog".
> > What we need is something that reflects the number of records unprocessed
> > by the job.
> > Actually, that is the "pendingRecords" metric proposed in FLIP-33 and has
> > been implemented by Kafka source.
> > Did you consider using "pendingRecords" metric to determine
> > "isProcessingBacklog"?
> >
> > Best,
> > Jark
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> >
> > On Tue, 15 Aug 2023 at 12:04, Xintong Song wrote:
> >
> >> Sounds good to me.
> >>
> >> It is true that, if we are introducing the generalized watermark, there
> >> will be other watermark related concepts / configurations that need to
> >> be updated anyway.

[jira] [Created] (FLINK-32967) [JUnit5 Migration] Module: flink-table-planner

2023-08-27 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-32967:
--

 Summary: [JUnit5 Migration] Module: flink-table-planner
 Key: FLINK-32967
 URL: https://issues.apache.org/jira/browse/FLINK-32967
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Jiabao Sun








[jira] [Created] (FLINK-32966) [JUnit5 Migration] Module: flink-table-planner

2023-08-27 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-32966:
--

 Summary: [JUnit5 Migration] Module: flink-table-planner
 Key: FLINK-32966
 URL: https://issues.apache.org/jira/browse/FLINK-32966
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.17.1
Reporter: Jiabao Sun





