[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798080#comment-17798080 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] Cool. I totally agree with your suggestion.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Common, Connectors / Kafka
> Reporter: Dong Lin
> Assignee: Hang Ruan
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications that use KafkaDeserializationSchema::isEndOfStream() from FlinkKafkaConsumer to KafkaSource.
> Please check out https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records for the motivation and the proposed changes.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
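The issue description refers to stopping a source based on de-serialized records. As a rough illustration only, the Java sketch below models that idea with a hypothetical `EndOfStreamEvaluator` interface and a hypothetical `readUntilEnd` helper; neither is Flink's actual `RecordEvaluator` API or the KafkaSource implementation. It assumes each split is read independently until one of its records is flagged as the end, and that the flagged record itself is dropped, mirroring the documented behavior of the legacy `DeserializationSchema#isEndOfStream()`, where an element for which `true` is returned is not emitted.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RecordEvaluatorSketch {

    // Hypothetical stand-in for the FLIP-208 idea (not the real Flink interface):
    // decide from a de-serialized record whether its split has reached the end.
    @FunctionalInterface
    interface EndOfStreamEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    // Hypothetical helper: drains each split until the evaluator flags a record.
    // Assumption: the flagged record is not emitted, like the legacy isEndOfStream().
    static <T> List<T> readUntilEnd(Map<String, List<T>> splits, EndOfStreamEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (Map.Entry<String, List<T>> split : splits.entrySet()) {
            for (T record : split.getValue()) {
                if (evaluator.isEndOfStream(record)) {
                    break; // this split is finished; later records in it are ignored
                }
                emitted.add(record);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> splits = new LinkedHashMap<>();
        splits.put("topic-0", List.of("a", "b", "END", "c"));
        splits.put("topic-1", List.of("x", "END", "y"));
        List<String> out = readUntilEnd(splits, record -> record.equals("END"));
        System.out.println(out); // [a, b, x]
    }
}
```

Stopping per split rather than globally is a design assumption of this sketch: one partition reaching its end marker does not cut off records still pending in another.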
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798066#comment-17798066 ]

Martijn Visser commented on FLINK-25509:

[~lindong] Yes, that's what I suggest. The one thing I would like to check with [~tzulitai] is whether we want to cut a Flink Kafka v3.1 branch from {{main}} now, which would (at this point) support both Flink 1.17 and Flink 1.18. Then, with the PR for this ticket, we would drop support for Flink 1.17 and release it as part of a new Flink Kafka v4.0 branch/release, which would only support Flink 1.18 and newer. Users of Flink 1.17 would stick with the v3.1 version (until support for Flink 1.17 is dropped completely).
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798047#comment-17798047 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] Just to make sure I understand you correctly: do you mean it is OK to put this feature in flink-connector-kafka's main branch? That would keep its main branch compatible with Flink 1.18, but it would no longer be compatible with Flink 1.17.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798044#comment-17798044 ]

Martijn Visser commented on FLINK-25509:

[~ruanhang1993] [~lindong] Both the {{v3.0}} and the {{main}} branches of the Flink Kafka connector work with Flink 1.18 (see https://github.com/apache/flink-connector-kafka/actions/runs/7235099577, which shows the nightly run). I'm inclined to add it only to {{main}}, so that it would ship in a new Flink Kafka connector v3.1 release. {{main}} already contains some other changes as well (a different Kafka client).
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798026#comment-17798026 ]

Dong Lin commented on FLINK-25509:

[~ruanhang1993] I think we need to update flink-connector-kafka's master branch to depend on Flink 1.18.0 (it currently depends on Flink 1.17.0) before putting in your feature. A new branch/version of flink-connector-kafka would then be created in its upcoming release.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798000#comment-17798000 ]

Hang Ruan commented on FLINK-25509:

Hi [~martijnvisser] & [~lindong]. Flink 1.18 has been released, and I think we could push this feature now. Which Kafka connector version should we put it into? This feature relies on an interface in Flink 1.18 and is not compatible with 1.17 or earlier.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746860#comment-17746860 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] This Jira is open because we are still waiting for the Flink 1.18 release before we can update apache/flink-connector-kafka with the Kafka-side changes.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746850#comment-17746850 ]

Martijn Visser commented on FLINK-25509:

[~ruanhang1993] [~lindong] Is there a reason why this ticket is still open, given that the PR has been merged? Did we miss it?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712144#comment-17712144 ]

Dong Lin commented on FLINK-25509:

[~syhily] In the PR related to this JIRA, we added an internal abstract method named `removeSplits` to `SplitFetcherManager`. This is consistent with the existing abstract method `SplitFetcherManager#addSplits`. Both `addSplits` and `removeSplits` are abstract because (I believe) we might have other subclasses, such as MultiThreadFetcherManager, in the future, and there is no good default implementation for `addSplits` and `removeSplits`. Abstract methods such as `SplitFetcherManager#addSplits` already existed before this JIRA. Maybe there is indeed a good way to add a default implementation. Please feel free to propose a change.
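To make the design described above concrete, here is a minimal Java sketch of a base class in which both split operations are abstract, so every concrete fetcher manager must decide how to handle them. All class names (`FetcherManagerSketch`, `AbstractFetcherManager`, `SingleThreadManager`) are hypothetical, simplified stand-ins and are not Flink's actual `SplitFetcherManager` hierarchy or its method signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT Flink's real SplitFetcherManager classes.
public class FetcherManagerSketch {

    // Both operations are abstract: there is no single sensible default,
    // so each subclass (single-threaded, multi-threaded, ...) must implement them.
    abstract static class AbstractFetcherManager<SplitT> {
        public abstract void addSplits(List<SplitT> splitsToAdd);

        public abstract void removeSplits(List<SplitT> splitsToRemove);
    }

    // A single-threaded subclass that simply tracks assigned splits in one list.
    static class SingleThreadManager extends AbstractFetcherManager<String> {
        final List<String> assigned = new ArrayList<>();

        @Override
        public void addSplits(List<String> splitsToAdd) {
            assigned.addAll(splitsToAdd);
        }

        @Override
        public void removeSplits(List<String> splitsToRemove) {
            assigned.removeAll(splitsToRemove);
        }
    }

    public static void main(String[] args) {
        SingleThreadManager manager = new SingleThreadManager();
        manager.addSplits(List.of("topic-0", "topic-1", "topic-2"));
        manager.removeSplits(List.of("topic-1"));
        System.out.println(manager.assigned); // [topic-0, topic-2]
    }
}
```

The flip side of this choice is the compatibility problem raised elsewhere in this thread: any existing subclass compiled against the older base class must now add a `removeSplits` implementation.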
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711914#comment-17711914 ]

Martijn Visser commented on FLINK-25509:

[~syhily] I defer to [~lindong] for his input on that question :)
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711902#comment-17711902 ]

Yufan Sheng commented on FLINK-25509:

[~martijnvisser] I think this breaking change should be resolved on the Flink side. Why can't we just give this method a default implementation?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711777#comment-17711777 ]

Martijn Visser commented on FLINK-25509:

[~tison] It depends on whether adding support for this in Pulsar would be a breaking change for users of the Pulsar connector. If adding support for this feature means the connector would no longer work with Flink 1.17, then it should end up in a Pulsar connector 5.0 release.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708825#comment-17708825 ]

Zili Chen commented on FLINK-25509:

On the Pulsar side, I had a conversation with [~syhily], and he's willing to follow up on this ticket. Shall we create a ticket for the subtask? Also, I wonder which versions will be affected. Since the external connectors are compiled against released Flink versions, shall we do the subtask after a specific Flink release?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708809#comment-17708809 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] Sure, I am happy to participate in the discussion.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708793#comment-17708793 ]

Martijn Visser commented on FLINK-25509:

[~lindong] I agree with your points. I'll flag it to [~Tison] for Pulsar and to [~tzulitai] for Kafka. I do wonder: given that both Pulsar and Kafka use this, should we discuss whether it should be made Public instead of Internal?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708769#comment-17708769 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] I agree with the discussion mentioned above that we should aim to let connectors use public APIs. It would be great if someone updated Flink core so that flink-connector-pulsar uses only public APIs from Flink. I guess my points are:

- It is not a requirement that externalized connectors only use public APIs (please let me know if there is consensus on this requirement). We will have to stick with the existing practice (where connectors use internal APIs) until someone volunteers to improve this with new FLIPs.
- The issue here is a bit different from FLINK-31324 in the sense that there is no good way for flink-connector-pulsar to compile against `SplitFetcherManager` from both Flink 1.17 and Flink 1.18-SNAPSHOT. I would be happy to create a PR if we can find a good fix for this issue.
- I am not sure it is good practice to require one codebase to compile against two Flink versions. This is typically done with multiple branches if needed.

In this particular case, I am not sure it is problematic to add an internal abstract method to `SplitFetcherManager`. I would be happy to create a PR for flink-connector-pulsar if the maintainer of this connector can create a dedicated branch for each target Flink version. It is probably up to the maintainer to decide how to address this properly.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708755#comment-17708755 ]

Martijn Visser commented on FLINK-25509:

[~lindong] Yes, it was one of the reasons for externalization. There's https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development for all the info, including discussion threads like https://lists.apache.org/thread/bywh947r2f5hfocxq598zhyh06zhksrm. That thread was also a driver for FLIP-196 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees) and FLIP-197 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process). I believe this situation is similar to https://issues.apache.org/jira/browse/FLINK-31324, which was fixed for 1.17. In the end, it's up to the maintainer of a connector to decide how the minimum of two supported Flink versions is handled, as outlined on https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development. There are multiple approaches documented there.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708741#comment-17708741 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] Hmm... I am not sure there is consensus in the community that externalized connectors should only use public APIs. Was this documented or discussed on the mailing list?

I agree it is preferable for externalized connectors to use only public APIs. But it is also important that we only expose a minimal set of well-designed APIs rather than simply making the existing APIs public. This is a non-trivial effort, and someone in the community needs to volunteer to do it. Until that is done, I guess we have to accept that externalized repos will be using internal APIs in the near future. I guess we would need to update a given externalized repo to use only public APIs before having a nightly build verify that it always compiles against the master head.

In this particular case, we have added an abstract method `removeSplits()` to SplitFetcherManager. This is a backward-incompatible change for externalized repos like flink-connector-pulsar: PulsarSourceFetcherManager must override removeSplits() to compile with Flink 1.18-SNAPSHOT, and it will then fail to compile with Flink 1.17, where there is no such method to override.

The simplest way to address this issue is to update the Flink master branch to make `removeSplits()` non-abstract. But IMO this approach makes the code hard to read and is very unconventional. I would prefer to build externalized repos the same way most users would: the master branch should only be compiled against one version of Flink, and there can be a separate branch for each additional Flink version. What do you think?
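The alternative discussed above (making `removeSplits()` non-abstract, which is also what the "default method" question in this thread asks for) can be sketched in plain Java. This is a minimal illustration under stated assumptions: `NonAbstractRemoveSketch`, `BaseFetcherManager`, and `LegacyFetcherManager` are hypothetical names, and throwing `UnsupportedOperationException` is just one possible default, not what Flink does. The sketch shows the trade-off: a subclass written before the method existed keeps compiling, but a missing implementation is only discovered at run time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified classes; NOT Flink's actual SplitFetcherManager.
public class NonAbstractRemoveSketch {

    abstract static class BaseFetcherManager<SplitT> {
        public abstract void addSplits(List<SplitT> splitsToAdd);

        // Non-abstract variant: subclasses that predate this method still compile,
        // because overriding it is optional. The default chosen here (an assumption)
        // fails loudly at run time instead.
        public void removeSplits(List<SplitT> splitsToRemove) {
            throw new UnsupportedOperationException(
                    getClass().getSimpleName() + " does not support removeSplits");
        }
    }

    // A "legacy" subclass written against the older base class: it only knows addSplits.
    static class LegacyFetcherManager extends BaseFetcherManager<String> {
        final List<String> assigned = new ArrayList<>();

        @Override
        public void addSplits(List<String> splitsToAdd) {
            assigned.addAll(splitsToAdd);
        }
    }

    public static void main(String[] args) {
        LegacyFetcherManager manager = new LegacyFetcherManager();
        manager.addSplits(List.of("topic-0"));
        try {
            manager.removeSplits(List.of("topic-0"));
        } catch (UnsupportedOperationException e) {
            // The incompatibility moves from compile time to run time.
            System.out.println(e.getMessage()); // LegacyFetcherManager does not support removeSplits
        }
    }
}
```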
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708707#comment-17708707 ]

Martijn Visser commented on FLINK-25509:

[~lindong] If an internal API is being used by a connector, and the connector should indeed use that API, then the internal API should be made public. Especially with the externalization, it's more important than ever that we pay attention to our API strategy (and I hope that in 1.18 we will also improve our automated testing for detecting these issues, especially for connectors that are on the target interfaces).
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708631#comment-17708631 ]

Dong Lin commented on FLINK-25509:

[~martijnvisser] This ticket is open because we have not yet updated flink-connector-kafka as described in FLIP-208. Hang has only merged the change needed in the Flink core repo.

It seems that the flink-connector-pulsar GitHub Action is configured to build with both Flink 1.17 and Flink 1.18. I am not sure that is going to work reliably. Say we make a backward-incompatible change to an internal API (e.g. change a method name) that is used by our own connector repo: how is the flink-connector-pulsar code going to deal with such changes?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708518#comment-17708518 ]

Martijn Visser commented on FLINK-25509:

[~ruanhang1993] [~lindong] This ticket is currently open, but it appears the change has been merged. It also appears that this change has broken the Pulsar connector build against 1.18-SNAPSHOT; see https://github.com/apache/flink-connector-pulsar/actions/runs/4608676287/jobs/8144772934#step:13:151. Could you take a look?
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703075#comment-17703075 ]

Dong Lin commented on FLINK-25509:

The change to connector-base has been merged into the apache/flink master branch in commit 1d33773e6b7f9f76f03ff8ffd73171b95fa24ccb.
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650103#comment-17650103 ]

Hang Ruan commented on FLINK-25509:

Hi [~lindong], I have read the FLIP and I am willing to help implement this feature. Would you mind assigning this ticket to me? Thanks~