RE: Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-12 Thread Salva Alcántara
Yep, I agree with that, but I guess losing the first-class citizen status
within Flink will make many companies currently in doubt finally adopt
Java. For non-FP shops or companies without a strong command of Scala,
using Java will simplify things in general and avoid some unnecessary pains
(hiring & training). Although possible, IMO using Scala without the
required serialization support for its specific types or a nice API
wrapper feels a bit artificial and awkward, especially once Java 17 is
supported, and I don't think it is worth it for most cases.

Salva

On 2022/10/05 19:26:49 David Anderson wrote:
> I want to clarify one point here, which is that modifying jobs written in
> Scala to use Flink's Java API does not require porting them to Java. I can
> readily understand why folks using Scala might rather use Java 17 than
Java
> 11, but sticking to Scala will remain an option even if Flink's Scala API
> goes away.
>
> For more on this, see [1] and some of the examples it points to, such as
> those in [2].
>
> [1] https://flink.apache.org/2022/02/22/scala-free.html
> [2] https://github.com/sjwiesman/flink-scala-3
>
> On Tue, Oct 4, 2022 at 6:16 PM Clayton Wohl  wrote:
>
> > +1
> >
> > At my employer, we maintain several Flink jobs in Scala. We've been
> > writing newer jobs in Java, and we'd be fine with porting our Scala jobs
> > over to the Java API.
> >
> > I'd like to request Java 17 support. Specifically, Java records are a
> > feature our Flink code would use a lot, and they would make the Java
> > syntax much nicer.
> >
>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-12 Thread Salva Alcántara
Hi Martijn,

Maybe a bit off-topic, but regarding Java 17 support, will it be
possible to replace POJOs with Java records in existing applications?

In a project I maintain we use Lombok a lot, but with Java records we would
probably stop using it (or significantly reduce its usage).

Will there be a way to promote existing POJOs (either written "manually" or
using Lombok) to Java records without breaking serialization? (assuming
that those POJOs are used as immutable values, e.g., setters are never
used).
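
For illustration, a minimal sketch of the migration being asked about; the
class and field names are hypothetical, and nothing here implies that Flink's
serializers already accept records as drop-in POJO replacements - that
compatibility is exactly the open question:

    import lombok.Value;

    // Before: an immutable Lombok "POJO"; Flink serializes it field by field.
    @Value
    class SensorReadingPojo {
        String sensorId;
        double temperature;
    }

    // After: the equivalent Java 17 record. Whether Flink can restore state
    // written for the POJO above into this record is the open question here.
    record SensorReading(String sensorId, double temperature) {}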

Regards,

Salva

On Wed, Oct 12, 2022 at 9:11 PM Martijn Visser 
wrote:

> Hi everyone,
>
> Thanks again for all your feedback. It's very much appreciated.
>
> My overall feeling is that people are not opposed to the FLIP. There is
> demand for adding Java 17 support before dropping the Scala APIs. Given
> that the proposal for actually dropping the Scala APIs would only happen
> with a Flink 2.0 and Java 17 support would either happen in a new minor
> version or a new major version (I haven't seen a FLIP or discussion being
> opened adding Java 17 support, only on deprecating Java 8), Java 17 support
> would either be there earlier (in a new minor version) or at the same time
> (with Flink 2.0) when the Scala APIs would be dropped.
>
> If there are no more discussion topics, I would move this FLIP to a vote
> at the beginning of next week.
>
> Best regards,
>
> Martijn
>
> On Sun, Oct 9, 2022 at 10:36 AM guenterh.lists 
> wrote:
>
>> Hi Martijn
>>
>> I do not maintain a large production application based on Flink, so it
>> would not be a problem for me to convert existing implementations to
>> whatever API.
>>
>> I am working in the area of cultural heritage, which is mainly about the
>> processing of structured (meta)-data (scientific libraries, archives and
>> museums)
>> My impression: People without much background/experience with Java
>> implementations find it easier to get into the functional mindset as
>> supported in Scala. That's why I think it would be very unfortunate if the
>> use of Scala in Flink becomes more and more limited or neglected.
>>
>> I think using the Java API in Scala is a possible way also in my
>> environment.
>>
>> In the last weeks I tried to port the examples from the "Flink Course" of
>> Daniel Ciocîrlan (https://rockthejvm.com/p/flink - he mainly offers
>> Scala courses), which are exclusively based on the native Scala API, to the
>> Java API. This has worked without any problems as far as I can see. So far
>> I haven't tried any examples based on the Table API or streaming workflows
>> in batch mode (which would be important for our environment).
>>
>> My main trouble: So far I don't know enough about the limitations of
>> using the Java API in a Scala implementation and what that means. My
>> current understanding: the limitation is mainly in deriving the type
>> information in generic APIs with Scala types. For me it would be very
>> significant and helpful if there were more information, descriptions,
>> and examples about this topic.
>>
>> So far unfortunately I had too little time to deal with a wrapper like
>> flink-scala-api (https://github.com/findify/flink-scala-api ) and the
>> current alternative is probably going to be deprecated in the future (
>> https://github.com/ariskk/flink4s/issues/17#issuecomment-1125806808 )
>>
>> Günter
>>
>>
>> On 04.10.22 13:58, Martijn Visser wrote:
>>
>> Hi Márton,
>>
>> You're making a good point. I originally wanted to already include the
>> User mailing list to get their feedback but forgot to do so. I'll do some
>> more outreach via other channels as well.
>>
>> @Users of Flink, I've made a proposal to deprecate and remove Scala API
>> support in a future version of Flink. Your feedback on this topic is very
>> much appreciated.
>>
>> Regarding the large Scala codebase for Flink, a potential alternative
>> could be to have a wrapper for all Java APIs that makes them available as
>> Scala APIs. However, this still requires Scala maintainers and I don't
>> think that we currently have those in our community. The easiest solution
>> for them would be to use the Java APIs directly. Yes it would involve work,
>> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
>> there's still time for that :)
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
>> wrote:
>>
>>> Hi Martijn,
>>>
>>> Thanks for compiling the FLIP. I agree with the sentiment that Scala
>>> poses
>>> considerable maintenance overhead and key improvements (like 2.13 or
>>> 2.12.8
>>> supports) are hanging stale. With that said, before we make this move we
>>> should attempt to understand the userbase affected.
>>> A quick Slack and user mailing list search does return quite a few
>>> results for scala (admittedly a cursory look at them suggests that many of
>>> them have to do with missing features in Scala that exist in Java or
>>> Scala
>>> versions). I would love to see some polls on this 

allowNonRestoredState doesn't seem to be working

2022-10-12 Thread Yaroslav Tkachenko
Hey everyone,

I'm trying to redeploy an application using a savepoint. The new version of
the application has a few operators with new uids and a few operators with
the old uids. I'd like to keep the state for the old ones.

I passed the allowNonRestoredState flag (using the Flink Kubernetes Operator,
actually) and I can confirm that
"execution.savepoint.ignore-unclaimed-state" is "true" after that.

However, the application still fails with the following exception:

"java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint s3p://. Cannot map checkpoint/savepoint
state for operator d9ea0f9654a3395802138c72c1bfd35b to the new program,
because the operator is not available in the new program. If you want to
allow to skip this, you can set the --allowNonRestoredState option on the
CLI."

Is there a situation where allowNonRestoredState may not work? Thanks.
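
For reference, a sketch of how the flag is usually set through the operator's
FlinkDeployment resource; the jar path and upgrade mode below are
placeholders, and this is an assumption about the relevant spec fields rather
than a diagnosis of the failure above:

    # Hypothetical FlinkDeployment fragment (Flink Kubernetes Operator).
    spec:
      job:
        jarURI: local:///opt/flink/usrlib/app.jar
        upgradeMode: savepoint
        # Should correspond to execution.savepoint.ignore-unclaimed-state
        # at restore time.
        allowNonRestoredState: true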


Re: Question about SQL gateway

2022-10-12 Thread Ww J
Thanks Xuyang.

Jack

> On Oct 12, 2022, at 8:46 AM, Xuyang  wrote:
> 
> Hi, currently I think there is no HA for the gateway. When the gateway crashes,
> jobs that are being submitted synchronously will be cancelled, while async jobs
> will continue running. When the gateway restarts, the async jobs can be found by
> the gateway. BTW, the work on HA is ongoing.
> At 2022-10-11 13:35:50, "Ww J"  wrote:
>> Hi,
>> 
>> I submit a stream job from the SQL gateway. The stream job keeps outputting 
>> results to the SQL gateway. If the SQL gateway restarts or crashes, the 
>> stream job will continue running. After the SQL gateway restarts, how can I
>> get the results of the stream job?
>> 
>> Thanks.
>> 
>> Jack



[ANNOUNCE] Apache Flink Table Store 0.2.1 released

2022-10-12 Thread Jingsong Lee
The Apache Flink community is very happy to announce the release of
Apache Flink Table Store 0.2.1.

Apache Flink Table Store is a unified storage to build dynamic tables
for both streaming and batch processing in Flink, supporting
high-speed data ingestion and timely data query.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/10/13/release-table-store-0.2.1.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Table Store can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20table-store

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352257

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Best,
Jingsong Lee


Re: Flink KafkaSource still referencing deleted topic

2022-10-12 Thread Hang Ruan
Hi, Robert,

The allowNonRestoredState option should be passed on the CLI like this:
./bin/flink run --detached --allowNonRestoredState
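
For completeness, a fuller (hypothetical) invocation, with the savepoint path
and job jar as placeholders:

./bin/flink run --detached --allowNonRestoredState \
    --fromSavepoint s3://bucket/savepoints/savepoint-123abc \
    ./usrlib/my-job.jar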

Best,
Hang

Robert Cullen  wrote on Wed, Oct 12, 2022 at 23:13:

> I don't see AllowNonRestoredState in the configuration documentation.  How
> would it be passed to a job? On the command line like this:
>
> ./bin/flink run --detached -Dallownonrestoredstate=true ...
>
> On Tue, Oct 4, 2022 at 4:52 PM Martijn Visser 
> wrote:
>
>> Hi Mason,
>>
>> Definitely! Feel free to open a PR and ping me for a review.
>>
>> Cheers, Martijn
>>
>> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen  wrote:
>>
>>> Hi Martijn,
>>>
>>> I notice that this question comes up quite often. Would this be a good
>>> addition to the KafkaSource documentation? I'd be happy to contribute to
>>> the documentation.
>>>
>>> Best,
>>> Mason
>>>
>>> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
>>> wrote:
>>>
 Hi Robert,

 Based on
 https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
 I think you'll need to change the UID for your KafkaSource and restart your
 job with allowNonRestoredState enabled.

 Best regards,

 Martijn

 On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
 wrote:

> We've changed the KafkaSource to ingest from a new topic but the old
> name is still being referenced:
>
> 2022-10-04 07:03:41
> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>     ... 3 more
> Caused by: java.lang.RuntimeException: Failed to get topic metadata.
>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>     at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     ... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>

Re: Job Manager getting restarted while restarting task manager

2022-10-12 Thread Xintong Song
I meant your jobmanager also received a SIGTERM signal, and you would need
to figure out where it comes from.

To be specific, this line of log:

> 2022-10-11 22:11:21,683 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.
>

I believe this is from the jobmanager log, as `ClusterEntrypoint` is a
class used by jobmanager only.

Best,

Xintong



On Thu, Oct 13, 2022 at 9:06 AM yu'an huang  wrote:

> Hi,
>
> Which deployment mode do you use? What is the Flink version?
> I think killing TaskManagers won't make the JobManager restart. You can
> provide the whole log as an attachment to investigate.
>
> On Wed, 12 Oct 2022 at 6:01 PM, Puneet Duggal 
> wrote:
>
>> Hi Xintong Song,
>>
>> Thanks for your immediate reply. Yes, I do restart the task manager via a
>> kill command and then a flink restart, because I have seen cases where a
>> simple flink restart does not pick up the latest configuration. But what I
>> am confused about is why killing the task manager process and then
>> restarting it causes the job manager to stop and restart.
>>
>> Regards,
>> Puneet
>>
>>
>> On 12-Oct-2022, at 7:33 AM, Xintong Song  wrote:
>>
>> The log shows that the jobmanager received a SIGTERM signal from
>> external. Depending on how you deploy Flink, that could be a 'kill '
>> command, or a kubernetes pod removal / eviction, etc. You may want to check
>> where the signal came from.
>>
>> Best,
>> Xintong
>>
>>
>>
>> On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal 
>> wrote:
>>
>>> Hi,
>>>
>>> I am facing an issue where, when I restart a task manager after making
>>> some configuration changes, the task manager restarts successfully with
>>> the updated configuration, but the leader job manager restarts as well.
>>> Pasting the leader job manager logs here:
>>>
>>>
>>> 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Disassociated]
>>> 2022-10-11 22:11:02,411 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:02,682 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:12,702 WARN
>>> akka.remote.transport.netty.NettyTransport   [] - Remote
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: /:35376
>>> 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor
>>>[] - Association with remote system [
>>> akka.tcp://flink@:35376] has failed, address is now gated for
>>> [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@:35376]] Caused by: [java.net.ConnectException:
>>> Connection refused: /:35376]
>>> 2022-10-11 22:11:21,683 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
>>> SIGNAL 15: SIGTERM. Shutting down as requested.
>>> 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
>>>[] - Stopped BLOB server at 0.0.0.0:33887
>>>
>>>
>>> Regards,
>>> Puneet
>>>
>>>
>>>
>>


Re: Job Manager getting restarted while restarting task manager

2022-10-12 Thread yu'an huang
Hi,

Which deployment mode do you use? What is the Flink version?
I think killing TaskManagers won't make the JobManager restart. You can
provide the whole log as an attachment to investigate.

On Wed, 12 Oct 2022 at 6:01 PM, Puneet Duggal 
wrote:

> Hi Xintong Song,
>
> Thanks for your immediate reply. Yes, I do restart the task manager via a
> kill command and then a flink restart, because I have seen cases where a
> simple flink restart does not pick up the latest configuration. But what I
> am confused about is why killing the task manager process and then
> restarting it causes the job manager to stop and restart.
>
> Regards,
> Puneet
>
>
> On 12-Oct-2022, at 7:33 AM, Xintong Song  wrote:
>
> The log shows that the jobmanager received a SIGTERM signal from external.
> Depending on how you deploy Flink, that could be a 'kill ' command, or
> a kubernetes pod removal / eviction, etc. You may want to check where the
> signal came from.
>
> Best,
> Xintong
>
>
>
> On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal 
> wrote:
>
>> Hi,
>>
>> I am facing an issue where, when I restart a task manager after making
>> some configuration changes, the task manager restarts successfully with
>> the updated configuration, but the leader job manager restarts as well.
>> Pasting the leader job manager logs here:
>>
>>
>> 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor
>>  [] - Association with remote system [
>> akka.tcp://flink@:35376] has failed, address is now gated for
>> [50] ms. Reason: [Disassociated]
>> 2022-10-11 22:11:02,411 WARN  akka.remote.transport.netty.NettyTransport
>>  [] - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused: /:35376
>> 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor
>>  [] - Association with remote system [
>> akka.tcp://flink@:35376] has failed, address is now gated for
>> [50] ms. Reason: [Association failed with [akka.tcp://flink@:35376]]
>> Caused by: [java.net.ConnectException: Connection refused: /:35376]
>> 2022-10-11 22:11:02,682 WARN  akka.remote.transport.netty.NettyTransport
>>  [] - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused: /:35376
>> 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor
>>  [] - Association with remote system [
>> akka.tcp://flink@:35376] has failed, address is now gated for
>> [50] ms. Reason: [Association failed with [akka.tcp://flink@:35376]]
>> Caused by: [java.net.ConnectException: Connection refused: /:35376]
>> 2022-10-11 22:11:12,702 WARN  akka.remote.transport.netty.NettyTransport
>>  [] - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused: /:35376
>> 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor
>>  [] - Association with remote system [
>> akka.tcp://flink@:35376] has failed, address is now gated for
>> [50] ms. Reason: [Association failed with [akka.tcp://flink@:35376]]
>> Caused by: [java.net.ConnectException: Connection refused: /:35376]
>> 2022-10-11 22:11:21,683 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
>> SIGNAL 15: SIGTERM. Shutting down as requested.
>> 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
>>  [] - Stopped BLOB server at 0.0.0.0:33887
>>
>>
>> Regards,
>> Puneet
>>
>>
>>
>


Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-12 Thread Martijn Visser
Hi everyone,

Thanks again for all your feedback. It's very much appreciated.

My overall feeling is that people are not opposed to the FLIP. There is
demand for adding Java 17 support before dropping the Scala APIs. Given
that the proposal for actually dropping the Scala APIs would only happen
with a Flink 2.0 and Java 17 support would either happen in a new minor
version or a new major version (I haven't seen a FLIP or discussion being
opened adding Java 17 support, only on deprecating Java 8), Java 17 support
would either be there earlier (in a new minor version) or at the same time
(with Flink 2.0) when the Scala APIs would be dropped.

If there are no more discussion topics, I would move this FLIP to a vote at
the beginning of next week.

Best regards,

Martijn

On Sun, Oct 9, 2022 at 10:36 AM guenterh.lists 
wrote:

> Hi Martijn
>
> I do not maintain a large production application based on Flink, so it
> would not be a problem for me to convert existing implementations to
> whatever API.
>
> I am working in the area of cultural heritage, which is mainly about the
> processing of structured (meta)-data (scientific libraries, archives and
> museums)
> My impression: People without much background/experience with Java
> implementations find it easier to get into the functional mindset as
> supported in Scala. That's why I think it would be very unfortunate if the
> use of Scala in Flink becomes more and more limited or neglected.
>
> I think using the Java API in Scala is a possible way also in my
> environment.
>
> In the last weeks I tried to port the examples from the "Flink Course" of
> Daniel Ciocîrlan (https://rockthejvm.com/p/flink - he mainly offers Scala
> courses), which are exclusively based on the native Scala API, to the Java
> API. This has worked without any problems as far as I can see. So far I
> haven't tried any examples based on the Table API or streaming workflows in
> batch mode (which would be important for our environment).
>
> My main trouble: So far I don't know enough about the limitations of using
> the Java API in a Scala implementation and what that means. My current
> understanding: the limitation is mainly in deriving the type information in
> generic APIs with Scala types. For me it would be very significant and
> helpful if there were more information, descriptions, and examples about
> this topic.
>
> So far unfortunately I had too little time to deal with a wrapper like
> flink-scala-api (https://github.com/findify/flink-scala-api ) and the
> current alternative is probably going to be deprecated in the future (
> https://github.com/ariskk/flink4s/issues/17#issuecomment-1125806808 )
>
> Günter
>
>
> On 04.10.22 13:58, Martijn Visser wrote:
>
> Hi Márton,
>
> You're making a good point. I originally wanted to already include the
> User mailing list to get their feedback but forgot to do so. I'll do some
> more outreach via other channels as well.
>
> @Users of Flink, I've made a proposal to deprecate and remove Scala API
> support in a future version of Flink. Your feedback on this topic is very
> much appreciated.
>
> Regarding the large Scala codebase for Flink, a potential alternative
> could be to have a wrapper for all Java APIs that makes them available as
> Scala APIs. However, this still requires Scala maintainers and I don't
> think that we currently have those in our community. The easiest solution
> for them would be to use the Java APIs directly. Yes it would involve work,
> but we won't actually be able to remove the Scala APIs until Flink 2.0 so
> there's still time for that :)
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi 
> wrote:
>
>> Hi Martijn,
>>
>> Thanks for compiling the FLIP. I agree with the sentiment that Scala poses
>> considerable maintenance overhead and key improvements (like 2.13 or
>> 2.12.8
>> supports) are hanging stale. With that said, before we make this move we
>> should attempt to understand the userbase affected.
>> A quick Slack and user mailing list search does return quite a few
>> results for scala (admittedly a cursory look at them suggests that many of
>> them have to do with missing features in Scala that exist in Java or Scala
>> versions). I would love to see some polls on this topic, we could also use
>> the Flink twitter handle to ask the community about this.
>>
>> I am aware of users having large existing Scala codebases for Flink. This
>> move would pose a very large effort on them, as they would need to rewrite
>> much of their existing code. What are the alternatives in your opinion,
>> Martijn?
>>
>> On Tue, Oct 4, 2022 at 6:22 AM Martijn Visser 
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I would like to open a discussion thread on FLIP-265 Deprecate and
>> remove
>> > Scala API support. Please take a look at
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
>> > and provide your feedback.
>> >
>> > Best regards,
>> >

Re: Utilizing Kafka headers in Flink Kafka connector

2022-10-12 Thread Yaroslav Tkachenko
Hi,

You can implement a custom KafkaRecordDeserializationSchema (example
https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
and just avoid emitting the record if the header value matches what you
need.

On Wed, Oct 12, 2022 at 11:04 AM Great Info  wrote:

> I have some Flink applications that read streams from Kafka. The
> producer-side code has recently introduced some additional information in
> Kafka headers while producing records.
> Now I need to change my consumer-side logic to process a record only if a
> header contains a specific value; if the header value is different from the
> one I am looking for, I just need to move on to the next record.
>
> I got some sample reference code
> but this logic needs to
> deserialize and verify the header. Is there any simple way to ignore the
> record before deserializing?
>


Utilizing Kafka headers in Flink Kafka connector

2022-10-12 Thread Great Info
I have some Flink applications that read streams from Kafka. The
producer-side code has recently introduced some additional information in
Kafka headers while producing records.
Now I need to change my consumer-side logic to process a record only if a
header contains a specific value; if the header value is different from the
one I am looking for, I just need to move on to the next record.

I got some sample reference code
but this logic needs to
deserialize and verify the header. Is there any simple way to ignore the
record before deserializing?


Unsubscribe

2022-10-12 Thread 陈鑫
Unsubscribe



Re: When will flink cdc support flink 1.15.x?

2022-10-12 Thread Xuyang
Hi, you can refer to the issue [1] and PR [2] in the CDC community for Flink
1.15 support. If you are in a hurry, you can try cherry-picking the PR to a
local branch first.
[1] https://github.com/ververica/flink-cdc-connectors/issues/1363
[2] https://github.com/ververica/flink-cdc-connectors/pull/1504
On 2022-10-11 11:01:25, "casel.chen"  wrote:
> The snapshot version on the current flink cdc master branch supports at
> most flink 1.14.4, and compiling against flink 1.15.2 fails. When will
> flink cdc support flink 1.15.x?


Re: Question about SQL gateway

2022-10-12 Thread Xuyang
Hi, currently I think there is no HA for the gateway. When the gateway crashes,
jobs that are being submitted synchronously will be cancelled, while async jobs
will continue running. When the gateway restarts, the async jobs can be found by
the gateway. BTW, the work on HA is ongoing.
At 2022-10-11 13:35:50, "Ww J"  wrote:
>Hi,
>
>I submit a stream job from the SQL gateway. The stream job keeps outputting 
>results to the SQL gateway. If the SQL gateway restarts or crashes, the stream 
>job will continue running. After the SQL gateway restarts, how can I get the
>results of the stream job?
>
>Thanks.
>
>Jack


Re: Flink KafkaSource still referencing deleted topic

2022-10-12 Thread Robert Cullen
I don't see AllowNonRestoredState in the configuration documentation.  How
would it be passed to a job? On the command line like this:

./bin/flink run --detached -Dallownonrestoredstate=true ...

On Tue, Oct 4, 2022 at 4:52 PM Martijn Visser 
wrote:

> Hi Mason,
>
> Definitely! Feel free to open a PR and ping me for a review.
>
> Cheers, Martijn
>
> On Tue, Oct 4, 2022 at 3:51 PM Mason Chen  wrote:
>
>> Hi Martijn,
>>
>> I notice that this question comes up quite often. Would this be a good
>> addition to the KafkaSource documentation? I'd be happy to contribute to
>> the documentation.
>>
>> Best,
>> Mason
>>
>> On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
>> wrote:
>>
>>> Hi Robert,
>>>
>>> Based on
>>> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
>>> I think you'll need to change the UID for your KafkaSource and restart your
>>> job with allowNonRestoredState enabled.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
>>> wrote:
>>>
 We've changed the KafkaSource to ingest from a new topic but the old
 name is still being referenced:

2022-10-04 07:03:41
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Grokfailures' (operator feca28aff5a3958840bee985ee7de4d3).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 3 more
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
    ... 10 more
Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.


 --
 Robert Cullen
 240-475-4490

>>>

-- 
Robert Cullen
240-475-4490


Re: Job uptime metric in Flink Operator managed cluster

2022-10-12 Thread Gyula Fóra
Hello!
The Flink operator currently does not delete the jobmanager pod when a
deployment is suspended.
This way the REST API stays available but no other resources are consumed
(taskmanagers are deleted).

When you delete the FlinkDeployment resource completely, then the
jobmanager deployment is also deleted.

In theory we could improve the logic to eventually delete the jobmanager
for suspended resources, but we currently use this as a way to guarantee
more resiliency for the operator flow.

Cheers,
Gyula

On Wed, Oct 12, 2022 at 3:56 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> I recently deployed a Flink Operator in Kubernetes and wrote a simple
> FlinkDeployment CRD  to run it in application mode following this
> 
> .
>
> I noticed that, even after I edited the CRD and marked the spec.job.state
> field as *suspended*, the metric *jobmanager_job_uptime_value* continued
> to show the job status as *running*. I did verify that after re-applying
> these changes, the JM and TM pods were deleted and the cluster was not
> running anymore.
>
> Am I doing something incorrect or is there some other metric to monitor
> the job status when using Flink Operator ?
>
>
>
> --
> *Regards,*
> *Meghajit*
>


Job uptime metric in Flink Operator managed cluster

2022-10-12 Thread Meghajit Mazumdar
Hello,

I recently deployed a Flink Operator in Kubernetes and wrote a simple
FlinkDeployment CRD  to run it in application mode following this

.

I noticed that, even after I edited the CRD and marked the spec.job.state
field as *suspended*, the metric *jobmanager_job_uptime_value* continued to
show the job status as *running*. I did verify that after re-applying these
changes, the JM and TM pods were deleted and the cluster was not running
anymore.

Am I doing something incorrect or is there some other metric to monitor the
job status when using Flink Operator ?



-- 
*Regards,*
*Meghajit*


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Ruan Hang
Thanks for raising the discussion, Qingsheng,

+1 on reverting the breaking changes.
+1 for making a unified and clearer metric definition in Flink 2.0

Best,
Hang


From: Jing Ge 
Sent: October 12, 2022 19:20
To: Qingsheng Ren 
Cc: Chesnay Schepler ; dev ; user 
; Martijn Visser ; Becket Qin 
; Jingsong Li ; Jark Wu 
; Leonard Xu ; Xintong Song 

Subject: Re: [DISCUSS] Reverting sink metric name changes made in 1.15

Hi Qingsheng,

Just want to make sure we are on the same page. Are you suggesting switching 
the naming between "numXXXSend" and "numXXXOut" or reverting all the changes we 
did with FLINK-26126 and FLINK-26492?

For the naming switch, please note that the behaviour has changed
since we introduced SinkV2 [1]. So, please be aware of different
numbers (behaviour change) even with the same metric name. Sticking with the
old name with the new behaviour (very bad idea, IMHO) might seem like saving 
the effort in the first place, but it might end up with monitoring unexpected 
metrics, which is even worse for users, i.e. I didn't change anything, but 
something has been broken since the last update.

For reverting, I am not sure how to fix the issue mentioned in FLINK-26126 
after reverting all changes. Like Chesnay has already pointed out, with SinkV2 
we have two different output lines - one with the external system and the other 
with the downstream operator. In this case, "numXXXSend" is rather a new metric 
than a replacement of "numXXXOut". The "numXXXOut" metric can still be used, 
depending on what the user wants to monitor.


Best regards,
Jing

[1] 
https://github.com/apache/flink/blob/51fc20db30d001a95de95b3b9993eeb06f558f6c/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java#L48

On Wed, Oct 12, 2022 at 12:48 PM Qingsheng Ren 
mailto:re...@apache.org>> wrote:
As a supplement, considering it could be a big reconstruction
redefining internal and external traffic and touching metric names in
almost all operators, this requires a lot of discussions and we might
do it finally in Flink 2.0. I think compatibility is a bigger blocker
in front of us, as the output of sink is a metric that users care a
lot about.

Thanks,
Qingsheng

On Wed, Oct 12, 2022 at 6:20 PM Qingsheng Ren 
mailto:re...@apache.org>> wrote:
>
> Thanks Chesnay for the reply. +1 for making a unified and clearer
> metric definition distinguishing internal and external data transfers.
> As you described, having IO in operators is quite common such as
> dimension tables in Table/SQL API. This definitely deserves a FLIP and
> an overall design.
>
> However I think it's necessary to change the metric back to
> numRecordsOut instead of sticking with numRecordsSend in 1.15 and
> 1.16. The most important argument is for compatibility as I mentioned
> in my previous email, otherwise all users have to modify their configs
> of metric systems after upgrading to Flink 1.15+, and all custom
> connectors have to change their implementations to migrate to the new
> metric name. I believe other ones participating and approving this
> proposal share the same concern about compatibility too. Also
> considering this issue is blocking the release of 1.16, maybe we could
> fix this asap, and as for defining a new metric for internal data
> transfers we can have an in-depth discussion later. WDYT?
>
> Best,
> Qingsheng
>
> On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler 
> mailto:ches...@apache.org>> wrote:
> >
> > Currently I think that would be a mistake.
> >
> > Ultimately what we have here is the culmination of us never really 
> > considering how the numRecordsOut metric should behave for operators that 
> > emit data to other operators _and_ external systems. This goes beyond sinks.
> > This even applies to numRecordsIn, for cases where functions query/write 
> > data from/to the outside, (e.g., Async IO).
> >
> > Having 2 separate metrics for that, 1 exclusively for internal data 
> > transfers, and 1 exclusively for external data transfers, is the only way 
> > to get a consistent metric definition in the long-run.
> > We can jump back-and-forth now or just commit to it.
> >
> > I don't think we can really judge this based on FLIP-33. It was IIRC 
> > written before the two phase sinks were added, which heavily blurred the 
> > lines of what a sink even is. Because it definitely is _not_ the last 
> > operator in a chain anymore.
> >
> > What I would suggest is to stick with what we got (although I despise the 
> > name numRecordsSend), and alias the numRecordsOut metric for all 
> > non-TwoPhaseCommittingSink.
> >
> > On 11/10/2022 05:54, Qingsheng Ren wrote:
> >
> > Thanks for the details Chesnay!
> >
> > By “alias” I mean to respect the original definition made in FLIP-33 for 
> > numRecordsOut, which is the number of records written to the external 
> > system, and keep numRecordsSend as the same value as numRecordsOut for 
> > compatibility.
> 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Jing Ge
Hi Qingsheng,

Just want to make sure we are on the same page. Are you suggesting
switching the naming between "numXXXSend" and "numXXXOut" or reverting all
the changes we did with FLINK-26126 and FLINK-26492?

For the naming switch, please note that the behaviour has changed
since we introduced SinkV2 [1]. So, please be aware of different
numbers (behaviour change) even with the same metric name. Sticking with
the old name with the new behaviour (very bad idea, IMHO) might seem like
saving the effort in the first place, but it might end up with monitoring
unexpected metrics, which is even worse for users, i.e. I didn't change
anything, but something has been broken since the last update.

For reverting, I am not sure how to fix the issue mentioned in FLINK-26126
after reverting all changes. Like Chesnay has already pointed out, with
SinkV2 we have two different output lines - one with the external system
and the other with the downstream operator. In this case, "numXXXSend" is
rather a new metric than a replacement of "numXXXOut". The "numXXXOut"
metric can still be used, depending on what the user wants to monitor.


Best regards,
Jing

[1]
https://github.com/apache/flink/blob/51fc20db30d001a95de95b3b9993eeb06f558f6c/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java#L48


On Wed, Oct 12, 2022 at 12:48 PM Qingsheng Ren  wrote:

> As a supplement, considering it could be a big reconstruction
> redefining internal and external traffic and touching metric names in
> almost all operators, this requires a lot of discussions and we might
> do it finally in Flink 2.0. I think compatibility is a bigger blocker
> in front of us, as the output of sink is a metric that users care a
> lot about.
>
> Thanks,
> Qingsheng
>
> On Wed, Oct 12, 2022 at 6:20 PM Qingsheng Ren  wrote:
> >
> > Thanks Chesnay for the reply. +1 for making a unified and clearer
> > metric definition distinguishing internal and external data transfers.
> > As you described, having IO in operators is quite common such as
> > dimension tables in Table/SQL API. This definitely deserves a FLIP and
> > an overall design.
> >
> > However I think it's necessary to change the metric back to
> > numRecordsOut instead of sticking with numRecordsSend in 1.15 and
> > 1.16. The most important argument is for compatibility as I mentioned
> > in my previous email, otherwise all users have to modify their configs
> > of metric systems after upgrading to Flink 1.15+, and all custom
> > connectors have to change their implementations to migrate to the new
> > metric name. I believe other ones participating and approving this
> > proposal share the same concern about compatibility too. Also
> > considering this issue is blocking the release of 1.16, maybe we could
> > fix this asap, and as for defining a new metric for internal data
> > transfers we can have an in-depth discussion later. WDYT?
> >
> > Best,
> > Qingsheng
> >
> > On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler 
> wrote:
> > >
> > > Currently I think that would be a mistake.
> > >
> > > Ultimately what we have here is the culmination of us never really
> considering how the numRecordsOut metric should behave for operators that
> emit data to other operators _and_ external systems. This goes beyond sinks.
> > > This even applies to numRecordsIn, for cases where functions
> query/write data from/to the outside, (e.g., Async IO).
> > >
> > > Having 2 separate metrics for that, 1 exclusively for internal data
> transfers, and 1 exclusively for external data transfers, is the only way
> to get a consistent metric definition in the long-run.
> > > We can jump back-and-forth now or just commit to it.
> > >
> > > I don't think we can really judge this based on FLIP-33. It was IIRC
> written before the two phase sinks were added, which heavily blurred the
> lines of what a sink even is. Because it definitely is _not_ the last
> operator in a chain anymore.
> > >
> > > What I would suggest is to stick with what we got (although I despise
> the name numRecordsSend), and alias the numRecordsOut metric for all
> non-TwoPhaseCommittingSink.
> > >
> > > On 11/10/2022 05:54, Qingsheng Ren wrote:
> > >
> > > Thanks for the details Chesnay!
> > >
> > > By “alias” I mean to respect the original definition made in FLIP-33
> for numRecordsOut, which is the number of records written to the external
> system, and keep numRecordsSend as the same value as numRecordsOut for
> compatibility.
> > >
> > > I think keeping numRecordsOut for the output to the external system is
> more intuitive to end users because in most cases the metric of data flow
> output is more essential. I agree with you that a new metric is required,
> but considering compatibility and users’ intuition I prefer to keep the
> initial definition of numRecordsOut in FLIP-33 and name a new metric for
> sink writer’s output to downstream operators. This might be against
> consistency with 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Qingsheng Ren
As a supplement, considering it could be a big reconstruction
redefining internal and external traffic and touching metric names in
almost all operators, this requires a lot of discussions and we might
do it finally in Flink 2.0. I think compatibility is a bigger blocker
in front of us, as the output of sink is a metric that users care a
lot about.

Thanks,
Qingsheng

On Wed, Oct 12, 2022 at 6:20 PM Qingsheng Ren  wrote:
>
> Thanks Chesnay for the reply. +1 for making a unified and clearer
> metric definition distinguishing internal and external data transfers.
> As you described, having IO in operators is quite common such as
> dimension tables in Table/SQL API. This definitely deserves a FLIP and
> an overall design.
>
> However I think it's necessary to change the metric back to
> numRecordsOut instead of sticking with numRecordsSend in 1.15 and
> 1.16. The most important argument is for compatibility as I mentioned
> in my previous email, otherwise all users have to modify their configs
> of metric systems after upgrading to Flink 1.15+, and all custom
> connectors have to change their implementations to migrate to the new
> metric name. I believe other ones participating and approving this
> proposal share the same concern about compatibility too. Also
> considering this issue is blocking the release of 1.16, maybe we could
> fix this asap, and as for defining a new metric for internal data
> transfers we can have an in-depth discussion later. WDYT?
>
> Best,
> Qingsheng
>
> On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler  wrote:
> >
> > Currently I think that would be a mistake.
> >
> > Ultimately what we have here is the culmination of us never really 
> > considering how the numRecordsOut metric should behave for operators that 
> > emit data to other operators _and_ external systems. This goes beyond sinks.
> > This even applies to numRecordsIn, for cases where functions query/write 
> > data from/to the outside, (e.g., Async IO).
> >
> > Having 2 separate metrics for that, 1 exclusively for internal data 
> > transfers, and 1 exclusively for external data transfers, is the only way 
> > to get a consistent metric definition in the long-run.
> > We can jump back-and-forth now or just commit to it.
> >
> > I don't think we can really judge this based on FLIP-33. It was IIRC 
> > written before the two phase sinks were added, which heavily blurred the 
> > lines of what a sink even is. Because it definitely is _not_ the last 
> > operator in a chain anymore.
> >
> > What I would suggest is to stick with what we got (although I despise the 
> > name numRecordsSend), and alias the numRecordsOut metric for all 
> > non-TwoPhaseCommittingSink.
> >
> > On 11/10/2022 05:54, Qingsheng Ren wrote:
> >
> > Thanks for the details Chesnay!
> >
> > By “alias” I mean to respect the original definition made in FLIP-33 for 
> > numRecordsOut, which is the number of records written to the external 
> > system, and keep numRecordsSend as the same value as numRecordsOut for 
> > compatibility.
> >
> > I think keeping numRecordsOut for the output to the external system is more 
> > intuitive to end users because in most cases the metric of data flow output 
> > is more essential. I agree with you that a new metric is required, but 
> > considering compatibility and users’ intuition I prefer to keep the initial 
> > definition of numRecordsOut in FLIP-33 and name a new metric for sink 
> > writer’s output to downstream operators. This might be against consistency 
> > with metrics in other operators in Flink but maybe it’s acceptable to have 
> > the sink as a special case.
> >
> > Best,
> > Qingsheng
> > On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , wrote:
> >
> > > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
> >
> > But that's not possible. If it were that simple there would have never been 
> > a need to introduce another metric in the first place.
> >
> > It's a rather fundamental issue with how the new sinks work, in that they 
> > emit data to the external system (usually considered as "numRecordsOut" of 
> > sinks) while _also_ sending data to a downstream operator (usually 
> > considered as "numRecordsOut" of tasks).
> > The original issue was that the numRecordsOut of the sink counted both 
> > (which is completely wrong).
> >
> > A new metric was always required; otherwise you inevitably end up breaking 
> > some semantic.
> > Adding a new metric for what the sink writes to the external system is, for 
> > better or worse, more consistent with how these metrics usually work in 
> > Flink.
> >
> > On 10/10/2022 12:45, Qingsheng Ren wrote:
> >
> > Thanks everyone for joining the discussion!
> >
> > > Do you have any idea what has happened in the process here?
> >
> > The discussion in this PR [1] shows some details and could be helpful to 
> > understand the original motivation of the renaming. We do have a test case 
> for guarding metrics but unfortunately the case was also 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Qingsheng Ren
Thanks Chesnay for the reply. +1 for making a unified and clearer
metric definition distinguishing internal and external data transfers.
As you described, having IO in operators is quite common such as
dimension tables in Table/SQL API. This definitely deserves a FLIP and
an overall design.

However I think it's necessary to change the metric back to
numRecordsOut instead of sticking with numRecordsSend in 1.15 and
1.16. The most important argument is for compatibility as I mentioned
in my previous email, otherwise all users have to modify their configs
of metric systems after upgrading to Flink 1.15+, and all custom
connectors have to change their implementations to migrate to the new
metric name. I believe other ones participating and approving this
proposal share the same concern about compatibility too. Also
considering this issue is blocking the release of 1.16, maybe we could
fix this asap, and as for defining a new metric for internal data
transfers we can have an in-depth discussion later. WDYT?

Best,
Qingsheng

On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler  wrote:
>
> Currently I think that would be a mistake.
>
> Ultimately what we have here is the culmination of us never really 
> considering how the numRecordsOut metric should behave for operators that 
> emit data to other operators _and_ external systems. This goes beyond sinks.
> This even applies to numRecordsIn, for cases where functions query/write data 
> from/to the outside, (e.g., Async IO).
>
> Having 2 separate metrics for that, 1 exclusively for internal data 
> transfers, and 1 exclusively for external data transfers, is the only way to 
> get a consistent metric definition in the long-run.
> We can jump back-and-forth now or just commit to it.
>
> I don't think we can really judge this based on FLIP-33. It was IIRC written 
> before the two phase sinks were added, which heavily blurred the lines of 
> what a sink even is. Because it definitely is _not_ the last operator in a 
> chain anymore.
>
> What I would suggest is to stick with what we got (although I despise the 
> name numRecordsSend), and alias the numRecordsOut metric for all 
> non-TwoPhaseCommittingSink.
>
> On 11/10/2022 05:54, Qingsheng Ren wrote:
>
> Thanks for the details Chesnay!
>
> By “alias” I mean to respect the original definition made in FLIP-33 for 
> numRecordsOut, which is the number of records written to the external system, 
> and keep numRecordsSend as the same value as numRecordsOut for compatibility.
>
> I think keeping numRecordsOut for the output to the external system is more 
> intuitive to end users because in most cases the metric of data flow output 
> is more essential. I agree with you that a new metric is required, but 
> considering compatibility and users’ intuition I prefer to keep the initial 
> definition of numRecordsOut in FLIP-33 and name a new metric for sink 
> writer’s output to downstream operators. This might be against consistency 
> with metrics in other operators in Flink but maybe it’s acceptable to have 
> the sink as a special case.
>
> Best,
> Qingsheng
> On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , wrote:
>
> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never been a 
> need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they 
> emit data to the external system (usually considered as "numRecordsOut" of 
> sinks) while _also_ sending data to a downstream operator (usually considered 
> as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both (which 
> is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking 
> some semantic.
> Adding a new metric for what the sink writes to the external system is, for 
> better or worse, more consistent with how these metrics usually work in Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
>
> Thanks everyone for joining the discussion!
>
> > Do you have any idea what has happened in the process here?
>
> The discussion in this PR [1] shows some details and could be helpful to 
> understand the original motivation of the renaming. We do have a test case 
> for guarding metrics but unfortunately the case was also modified so the 
> defense was broken.
>
> I think the reason why both the developer and the reviewer forgot to trigger 
> an discussion and gave a green pass on the change is that metrics are quite 
> “trivial” to be noticed as public APIs. As mentioned by Martijn I couldn’t 
> find a place noting that metrics are public APIs and should be treated 
> carefully while contributing and reviewing.
>
> IMHO three actions could be made to prevent this kind of changes in the 
> future:
>
> a. Add test case for metrics (which we already have in SinkMetricsITCase)
> b. We emphasize that any public-interface breaking changes 

Re: Job Manager getting restarted while restarting task manager

2022-10-12 Thread Puneet Duggal
Hi Xintong Song,

Thanks for your immediate reply. Yes, I do restart the task manager via a kill
command and then a flink restart, because I have seen cases where a simple
flink restart does not pick up the latest configuration. But what I am confused
about is why killing the task manager process and then restarting it causes the
job manager to stop and restart.

Regards,
Puneet

> On 12-Oct-2022, at 7:33 AM, Xintong Song  wrote:
> 
> The log shows that the jobmanager received a SIGTERM signal from external. 
> Depending on how you deploy Flink, that could be a 'kill ' command, or a 
> kubernetes pod removal / eviction, etc. You may want to check where the 
> signal came from.
> 
> Best,
> Xintong
> 
> 
> On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal  > wrote:
> Hi,
> 
> I am facing an issue where, when I restart a task manager after making some 
> configuration changes, the task manager restarts successfully with the 
> updated configuration, but the leader job manager restarts as well. Pasting 
> the leader job manager logs here:
> 
> 
> 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor  
>  [] - Association with remote system 
> [akka.tcp://flink@:35376] has failed, address is now gated for [50] 
> ms. Reason: [Disassociated]
> 2022-10-11 22:11:02,411 WARN  akka.remote.transport.netty.NettyTransport  
>  [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor  
>  [] - Association with remote system 
> [akka.tcp://flink@:35376] has failed, address is now gated for [50] 
> ms. Reason: [Association failed with [akka.tcp://flink@:35376]] Caused 
> by: [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:02,682 WARN  akka.remote.transport.netty.NettyTransport  
>  [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor  
>  [] - Association with remote system 
> [akka.tcp://flink@:35376] has failed, address is now gated for [50] 
> ms. Reason: [Association failed with [akka.tcp://flink@:35376]] Caused 
> by: [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:12,702 WARN  akka.remote.transport.netty.NettyTransport  
>  [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: /:35376
> 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor  
>  [] - Association with remote system 
> [akka.tcp://flink@:35376] has failed, address is now gated for [50] 
> ms. Reason: [Association failed with [akka.tcp://flink@:35376]] Caused 
> by: [java.net.ConnectException: Connection refused: /:35376]
> 2022-10-11 22:11:21,683 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
>  [] - Stopped BLOB server at 0.0.0.0:33887 
> 
> 
> Regards,
> Puneet
> 
> 



Re: Flink falls back on to kryo serializer for GenericTypes

2022-10-12 Thread Chesnay Schepler
There's no alternative to Kryo for generic types, apart from
implementing your own Flink serializer (but technically at that point the
type is no longer treated as a generic type).


enableForAvro only forces Avro to be used for POJO types.
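
For Avro's GenericRecord specifically, one way around the Kryo fallback is to
hand Flink the Avro type information explicitly via flink-avro's
GenericRecordAvroTypeInfo. A minimal sketch, assuming the flink-avro
dependency is on the classpath and with a placeholder schema and pipeline:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AvroGenericRecordJob {
        // Assumed schema; replace with the real schema of your records.
        private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableGenericTypes(); // fail fast on accidental Kryo fallback

            env.fromElements("a", "b", "c")
               .map(id -> {
                   GenericRecord record = new GenericData.Record(SCHEMA);
                   record.put("id", id);
                   return record;
               })
               // Explicit Avro type info keeps GenericRecord from being
               // classified as a generic type (and thus from using Kryo).
               .returns(new GenericRecordAvroTypeInfo(SCHEMA))
               .print();

            env.execute();
        }
    }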

On 11/10/2022 09:29, Sucheth S wrote:

Hello,

How to avoid flink's kryo serializer for GenericTypes ? Kryo is having 
some performance issues.


Tried below but no luck.
env.getConfig().disableForceKryo();
env.getConfig().enableForceAvro();
Tried this - env.getConfig().disableGenericTypes();
getting - Generic types have been disabled in the ExecutionConfig and type 
org.apache.avro.generic.GenericRecord is treated as a generic type


Regards,
Sucheth Shivakumar
website: https://sucheths.com
mobile : +1(650)-576-8050
San Mateo, United States




Re: When Flink SQL writes to multiple sinks, can the write order be guaranteed?

2022-10-12 Thread Shuo Cheng
Flink SQL itself cannot guarantee the write order across multiple sinks in the
same job. You could consider handling this in the business logic, e.g., adding
a UDF filter before the message-queue sink: the UDF queries the database, and
the record is written to the message queue only once the condition is met. Of
course, this approach may have a performance impact. See the sketch below.
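
As a hedged sketch of that suggestion - the class name, table, and JDBC
settings below are all placeholders, and the per-record database lookup is
exactly where the performance impact comes from (records failing the check are
filtered out, not retried later):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    /** Returns true once the row with the given id is visible in the database. */
    public class ExistsInDb extends ScalarFunction {

        private transient Connection connection;

        @Override
        public void open(FunctionContext context) throws Exception {
            // Assumed JDBC URL and credentials; replace with your own.
            connection = DriverManager.getConnection(
                    "jdbc:mysql://db-host:3306/app", "user", "pass");
        }

        public boolean eval(String id) throws Exception {
            try (PreparedStatement ps = connection.prepareStatement(
                    "SELECT 1 FROM orders WHERE id = ?")) {
                ps.setString(1, id);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next();
                }
            }
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }
    }

It would be registered with CREATE FUNCTION (or
createTemporarySystemFunction) and used in the WHERE clause of the INSERT
into the message-queue sink.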

On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun  wrote:

> hi all:
>
> We have a scenario where Flink SQL writes both to a message queue and to a
> database. A downstream real-time job consumes the messages and then reads
> the database; if a message is written before the database, the data it
> reads may be incorrect.
>
> Is there a way to guarantee that messages are sent only after the database
> write?
>
> Zhiwen Sun
>


fail to mount hadoop-config-volume when using flink-k8s-operator

2022-10-12 Thread Liting Liu (litiliu)
Hi, community:
  I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The
"HADOOP_CONF_DIR" environment variable was set in the image that I built
from flink:1.15. I found the taskmanager pod was trying to mount a volume
named "hadoop-config-volume" from a ConfigMap, but the ConfigMap with the
name "hadoop-config-volume" wasn't created.

Do I need to remove the "HADOOP_CONF_DIR" environment variable in the
Dockerfile? If yes, what should I do to specify the Hadoop conf?



Re: When Flink SQL writes to multiple sinks, can the write order be guaranteed?

2022-10-12 Thread 仙路尽头谁为峰
Hi Zhiwen:

 
You can try setting the Kafka sink to exactly-once and configuring the Kafka
source of the downstream job to read only committed data. However, if your
checkpoint interval is long, it will take a while for the downstream job to
see the data in Kafka. A sketch of this setup follows.
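
A hedged sketch of the exactly-once sink; the broker address, topic, and
checkpoint interval are placeholders, and the downstream consumer must
additionally set isolation.level=read_committed:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransactionalKafkaSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // transactions commit on checkpoint completion

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")   // assumed address
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("events")           // assumed topic
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("events-sink") // required for EXACTLY_ONCE
                    .build();

            env.fromElements("a", "b", "c").sinkTo(sink);
            env.execute();
        }
    }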

Sent from Mail for Windows

From: Zhiwen Sun
Sent: October 12, 2022 14:42
To: user-zh@flink.apache.org
Subject: When Flink SQL writes to multiple sinks, can the write order be guaranteed?

hi all:

We have a scenario where Flink SQL writes both to a message queue and to a
database. A downstream real-time job consumes the messages and then reads the
database; if a message is written before the database, the data it reads may
be incorrect.

Is there a way to guarantee that messages are sent only after the database write?

Zhiwen Sun



Re: videos Flink Forward San Francisco 2022

2022-10-12 Thread guenterh.lists

Thanks for your open feedback Jun - I appreciate it

Very best wishes from Basel

Günter

On 11.10.22 18:12, Jun Qin wrote:

Hi

Totally agree, rest assured that it was some venue limitations and 
some post-pandemic organizational challenges that meant no videos this 
year. Thanks a lot for the feedback and please let's stay positive and 
not draw the wrong conclusions.


Thanks
Jun

On Oct 10, 2022, at 2:39 PM, guenterh.lists 
 wrote:


Really very sad - as far as I know this is the first time this has
happened. Is this the attitude of the new Ververica?


Hopefully Immerok will resume the open mentality of data Artisans.

Günter

On 10.10.22 11:26, Martijn Visser wrote:

Hi Günter,

I've understood that only the keynotes were recorded and not the 
other sessions.


Best regards,

Martijn

On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
 wrote:


Sorry if this question was already posted

By now only a few videos of the conference were published
(mainly the
keynotes)
https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB

Are the other presentations not going to be published?

Günter




--
Günter Hipler
University library Leipzig