[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-18 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798080#comment-17798080
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] Cool. I totally agree with your suggestion.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications that use 
> KafkaDeserializationSchema::isEndOfStream() from FlinkKafkaConsumer to 
> KafkaSource.
> Please check out 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.
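
As a rough illustration of the idea in the description above, the following Java sketch shows how an evaluator consulted on each de-serialized record can stop consumption. It is only a sketch under stated assumptions: the `RecordEvaluator` interface here is a local stand-in that mirrors the single-method shape described in FLIP-208, `BoundedReader` and `RecordEvaluatorDemo` are hypothetical names, and the choice not to emit the end-of-stream record is an assumption, not a statement about the connector's actual behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the shape of the evaluator proposed in FLIP-208.
// It is NOT the Flink class; it exists only to keep this sketch self-contained.
interface RecordEvaluator<T> {
    boolean isEndOfStream(T record);
}

// Hypothetical helper: collects records until the evaluator signals end of stream.
final class BoundedReader {
    static <T> List<T> readUntilEnd(Iterable<T> records, RecordEvaluator<T> evaluator) {
        List<T> out = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                // Assumption: the record that triggers end of stream is not emitted.
                break;
            }
            out.add(record);
        }
        return out;
    }
}

class RecordEvaluatorDemo {
    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "END", "c");
        // Stop as soon as the de-serialized record equals the marker "END".
        System.out.println(BoundedReader.readUntilEnd(records, "END"::equals)); // prints [a, b]
    }
}
```

The point of the design is that the stop condition is evaluated on the de-serialized record rather than inside the deserialization schema, which is what the old `isEndOfStream()` hook did.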



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-18 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798066#comment-17798066
 ] 

Martijn Visser commented on FLINK-25509:


[~lindong] Yes, that's what I suggest. The one thing I would like to check with 
[~tzulitai] is whether we want to cut {{main}} now first and create a Flink Kafka 
v3.1 branch, which would (at this point) support both Flink 1.17 and 
Flink 1.18. Then, with the PR for this ticket, we would drop support for 
Flink 1.17 and therefore release it as part of a new Flink Kafka v4.0 
branch/release, which would only support Flink 1.18 and newer. Users of 
Flink 1.17 would stick with the v3.1 version (until support for Flink 
1.17 is stopped completely).



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-18 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798047#comment-17798047
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] Just to make sure that I understand you correctly, do you mean 
it is OK to put this feature in flink-kafka-connector's main branch? This would 
keep its main branch compatible with Flink 1.18. But it won't be compatible 
with Flink 1.17.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-18 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798044#comment-17798044
 ] 

Martijn Visser commented on FLINK-25509:


[~ruanhang1993] [~lindong] Both the {{v3.0}} and the {{main}} branch of the 
Flink Kafka connector work for Flink 1.18 (see 
https://github.com/apache/flink-connector-kafka/actions/runs/7235099577 which 
shows the nightly run). I'm inclined to make it only compatible with {{main}} 
so that a new Flink Kafka connector v3.1 would be released. There are some 
other changes already in {{main}} as well (different Kafka client)



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-17 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798026#comment-17798026
 ] 

Dong Lin commented on FLINK-25509:
--

[~ruanhang1993] I think we need to update flink-kafka-connector's master branch 
to depend on Flink 1.18.0 (it currently depends on Flink 1.17.0) before 
putting in your feature. A new branch/version of flink-kafka-connector 
will then be created in its upcoming release.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798000#comment-17798000
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi, [~martijnvisser] & [~lindong] .

Flink 1.18 has been released and I think we could push this feature now. 

Which Kafka connector version should we put it into? This feature relies on an 
interface introduced in 1.18 and is not compatible with 1.17 or earlier.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-07-25 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746860#comment-17746860
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] This Jira is open because we are still waiting for the Flink 1.18 
release before we can update apache/flink-connector-kafka with the Kafka-side 
changes.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-07-25 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746850#comment-17746850
 ] 

Martijn Visser commented on FLINK-25509:


[~ruanhang1993] [~lindong] Is there a reason why this ticket is still open, 
given that the PR is merged? Did we miss it?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-13 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712144#comment-17712144
 ] 

Dong Lin commented on FLINK-25509:
--

[~syhily] In the PR related to this JIRA, we added an internal abstract method 
named `removeSplits` to `SplitFetcherManager`. This is consistent with the 
existing abstract method `SplitFetcherManager#addSplits`. Both `addSplits` and 
`removeSplits` are abstract because (I believe) we might have other subclasses 
such as MultiThreadFetcherManager in the future and there is no good default 
implementation for `addSplits` and `removeSplits`.

The use of abstract methods such as `SplitFetcherManager#addSplits` predates 
this JIRA. Maybe there is indeed a good way to add a default 
implementation to it. Please feel free to propose a change.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711914#comment-17711914
 ] 

Martijn Visser commented on FLINK-25509:


[~syhily] I defer to [~lindong] for his input on that question :)



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-13 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711902#comment-17711902
 ] 

Yufan Sheng commented on FLINK-25509:
-

[~martijnvisser] I think this breaking change should be resolved on the Flink side. 
Why can't we just make this method a default method?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711777#comment-17711777
 ] 

Martijn Visser commented on FLINK-25509:


[~tison] It depends on whether adding support for this in Pulsar would mean a breaking 
change for the users of the Pulsar connector. If you need to add support for 
this feature, would that mean it would break using the connector in Flink 1.17? 
If so, then it should end up in a Pulsar 5.0 release. 



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708825#comment-17708825
 ] 

Zili Chen commented on FLINK-25509:
---

For the Pulsar side, I had a conversation with [~syhily] and he's willing to 
follow up on this ticket. Shall we have a ticket for the subtask?

Also, I wonder what versions will be affected. Since the external connectors 
are compiled against release versions, shall we do the subtask after a specific Flink 
release?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708809#comment-17708809
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] Sure, I am happy to participate in the discussion.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708793#comment-17708793
 ] 

Martijn Visser commented on FLINK-25509:


[~lindong] Agree on your points. I'll flag it to [~Tison] for Pulsar and 
to [~tzulitai] for Kafka.
I do wonder, given that both Pulsar and Kafka use this, should we have a 
discussion on whether this should be made Public instead of Internal?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708769#comment-17708769
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] I agree with the discussion mentioned above that we should aim 
to let connectors use public APIs. It would be great if someone updated Flink 
core so that flink-connector-pulsar uses only public APIs from Flink.

I guess my points are:
- It is not a requirement that externalized connectors should only use public 
APIs (please let me know if there is consensus on this requirement). We will 
have to stick with the existing practice (where connectors use internal APIs) 
until someone volunteers to improve this with new FLIPs.

- The issue here is a bit different from FLINK-31324 in the sense that there is 
no good way for flink-connector-pulsar to compile with `SplitFetcherManager` 
from both Flink 1.17 and Flink 1.18-snapshot. I would be happy to create a PR 
if we can find a good fix for this issue.

- I am not sure it is good practice to require a single codebase to compile 
against two Flink versions. This is typically done with multiple branches if needed.

In this particular case, I am not sure it is problematic to add an internal 
abstract method to `SplitFetcherManager`. I would be happy to create a PR for 
flink-connector-pulsar if the maintainer of this connector can create a dedicated 
branch for each target Flink version.

It is probably up to the maintainer to decide how to address this properly.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708755#comment-17708755
 ] 

Martijn Visser commented on FLINK-25509:


[~lindong] Yes, it was one of the reasons for externalization. There's 
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
 for all the info, including discussion threads, like 
https://lists.apache.org/thread/bywh947r2f5hfocxq598zhyh06zhksrm. That thread 
was also a driver for FLIP-196 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees
 and FLIP-197 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process.
 

I believe this situation is similar to 
https://issues.apache.org/jira/browse/FLINK-31324 which was fixed for 1.17. 

In the end, it's up to the maintainer of a connector to decide how the minimum of 
two versions is supported, as outlined on 
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development.
 There are multiple ways documented there.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-05 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708741#comment-17708741
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] Hmm... I am not sure there is consensus in the community that 
externalized connectors should only use public APIs. Was this documented or 
discussed on the mailing list?

I agree it is preferable to have externalized connectors use only public APIs. 
But it is also important that we only expose a minimum set of well-designed 
APIs rather than simply making the existing APIs public. This is a non-trivial 
effort and someone in the community needs to volunteer to do it. Until this 
is done, I guess we have to accept that externalized repos will be 
using internal APIs in the near future. 

I guess we probably need to update the given externalized repo to only use 
public APIs before having a nightly build verify that it can always compile 
with the master head.

In this particular case, we have added an abstract method `removeSplits()` to 
SplitFetcherManager. This is a backward incompatible change for externalized 
repos like flink-connector-pulsar. This is because PulsarSourceFetcherManager 
should override removeSplits() to compile with flink 1.18-snapshot. Then it 
will fail to compile with flink-1.17 as there is no such method to override in 
flink 1.17.

The simplest way to address this issue is to update Flink master branch to make 
`removeSplits()` non-abstract. But IMO this approach makes the code hard to 
read and is very unconventional.
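
To make the trade-off above concrete, here is a minimal Java sketch of the non-abstract alternative. All class names are hypothetical, heavily simplified stand-ins and not Flink's actual `SplitFetcherManager` hierarchy; the sketch only assumes that the newly added method gets a concrete body that throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a fetcher manager base class.
abstract class FetcherManagerSketch {
    // Abstract since the first version: every subclass already implements it.
    abstract void addSplits(List<String> splits);

    // Added later as a concrete method, so subclasses written against the
    // older version of the base class still compile without changes.
    void removeSplits(List<String> splits) {
        throw new UnsupportedOperationException("removeSplits is not implemented");
    }
}

// Written against the old base class: it knows nothing about removeSplits.
class LegacyManager extends FetcherManagerSketch {
    final List<String> assigned = new ArrayList<>();

    @Override
    void addSplits(List<String> splits) {
        assigned.addAll(splits);
    }
}

// Written against the new base class: it overrides removeSplits.
class UpdatedManager extends LegacyManager {
    @Override
    void removeSplits(List<String> splits) {
        assigned.removeAll(splits);
    }
}

class RemoveSplitsDemo {
    public static void main(String[] args) {
        UpdatedManager updated = new UpdatedManager();
        updated.addSplits(List.of("s1", "s2"));
        updated.removeSplits(List.of("s1"));
        System.out.println(updated.assigned); // prints [s2]
    }
}
```

Note that `UpdatedManager` still cannot compile against the older base class, because its `@Override` has nothing to override there. The concrete default only protects subclasses that do not implement the new method, and it moves the failure for them from compile time to run time, which is one reading of why this approach can make the code harder to reason about.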

I would prefer to build externalized repo in the same way as what most users 
would do. That is, the master branch should only be compiled against one 
version of Flink. And it can have a separate branch for each extra Flink 
version. What do you think?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708707#comment-17708707
 ] 

Martijn Visser commented on FLINK-25509:


[~lindong] If it's an internal API that's being used by a connector and if that 
API should indeed be used by the connector, then the internal API should be 
made public. Especially with the externalization, it's ever more important that 
we pay more attention to our API strategy (and I hope that in 1.18 we will also 
improve on our automated testing for resolving these issues, especially for 
connectors that are on the target interfaces). 



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-04 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708631#comment-17708631
 ] 

Dong Lin commented on FLINK-25509:
--

[~martijnvisser] This ticket is open because we have not updated 
flink-connector-kafka as described in FLIP-208. Hang has only merged the 
change needed in the Flink core repo.

It seems that the flink-connector-pulsar GitHub Action is configured to build 
with both Flink 1.17 and Flink 1.18. I am not sure if it is going to work 
reliably. Let's say we make a backward-incompatible change to an internal API 
(e.g. changing a method name) that is used by our own connector repo. How is the 
flink-connector-pulsar code going to deal with such changes?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-04-04 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708518#comment-17708518
 ] 

Martijn Visser commented on FLINK-25509:


[~ruanhang1993] [~lindong] This ticket is currently open, but it appears the 
change has been merged. It also appears that this change has broken Pulsar for 
1.18-SNAPSHOT, see 
https://github.com/apache/flink-connector-pulsar/actions/runs/4608676287/jobs/8144772934#step:13:151

Could you take a look?



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-03-21 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703075#comment-17703075
 ] 

Dong Lin commented on FLINK-25509:
--

The change to connector-base has been merged into the apache/flink master 
branch in commit 1d33773e6b7f9f76f03ff8ffd73171b95fa24ccb.



[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2022-12-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650103#comment-17650103
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi [~lindong],

I have read the FLIP and am willing to help implement this feature. Would 
you mind assigning this ticket to me?

Thanks~

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> This feature is needed to migrate applications that use 
> KafkaDeserializationSchema::isEndOfStream() from FlinkKafkaConsumer to 
> KafkaSource.
> Please check out 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.


