Re: Regarding FLIP-91's status

2021-05-28 Thread Sonam Mandal
Hi Matthias,

Thanks for your quick response! I have sent a reply on the FLIP-91 thread, 
thanks for pointing me to it.
@Jark Wu, it'll be great if you have any context on this as well.

Thanks,
Sonam

From: Matthias Pohl 
Sent: Friday, May 28, 2021 5:18 AM
To: Sonam Mandal 
Cc: user@flink.apache.org ; Jark Wu 
Subject: Re: Regarding FLIP-91's status

Hi Sonam,
It looks like it has been stale for some time. You might be able to restart the 
discussion replying to the respective thread in the dev mailing list [1]. You 
seem to be right about the repository based on Jark's reply in the related 
ticket FLINK-15472 [2]. I'm adding Jark to the thread. Maybe, he can shed some 
light on the state of FLIP-91.

Best,
Matthias

[1] 
http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCADQYLGsCGDJkfd3L1hAy1y_M2625YkNHJGW82UraGLhzg6p7Ug%40mail.gmail.com%3E
[2] 
https://issues.apache.org/jira/browse/FLINK-15472

On Thu, May 27, 2021 at 9:51 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

I was curious about the progress on FLIP-91. Is this actively being developed?
I believe the code is in development at
https://github.com/ververica/flink-sql-gateway, is this the right repo?

I haven't seen much activity on this since sometime last year. I wanted to 
understand if there is still a plan to continue developing this, and if not, I 
wanted to understand why.

Appreciate your help!

Thanks,
Sonam



Re: Running multiple CEP pattern rules

2021-05-28 Thread Tejas
Hi Dawid,
Do you have any plans to bring this functionality to Flink CEP in the future?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Running multiple CEP pattern rules

2021-05-28 Thread Dawid Wysakowicz
Hi Tejas,

It will not work that way. Bear in mind that every application of
CEP.pattern creates a new operator in the graph. The exceptions you are
seeing most probably result from calculating that huge graph and sending
that over. You are reaching the timeout on submitting that huge graph.
You can have many different patterns in a single job, but the number of
vertices in your graph is not unlimited.

In your scenario I'd try to combine the rules in a single operator. You
could try to use the ProcessFunction for that.
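
A rough sketch of that direction: evaluate all rules inside one
KeyedProcessFunction, with per-rule progress kept in MapState. This is
deliberately simplified (it only tracks a start->end sequence; the optional
middle step and CEP's contiguity semantics are omitted) and it assumes the
Rule/Event accessors from your code below:

import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// key type assumed to be String (the orgId)
public class MultiRuleFunction extends KeyedProcessFunction<String, Event, String> {
    private final List<Rule> rules;
    // per-rule progress for the current key: an entry means "start" was seen
    private transient MapState<String, Integer> progress;

    public MultiRuleFunction(List<Rule> rules) {
        this.rules = rules;
    }

    @Override
    public void open(Configuration parameters) {
        progress = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("rule-progress", String.class, Integer.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out)
            throws Exception {
        String process = event.getProcess();
        for (Rule rule : rules) {
            Integer step = progress.get(rule.getName());
            if (step == null && process.equals(rule.getStart())) {
                progress.put(rule.getName(), 1);
            } else if (step != null && process.equals(rule.getEnd())) {
                out.collect("Rule : " + rule.getName());
                progress.remove(rule.getName());
            }
        }
    }
}

Applied as partitionedInput.process(new MultiRuleFunction(ruleList)), this
keeps the job graph at a single vertex regardless of the rule count.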

Best,

Dawid

On 28/05/2021 01:53, Tejas wrote:
> Hi,
> We are running into errors when running multiple CEP patterns. Here's our
> use case:
> We are planning to build a rule-based engine on top of Flink with a huge
> number of rules and are doing a POC for that. For the POC we have around 1000
> pattern-based rules which we translate into CEP patterns and run on a keyed
> stream of event data to detect matches. We partition the stream by orgId, and
> each rule needs to run for each org. Here's the code we've written to
> implement that:
> DataStream<Event> eventStream = null;
> DataStream<Event> partitionedInput =
>     eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid); // orgId assumed String
> List<Rule> ruleList = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
>   ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>   ruleList.add(
>       new Rule("rule" + (i + 500), "process4", "process5", "process6"));
> }
> for (Rule rule : ruleList) {
>   String st = rule.getStart();
>   String mi = rule.getMid();
>   String en = rule.getEnd();
>   String nm = rule.getName();
>   Pattern<Event, ?> pattern =
>       Pattern.begin(
>           Pattern.<Event>begin("start")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event value) throws Exception {
>                       return value.getProcess().equals(st);
>                     }
>                   })
>               .followedBy("middle")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return !event.getProcess().equals(mi);
>                     }
>                   })
>               .optional()
>               .followedBy("end")
>               .where(
>                   new SimpleCondition<Event>() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return event.getProcess().equals(en);
>                     }
>                   }));
>   PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
>   DataStream<String> alerts =
>       patternStream.process(
>           new PatternProcessFunction<Event, String>() {
>             @Override
>             public void processMatch(
>                 Map<String, List<Event>> map, Context context, Collector<String> collector)
>                 throws Exception {
>               Event start = map.containsKey("start") ? map.get("start").get(0) : null;
>               Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
>               Event end = map.containsKey("end") ? map.get("end").get(0) : null;
>               StringJoiner joiner = new StringJoiner(",");
>               joiner
>                   .add("Rule : " + nm + " ")
>                   .add((start == null ? "" : start.getId()))
>                   .add((middle == null ? "" : middle.getId()))
>                   .add((end == null ? "" : end.getId()));
>               collector.collect(joiner.toString());
>             }
>           });
>   alerts.print();
> We tried to run this code on the Flink cluster with 1 task manager with 4
> task slots, and the task manager crashed with the error:
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>   at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>   at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>   at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>   at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>   at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>   at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>   at
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>   at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>   at
> org.apache.

Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Timothy Bess
Ok so after digging into it a bit it seems that the exception was thrown
here:
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java#L48

I think it'd be useful to have a configuration to prevent null keys from
halting processing.
It looks like we are occasionally publishing records with an empty key string,
which Kafka stores as null. Then when it's read back in, the ingress chokes on
the null key.
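
One workaround might be on the producing side: never publish an empty key in
the first place. A sketch against the plain Kafka client (the topic name and
the fallback key policy are made up):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static void publish(KafkaProducer<String, byte[]> producer, String key, byte[] value) {
    // guard against empty keys, since those are what seem to come back as
    // null when the ingress reads the record
    String safeKey = (key == null || key.isEmpty()) ? "unknown" : key;
    producer.send(new ProducerRecord<>("leads-ingress", safeKey, value));
}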

I'm trying to avoid having to edit statefun and use my own jar. Any thoughts?

Thanks,

Tim

On Fri, May 28, 2021 at 10:33 AM Timothy Bess  wrote:

> Oh wow that Harness looks cool, I'll have to take a look at that.
>
> Unfortunately the JobManager UI seems to just show this:
> [image: image.png]
>
> Though it does seem that maybe the source function is where the failure is
> happening according to this?
> [image: image.png]
>
> Still investigating, but I do see a lot of these logs:
> 2021-05-28 14:25:09,199 WARN
>  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
> [] - Transaction KafkaTransactionState [transactionalId=feedback-union ->
> functions -> Sink:
> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-39,
> producerId=2062, epoch=2684] has been open for 55399128 ms. This is close
> to or even exceeding the transaction timeout of 90 ms.
>
> Seems like it's restoring some old kafka transaction? Not sure. I like
> Arvid's idea of attaching a debugger, I'll definitely give that a try.
>
> On Fri, May 28, 2021 at 7:49 AM Arvid Heise  wrote:
>
>> If logs are not helping, I think the remaining option is to attach a
>> debugger [1]. I'd probably add a breakpoint to
>> LegacySourceFunctionThread#run and see what happens. If the issue is in
>> recovery, you should add a breakpoint to StreamTask#beforeInvoke.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>>
>> On Fri, May 28, 2021 at 1:11 PM Igal Shilman  wrote:
>>
>>> Hi Tim,
>>> Any additional logs from before are highly appreciated, this would help
>>> us to trace this issue.
>>> By the way, do you see something in the JobManager's UI?
>>>
>>> On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 Hi Timothy,

 It would indeed be hard to figure this out without any stack traces.

 Have you tried changing to debug level logs? Maybe you can also try
 using the StateFun Harness to restore and run your job in the IDE - in that
 case you should be able to see which code exactly is throwing this
 exception.

 Cheers,
 Gordon

 On Fri, May 28, 2021 at 12:39 PM Timothy Bess 
 wrote:

> Hi,
>
> Just checking to see if anyone has experienced this error. Might just
> be a Flink thing that's irrelevant to statefun, but my job keeps failing
> over and over with this message:
>
> 2021-05-28 03:51:13,001 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] -
> Starting FlinkKafkaInternalProducer (10/10) to produce into default
> topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
> 2021-05-28 03:51:13,001 INFO
> org.apache.flink.streaming.connectors.kafka.internal.
> FlinkKafkaInternalProducer [] - Attempting to resume transaction
> feedback-union -> functions -> Sink:
> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45
> with producerId 31 and epoch 3088
> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Source: lead-leads-ingress -> router (leads) (10/10)
> (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>
> The null pointer doesn't come with any stack traces or anything. It's
> really mystifying. Seems to just fail while restoring continuously.
>
> Thanks,
>
> Tim
>



Flink Metrics Naming

2021-05-28 Thread Mason Chen
Can anyone give insight as to why Flink allows 2 metrics with the same “name”?

For example,

getRuntimeContext().addGroup("group", "group1").counter("myMetricName");

and

getRuntimeContext().addGroup("other_group", "other_group1").counter("myMetricName");

are both totally valid.
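
My understanding is that this works because the group path is part of the
metric's full identifier, so the two counters above don't actually collide.
With the default scope formats they come out roughly as (prefix illustrative;
scope formats are configurable):

<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.group.group1.myMetricName
<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.other_group.other_group1.myMetricName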


It seems this has led to some not-so-great implementations: the Prometheus
reporter, for example, attaches the labels to the metric name, making the name
quite verbose.




How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-05-28 Thread Alexey Trenikhun
Hello,
I'm thinking about implementing a custom Kafka connector which provides event
alignment (similar to FLINK-10921, which seems abandoned). What is the way to
determine whether a partition is idle from an override of
AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState have
this information?

Thanks,
Alexey


[ANNOUNCE] Apache Flink 1.13.1 released

2021-05-28 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.13.1, which is the first bugfix release for the Apache
Flink 1.13 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this bugfix release:
https://flink.apache.org/news/2021/05/28/release-1.13.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Dawid Wysakowicz




Re: Triggering Savepoint fails to write data to S3 store

2021-05-28 Thread Matthias Pohl
Yes, that would work. But it might still be interesting to understand why you
ran into the timeout. Was it just a big state that took longer than expected?
Or some network issue? ...that's just for you to understand the underlying
issue in a better way. But I'm glad the savepoint creation was successful in
the end.
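
If it does turn out to be just a big state, raising the client-side timeout
for the next attempt should help. Assuming Flink 1.13, the CLI reads
client.timeout, so something like this (untested) on top of the options you
already pass:

./bin/flink savepoint 72f614340dc1a7416d0613362d1ef83b \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=flink-jobmanager \
    -Dkubernetes.namespace=cmdaa \
    -Dclient.timeout=600s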

Best,
Matthias

On Fri, May 28, 2021 at 2:35 PM Robert Cullen  wrote:

> Hi Matthias,  You are correct.  After a few minutes I took another look at
> my savepoint folder and the data was there.  I think increasing the timeout
> may resolve the problem?
>
> On Fri, May 28, 2021 at 8:21 AM Matthias Pohl 
> wrote:
>
>> Hi Robert,
>> it would be interesting to see the corresponding taskmanager/jobmanager
>> logs. That would help in finding out why the savepoint creation failed.
>> Just to verify: The savepoint data wasn't written to S3 even after the
>> timeout happened, was it?
>>
>> Best,
>> Matthias
>>
>> On Thu, May 27, 2021 at 7:50 PM Robert Cullen 
>> wrote:
>>
>>> I triggered a savepoint from a currently running job. Although the
>>> directory structure gets created in the MINIO S3 store, the command
>>> ultimately fails without writing the data.
>>>
>>> root@flink-client:/opt/flink# ./bin/flink list --target kubernetes-session 
>>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
>>> 2021-05-27 17:37:00,409 INFO  
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>>> http://flink-jobmanager-rest.cmdaa:8081
>>> Waiting for response...
>>> -- Running/Restarting Jobs ---
>>> 27.05.2021 16:50:00 : 72f614340dc1a7416d0613362d1ef83b : Streaming Log 
>>> Count (RUNNING)
>>> --
>>> No scheduled jobs.
>>> root@flink-client:/opt/flink# ./bin/flink savepoint 
>>> 72f614340dc1a7416d0613362d1ef83b --target kubernetes-session 
>>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
>>> 2021-05-27 17:37:58,776 INFO  
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>>> http://flink-jobmanager-rest.cmdaa:8081
>>> Triggering savepoint for job 72f614340dc1a7416d0613362d1ef83b.
>>> Waiting for response...
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
>>> 72f614340dc1a7416d0613362d1ef83b failed.
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>> at 
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>> Caused by: java.util.concurrent.TimeoutException
>>> at 
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>>> at 
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:771)
>>> ... 7 more
>>> root@flink-client:/opt/flink#
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: How can I use different user run flink

2021-05-28 Thread Matthias Pohl
Hi igyu,
I'm not sure whether I can be of much help here because I'm not that
familiar with Kerberos. But the Flink documentation [1] suggests deploying
separate Flink clusters for each keytab. Did you try that?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/security/security-kerberos/#objective

On Wed, May 26, 2021 at 5:48 AM igyu  wrote:

> I use CDH 6.3.2
> flink-1.12.3
>
> I enabel kerberos
>
> I want to use different user with different keytab,because I creat many
> queue in YARN , different user use different queue.
>
> --
> igyu
>


Re: JM cannot recover with Kubernetes HA

2021-05-28 Thread Matthias Pohl
Hi Enrique,
thanks for reaching out to the community. I'm not 100% sure what problem
you're facing. The log messages you're sharing could mean that the Flink
cluster still behaves as normal having some outages and the HA
functionality kicking in.

The behavior you're seeing with leaders for the different actors (i.e.
RestServer, Dispatcher, ResourceManager) being located on different hosts
is fine and no indication for something going wrong as well.

It might help to share the entire logs with us if you need assistance in
investigating your issue.

Best,
Matthias

On Thu, May 27, 2021 at 12:42 PM Enrique  wrote:

> To add to my post, instead of using POD IP for the `jobmanager.rpc.address`
> configuration we start each JM pod with the Fully Qualified Name `--host
> ..ns.svc:8081`  and this address gets
> persisted
> to the ConfigMaps. In some scenarios, the leader address in the ConfigMaps
> might differ.
>
> For example, let's assume I have 3 JMs:
>
> jm-0.jm-statefulset.ns.svc:8081 <-- Leader
> jm-1.jm-statefulset.ns.svc:8081
> jm-2.jm-statefulset.ns.svc:8081
>
> I have seen the ConfigMaps in the following state:
>
> RestServer Configmap Address: jm-0.jm-statefulset.ns.svc:8081
> DispatchServer Configmap Address: jm-1.jm-statefulset.ns.svc:8081
> ResourceManager ConfigMap Address: jm-0.jm-statefulset.ns.svc:8081
>
> Is this the correct behaviour?
>
> I then have seen that the TM pods fail to connect due to
>
> ```
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token
> not set: Ignoring message
> RemoteFencedMessage(b870874c1c590d593178811f052a42c9,
> RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time)))
> sent to
> akka.tcp://fl...@jm-1.jm-statefulset.ns.svc
> :6123/user/rpc/resourcemanager_0
> because the fencing token is null.
> ```
>
> This is explained by Till
>
> https://issues.apache.org/jira/browse/FLINK-18367?focusedCommentId=17141070&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17141070
>
> Has anyone else seen this?
>
> Thanks!
>
> Enrique
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


StreamingFileSink only writes data to MINIO during savepoint

2021-05-28 Thread Robert Cullen
On my kubernetes cluster, when I set the StreamingFileSink to write to a
local instance of S3 (MINIO - 500 GB), it only writes the data after I
execute a savepoint.

The expected behavior is to write the data in real-time. I'm guessing the
memory requirements have not been met or a configuration in MINIO is
missing?  Any ideas?
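
One thing I still have to rule out: checkpointing. As far as I understand,
the StreamingFileSink only finalizes its part files on completed checkpoints,
so without periodic checkpointing the data would surface only when a
savepoint is taken, which matches what I see. A minimal sketch of what I'd
have to add (path and encoder are placeholders for my actual setup):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // finalize part files every 60 seconds

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://mybucket/out"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();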

-- 
Robert Cullen
240-475-4490


Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-28 Thread Dawid Wysakowicz
Thank you all for the votes. I am happy to say we approved the release.
I will write a separate summary mail.

Best,

Dawid

On 28/05/2021 14:40, Robert Metzger wrote:
> +1 (binding)
>
> - Tried out reactive mode from the scala 2.11 binary locally (with
> scale up & stop with savepoint)
> - reviewed website update
> - randomly checked a jar file in the staging repo (flink-python jar
> looks okay (I just checked superficially))
>
>
>
>
On Fri, May 28, 2021 at 5:16 AM Leonard Xu wrote:
>
>
> +1 (non-binding)
>
> - verified signatures and hashes
> - built from source code with scala 2.11 succeeded
> - started a cluster, WebUI was accessible, ran a window word count
> job, no suspicious log output
> - ran some SQL jobs in SQL Client, the queries result is expected
> - the web PR looks good
>
> Best,
> Leonard
>
>
> > On May 28, 2021 at 10:25, Xingbo Huang wrote:
> >
> > +1 (non-binding)
> >
> > - verified checksums and signatures
> > - built from source code
> > - check apache-flink source/wheel package content
> > - run python udf job
> >
> > Best,
> > Xingbo
> >
> > Dawid Wysakowicz <dwysakow...@apache.org> wrote on Thu, May 27, 2021 at 9:45 PM:
> > +1 (binding)
> >
> > verified signatures and checksums
> > built from sources and run an example, quickly checked Web UI
> > checked diff of pom.xml and NOTICE files from 1.13.0,
> > there were no version changes,
> > checked the updated licenses of javascript dependencies
> > Best,
> >
> > Dawid
> >
> > On 26/05/2021 11:15, Matthias Pohl wrote:
> >> Hi Dawid,
> >> +1 (non-binding)
> >>
> >> Thanks for driving this release. I checked the following things:
> >> - downloaded and build source code
> >> - verified checksums
> >> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> >> - did a visual check of the release blog post
> >> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> >> suspicious found in the logs
> >> - verified change FLINK-22866 manually whether the issue is fixed
> >>
> >> Best,
> >> Matthias
> >>
> >> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz <dwysakow...@apache.org>
> >> wrote:
> >>
> >>> Hi everyone,
> >>> Please review and vote on the release candidate #1 for the
> version 1.13.1,
> >>> as follows:
> >>> [ ] +1, Approve the release
> >>> [ ] -1, Do not approve the release (please provide specific
> comments)
> >>>
> >>>
> >>> The complete staging area is available for your review, which
> includes:
> >>> * JIRA release notes [1],
> >>> * the official Apache source release and binary convenience
> releases to be
> >>> deployed to dist.apache.org [2], which are signed with the key with
> >>> fingerprint 31D2DD10BFC15A2D [3],
> >>> * all artifacts to be deployed to the Maven Central Repository
> [4],
> >>> * source code tag "release-1.13.1-rc1" [5],
> >>> * website pull request listing the new release and adding
> announcement
> >>> blog post [6].
> >>>
> >>> The vote will be open for at least 72 hours. It is adopted by
> majority
> >>> approval, with at least 3 PMC affirmative votes.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> >>> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >>> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1422/

Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Timothy Bess
Oh wow that Harness looks cool, I'll have to take a look at that.

Unfortunately the JobManager UI seems to just show this:
[image: image.png]

Though it does seem that maybe the source function is where the failure is
happening according to this?
[image: image.png]

Still investigating, but I do see a lot of these logs:
2021-05-28 14:25:09,199 WARN
 org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
[] - Transaction KafkaTransactionState [transactionalId=feedback-union ->
functions -> Sink:
bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-39,
producerId=2062, epoch=2684] has been open for 55399128 ms. This is close
to or even exceeding the transaction timeout of 90 ms.

Seems like it's restoring some old kafka transaction? Not sure. I like
Arvid's idea of attaching a debugger, I'll definitely give that a try.

On Fri, May 28, 2021 at 7:49 AM Arvid Heise  wrote:

> If logs are not helping, I think the remaining option is to attach a
> debugger [1]. I'd probably add a breakpoint to
> LegacySourceFunctionThread#run and see what happens. If the issue is in
> recovery, you should add a breakpoint to StreamTask#beforeInvoke.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>
> On Fri, May 28, 2021 at 1:11 PM Igal Shilman  wrote:
>
>> Hi Tim,
>> Any additional logs from before are highly appreciated, this would help
>> us to trace this issue.
>> By the way, do you see something in the JobManager's UI?
>>
>> On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Timothy,
>>>
>>> It would indeed be hard to figure this out without any stack traces.
>>>
>>> Have you tried changing to debug level logs? Maybe you can also try
>>> using the StateFun Harness to restore and run your job in the IDE - in that
>>> case you should be able to see which code exactly is throwing this
>>> exception.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Fri, May 28, 2021 at 12:39 PM Timothy Bess 
>>> wrote:
>>>
 Hi,

 Just checking to see if anyone has experienced this error. Might just
 be a Flink thing that's irrelevant to statefun, but my job keeps failing
 over and over with this message:

 2021-05-28 03:51:13,001 INFO
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] -
 Starting FlinkKafkaInternalProducer (10/10) to produce into default
 topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
 2021-05-28 03:51:13,001 INFO
 org.apache.flink.streaming.connectors.kafka.internal.
 FlinkKafkaInternalProducer [] - Attempting to resume transaction
 feedback-union -> functions -> Sink:
 bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with
 producerId 31 and epoch 3088
 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task
 [] - Source: lead-leads-ingress -> router (leads) (10/10)
 (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
 java.lang.NullPointerException: null

 The null pointer doesn't come with any stack traces or anything. It's
 really mystifying. Seems to just fail while restoring continuously.

 Thanks,

 Tim

>>>


Re: Query related to Minimum scrape interval for Prometheus and fetching metrics of all vertices in a job through Flink Rest API

2021-05-28 Thread Matthias Pohl
Hi Ashutosh,
you can set the metrics update interval
through metrics.fetcher.update-interval [1]. Unfortunately, there is no
single endpoint to collect all the metrics in a more efficient way other
than the metrics endpoints provided in [2].
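
For reference, a minimal PrometheusReporter setup in flink-conf.yaml looks
like this (the port is just an example). Note that metrics.fetcher.update-interval
only affects the metrics served through the REST API / web UI, not what the
reporter exposes to Prometheus:

metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
metrics.fetcher.update-interval: 10000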

I hope that helps.
Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#metrics-fetcher-update-interval
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/

On Wed, May 26, 2021 at 2:01 PM Ashutosh Uttam 
wrote:

> Hi team,
>
> I have two queries as mentioned below:
>
> *Query1:*
> I am using PrometheusReporter to expose metrics to Prometheus Server.
> What should be the minimum recommended scrape interval to be defined on
> Prometheus server?
> Is there any interval in which Flink reports metrics?
>
> *Query2:*
> Is there any way I can fetch the metrics of all vertices (including
> subtasks) of a job through a single call to Flink's Monitoring REST API?
>
> As of now what I have tried is first finding the vertices and then
> querying individual vertex for metrics as below:
>
> *Step 1:* Finding jobId (http://:/jobs)
> *Step 2:* Finding vertices Id (http://:/jobs/)
> *Step 3:* Finding aggregated metrics (including parallelism) of a vertex
> (http://:/jobs//vertices//subtasks/metrics?get=,)
>
>
> So likewise I have to invoke multiple REST APIs, one for each vertex id. Is
> there any optimised way to get metrics of all vertices?
>
>
> Thanks & Regards,
> Ashutosh
>


Re: Alternate way to pass plugin Jars

2021-05-28 Thread Matthias Pohl
Hi Vijayendra,
Thanks for reaching out to the Flink community. There is no other way I
know of to achieve what you're looking for: plugin support is provided
through the ./plugins/ directory, as described in the docs [1].
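
For the two jars you mention, the expected layout is one subfolder per
plugin, e.g.:

plugins/
  metrics-graphite/
    flink-metrics-graphite-1.11.0.jar
  s3-fs-hadoop/
    flink-s3-fs-hadoop-1.11.0.jar

If creating those folders inside the distribution is what you want to avoid,
the FLINK_PLUGINS_DIR environment variable should let you point Flink at a
plugins directory kept elsewhere.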

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/

On Fri, May 28, 2021 at 7:07 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> I am trying to find an alternate way to set the plugin directory (not by
> manually creating it in the Flink distribution), maybe by passing the plugins
> through a dependency jar on the app classpath?
>
> Plugin directory containing:
>
> 1) flink-metrics-graphite-1.11.0.jar
> 2) flink-s3-fs-hadoop-1.11.0.jar
>
> Thanks,
> Vijay
>


Re: Heartbeat Timeout

2021-05-28 Thread Robert Cullen
Matthias,  I increased the JVM Heap size as Jan suggested and it appears to
be a memory leak in the user code (although I'm not sure why since this is
a simple job that uses a loop to simulate data being written to an S3 data
store).  Yes, the logs show no apparent problem but the timestamp
corresponds to the job failure.  Forgive me but I don't know how to analyze
a heap dump.

On Fri, May 28, 2021 at 8:27 AM Matthias Pohl 
wrote:

> Hi Robert,
> increasing heap memory usage could be due to some memory leak in the user
> code. Have you analyzed a heap dump? About the TM logs you shared. I don't
> see anything suspicious there. Nothing about memory problems. Are those the
> correct logs?
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 6:01 PM Jan Brusch 
> wrote:
>
>> Hi Robert,
>>
>> that sounds like a case of either your application state ultimately being
>> bigger than the available RAM or a memory leak in your application (e.g.,
>> some states are not properly cleaned out after they are not needed anymore).
>>
>> If you have the available resources you could try and increase the
>> TaskManager RAM size by a large amount and see where RAM usage tops out. If
>> it ever does... in case of a memory leak it would grow indefinitely. Then
>> you could reason about how to fix the memory leak or how to achieve your
>> goal with a smaller application state.
>>
>> A remedy for application states larger than your available RAM is to use
>> the RocksDB State backend, which allows for states larger than your
>> application RAM. But that requires your kubernetes nodes to be equipped
>> with a fast hard drive (SSD, optimally).
>>
>> That's how I would approach your problem...
>>
>>
>> Hope that helps
>>
>> Jan
>> On 27.05.21 17:51, Robert Cullen wrote:
>>
>> Hello Jan,
>>
>> My flink cluster is running on a kubernetes single node (rke). I have the
>> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
>> TaskManger reaches the max JVM Heap Size after about one hour then fails.
>> Here is a snippet from the TaskManager log:
>>
>> 2021-05-27 15:36:36,040 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,041 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Establish 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Offer 
>> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>> TaskSlot(index:2, state:ALLOCATED, resource profile: 
>> ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb 
>> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
>> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
>> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove 
>> job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
>> slot request 85433366f8bf1c5efd3b88f634676764 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
>> .
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
>> slot for 85433366f8bf1c5efd3b88f634676764.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
>> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
>> register at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
>> ----.
>> 2021-05-27 15:36:36,044 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,045 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>>
>> I guess the 

Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-28 Thread Robert Metzger
+1 (binding)

- Tried out reactive mode from the scala 2.11 binary locally (with scale
up & stop with savepoint)
- reviewed website update
- randomly checked a jar file in the staging repo (flink-python jar looks
okay (I just checked superficially))




On Fri, May 28, 2021 at 5:16 AM Leonard Xu  wrote:

>
> +1 (non-binding)
>
> - verified signatures and hashes
> - built from source code with scala 2.11 succeeded
> - started a cluster, WebUI was accessible, ran a window word count job, no
> suspicious log output
> - ran some SQL jobs in SQL Client, the queries result is expected
> - the web PR looks good
>
> Best,
> Leonard
>
>
> > On May 28, 2021 at 10:25, Xingbo Huang wrote:
> >
> > +1 (non-binding)
> >
> > - verified checksums and signatures
> > - built from source code
> > - check apache-flink source/wheel package content
> > - run python udf job
> >
> > Best,
> > Xingbo
> >
> > Dawid Wysakowicz <dwysakow...@apache.org> wrote on Thu, May 27, 2021 at 9:45 PM:
> > +1 (binding)
> >
> > verified signatures and checksums
> > built from sources and run an example, quickly checked Web UI
> > checked diff of pom.xml and NOTICE files from 1.13.0,
> > there were no version changes,
> > checked the updated licenses of javascript dependencies
> > Best,
> >
> > Dawid
> >
> > On 26/05/2021 11:15, Matthias Pohl wrote:
> >> Hi Dawid,
> >> +1 (non-binding)
> >>
> >> Thanks for driving this release. I checked the following things:
> >> - downloaded and build source code
> >> - verified checksums
> >> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> >> - did a visual check of the release blog post
> >> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> >> suspicious found in the logs
> >> - verified change FLINK-22866 manually whether the issue is fixed
> >>
> >> Best,
> >> Matthias
> >>
> >> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz <dwysakow...@apache.org>
> >> wrote:
> >>
> >>> Hi everyone,
> >>> Please review and vote on the release candidate #1 for the version
> 1.13.1,
> >>> as follows:
> >>> [ ] +1, Approve the release
> >>> [ ] -1, Do not approve the release (please provide specific comments)
> >>>
> >>>
> >>> The complete staging area is available for your review, which includes:
> >>> * JIRA release notes [1],
> >>> * the official Apache source release and binary convenience releases
> to be
> >>> deployed to dist.apache.org [2], which are signed with the key with
> >>> fingerprint 31D2DD10BFC15A2D [3],
> >>> * all artifacts to be deployed to the Maven Central Repository [4],
> >>> * source code tag "release-1.13.1-rc1" [5],
> >>> * website pull request listing the new release and adding announcement
> >>> blog post [6].
> >>>
> >>> The vote will be open for at least 72 hours. It is adopted by majority
> >>> approval, with at least 3 PMC affirmative votes.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> [1]
> >>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> >>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >>> [4]
> >>>
> https://repository.apache.org/content/repositories/orgapacheflink-1422/
> >>> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
> >>> [6] https://github.com/apache/flink-web/pull/448
> >>>
>
>


Re: Triggering Savepoint fails to write data to S3 store

2021-05-28 Thread Robert Cullen
Hi Matthias,  You are correct.  After a few minutes I took another look at
my savepoint folder and the data was there.  I think increasing the timeout
may resolve the problem?

On Fri, May 28, 2021 at 8:21 AM Matthias Pohl 
wrote:

> Hi Robert,
> it would be interesting to see the corresponding taskmanager/jobmanager
> logs. That would help in finding out why the savepoint creation failed.
> Just to verify: The savepoint data wasn't written to S3 even after the
> timeout happened, was it?
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 7:50 PM Robert Cullen 
> wrote:
>
>> I triggered a savepoint from a currently running job. Although the
>> directory structure gets created in the MINIO S3 store, the command
>> ultimately fails without writing the data.
>>
>> root@flink-client:/opt/flink# ./bin/flink list --target kubernetes-session 
>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
>> 2021-05-27 17:37:00,409 INFO  
>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>> http://flink-jobmanager-rest.cmdaa:8081
>> Waiting for response...
>> -- Running/Restarting Jobs ---
>> 27.05.2021 16:50:00 : 72f614340dc1a7416d0613362d1ef83b : Streaming Log Count 
>> (RUNNING)
>> --
>> No scheduled jobs.
>> root@flink-client:/opt/flink# ./bin/flink savepoint 
>> 72f614340dc1a7416d0613362d1ef83b --target kubernetes-session 
>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
>> 2021-05-27 17:37:58,776 INFO  
>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>> http://flink-jobmanager-rest.cmdaa:8081
>> Triggering savepoint for job 72f614340dc1a7416d0613362d1ef83b.
>> Waiting for response...
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
>> 72f614340dc1a7416d0613362d1ef83b failed.
>> at 
>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
>> at 
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>> at 
>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at 
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>> at 
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: java.util.concurrent.TimeoutException
>> at 
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>> at 
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>> at 
>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:771)
>> ... 7 more
>> root@flink-client:/opt/flink#
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490


Re: Heartbeat Timeout

2021-05-28 Thread Matthias Pohl
Hi Robert,
increasing heap memory usage could be due to some memory leak in the user
code. Have you analyzed a heap dump? About the TM logs you shared. I don't
see anything suspicious there. Nothing about memory problems. Are those the
correct logs?
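
In case it helps: a heap dump of the TaskManager JVM can be taken with jmap
and opened in a tool like Eclipse MAT or VisualVM, e.g. (assuming the TM is
PID 1 inside its container):

kubectl exec -it <taskmanager-pod> -- jmap -dump:live,format=b,file=/tmp/tm-heap.hprof 1
kubectl cp <taskmanager-pod>:/tmp/tm-heap.hprof ./tm-heap.hprof

Comparing two dumps taken a while apart usually shows which objects keep
accumulating.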

Best,
Matthias

On Thu, May 27, 2021 at 6:01 PM Jan Brusch 
wrote:

> Hi Robert,
>
> that sounds like a case of either your application state ultimately being
> bigger than the available RAM or a memory leak in your application (e.g.,
> some states are not properly cleaned out after they are not needed anymore).
>
> If you have the available resources you could try and increase the
> TaskManager RAM size by a large amount and see where RAM usage tops out. If
> it ever does... in case of a memory leak it would grow indefinitely. Then
> you could reason about how to fix the memory leak or how to achieve your
> goal with a smaller application state.
>
> A remedy for application states larger than your available RAM is to use
> the RocksDB State backend, which allows for states larger than your
> application RAM. But that requires your kubernetes nodes to be equipped
> with a fast hard drive (SSD, optimally).
>
> That's how I would approach your problem...
>
>
> Hope that helps
>
> Jan
> On 27.05.21 17:51, Robert Cullen wrote:
>
> Hello Jan,
>
> My flink cluster is running on a kubernetes single node (rke). I have the
> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
> TaskManger reaches the max JVM Heap Size after about one hour then fails.
> Here is a snippet from the TaskManager log:
>
> 2021-05-27 15:36:36,040 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,041 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Establish 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Offer 
> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
> TaskSlot(index:2, state:ALLOCATED, resource profile: 
> ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb 
> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
> c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
> slot request 85433366f8bf1c5efd3b88f634676764 for job 
> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
> .
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
> slot for 85433366f8bf1c5efd3b88f634676764.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
> register at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
> ----.
> 2021-05-27 15:36:36,044 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,045 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
>
> I guess the simple resolution is to increase the JVM Heap Size?
>
> On Thu, May 27, 2021 at 10:51 AM Jan Brusch 
> wrote:
>
>> Hi Robert,
>>
>> do you have some additional info? For example the last log message of the
>> unreachable TaskManagers. Is the Job running in kubernetes? What backend
>> are you using?
>>
>> From the first looks of it, I have seen this behaviour mostly in cases
>> where one or more taskmanagers shut down due to GarbageCollection issues or
>> OutOfMemory-Errors.
>>
>>
>> Best regards
>>
>> Jan
>> On 27.05.21 16:44, Robert Cul

Re: Triggering Savepoint fails to write data to S3 store

2021-05-28 Thread Matthias Pohl
Hi Robert,
it would be interesting to see the corresponding taskmanager/jobmanager
logs. That would help in finding out why the savepoint creation failed.
Just to verify: The savepoint data wasn't written to S3 even after the
timeout happened, was it?

Best,
Matthias

On Thu, May 27, 2021 at 7:50 PM Robert Cullen  wrote:

> I triggered a savepoint from a currently running job. Although the
> directory structure gets created in the MINIO S3 store, the command
> ultimately fails without writing the data.
>
> root@flink-client:/opt/flink# ./bin/flink list --target kubernetes-session 
> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
> 2021-05-27 17:37:00,409 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
> http://flink-jobmanager-rest.cmdaa:8081
> Waiting for response...
> -- Running/Restarting Jobs ---
> 27.05.2021 16:50:00 : 72f614340dc1a7416d0613362d1ef83b : Streaming Log Count 
> (RUNNING)
> --
> No scheduled jobs.
> root@flink-client:/opt/flink# ./bin/flink savepoint 
> 72f614340dc1a7416d0613362d1ef83b --target kubernetes-session 
> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
> 2021-05-27 17:37:58,776 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
> http://flink-jobmanager-rest.cmdaa:8081
> Triggering savepoint for job 72f614340dc1a7416d0613362d1ef83b.
> Waiting for response...
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> 72f614340dc1a7416d0613362d1ef83b failed.
> at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
> at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.util.concurrent.TimeoutException
> at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:771)
> ... 7 more
> root@flink-client:/opt/flink#
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Regarding FLIP-91's status

2021-05-28 Thread Matthias Pohl
Hi Sonam,
It looks like it has been stale for some time. You might be able to restart
the discussion replying to the respective thread in the dev mailing list
[1]. You seem to be right about the repository based on Jark's reply in the
related ticket FLINK-15472 [2]. I'm adding Jark to the thread. Maybe, he
can shed some light on the state of FLIP-91.

Best,
Matthias

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCADQYLGsCGDJkfd3L1hAy1y_M2625YkNHJGW82UraGLhzg6p7Ug%40mail.gmail.com%3E
[2] https://issues.apache.org/jira/browse/FLINK-15472

On Thu, May 27, 2021 at 9:51 PM Sonam Mandal  wrote:

> Hello,
>
> I was curious about the progress on FLIP-91. Is this actively being
> developed?
> I believe the code is in development at
> https://github.com/ververica/flink-sql-gateway, is this the right repo?
>
> I haven't seen much activity on this since sometime last year. I wanted to
> understand if there is still a plan to continue developing this, and if
> not, I wanted to understand why.
>
> Appreciate your help!
>
> Thanks,
> Sonam
>


Re: KafkaSource

2021-05-28 Thread Matthias Pohl
Hi Alexey,
the two implementations are not compatible. You can find information on how
to work around this in the Kafka Connector docs [1].

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
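
To give a rough idea of one way to carry the offsets over (not from the
linked docs verbatim): keep the same group.id, make sure the old consumer
committed its offsets to Kafka (e.g. by stopping the old job with a
savepoint), and start the new source from those committed offsets. A sketch
(servers, topic and group are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setGroupId("same-group-as-the-old-consumer")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");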

On Tue, May 18, 2021 at 2:21 AM Alexey Trenikhun  wrote:

> Hello,
>
> Is the new KafkaSource/KafkaSourceBuilder ready to be used? If so, is
> KafkaSource state compatible with the legacy FlinkKafkaConsumer? For example,
> if I replace FlinkKafkaConsumer with KafkaSource, will offsets continue from
> what we had in FlinkKafkaConsumer?
>
> Thanks,
> Alexey
>


Re: Flink upgraded from 1.10.0 to 1.12.0

2021-05-28 Thread Matthias Pohl
Hi 王炳焱,
thanks for reaching out to the Flink community and sorry for the late
reply. Unfortunately, Flink SQL does not support state backwards
compatibility. There is no clear pointer in the documentation that states
that. I created FLINK-22799 [1] to cover this. In the meantime, you could
try using the state processor API [2] to rewrite the savepoint to work
around your issue. I haven't used the API myself yet, hence I cannot
give direct hints on how to do it.

I hope that helps.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22799
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Tue, May 18, 2021 at 2:11 PM 王炳焱 <15307491...@163.com> wrote:

> When I upgraded from Flink 1.10.0 to Flink 1.12.0, I was unable to restore a
> savepoint, and got the following error:
>
>
> 2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup
>  [] - The operator name Calc(select=[((CAST((log_info 
> get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
> _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
> _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
> _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
> _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
> _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
> _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
> get_json_object2 _UTF-16LE'status') SEARCH 
> Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
> get_json_object2 _UTF-16LE'data.itemType') SEARCH 
> Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
> (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
> (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
> get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
> characters length limit and was truncated.
> 2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup
>  [] - The operator name 
> SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
> fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
> truncated.
> 2021-05-14 22:02:44,879 ERROR 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
> Caught unexpected exception.
> java.io.IOException: Could not find class 
> 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
>  in classpath.
> at 
> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
>  ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at 
> org.apache.flink.contrib.str

Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Arvid Heise
If logs are not helping, I think the remaining option is to attach a
debugger [1]. I'd probably add a breakpoint to
LegacySourceFunctionThread#run and see what happens. If the issue is in
recovery, you should add a breakpoint to StreamTask#beforeInvoke.

[1]
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters

On Fri, May 28, 2021 at 1:11 PM Igal Shilman  wrote:

> Hi Tim,
> Any additional logs from before are highly appreciated, this would help us
> to trace this issue.
> By the way, do you see something in the JobManager's UI?
>
> On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Timothy,
>>
>> It would indeed be hard to figure this out without any stack traces.
>>
>> Have you tried changing to debug level logs? Maybe you can also try using
>> the StateFun Harness to restore and run your job in the IDE - in that case
>> you should be able to see which code exactly is throwing this exception.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, May 28, 2021 at 12:39 PM Timothy Bess  wrote:
>>
>>> Hi,
>>>
>>> Just checking to see if anyone has experienced this error. Might just be
>>> a Flink thing that's irrelevant to statefun, but my job keeps failing over
>>> and over with this message:
>>>
>>> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (10/10) to produce into default topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
>>> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer [] - Attempting to resume transaction feedback-union -> functions -> Sink: bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with producerId 31 and epoch 3088
>>> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: lead-leads-ingress -> router (leads) (10/10) (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
>>> java.lang.NullPointerException: null
>>>
>>> The null pointer doesn't come with any stack traces or anything. It's
>>> really mystifying. Seems to just fail while restoring continuously.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>


Re: Flink in k8s operators list

2021-05-28 Thread Yuval Itzchakov
https://github.com/lyft/flinkk8soperator

On Fri, May 28, 2021, 10:09 Ilya Karpov  wrote:

> Hi there,
>
> I’m doing a little research into the easiest way to deploy a Flink job to a
> k8s cluster and manage its lifecycle with a *k8s operator*. The list of
> solutions is below:
> - https://github.com/fintechstudios/ververica-platform-k8s-operator
> - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> - https://kudo.dev/docs/examples/apache-flink.html
> - https://github.com/wangyang0918/flink-native-k8s-operator
>
> If you are using something that is not listed above, please share! Any details
> about how a specific solution works are greatly appreciated.
>
> Thanks in advance
>


Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Igal Shilman
Hi Tim,
Any additional logs from before are highly appreciated, this would help us
to trace this issue.
By the way, do you see something in the JobManager's UI?

On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Timothy,
>
> It would indeed be hard to figure this out without any stack traces.
>
> Have you tried changing to debug level logs? Maybe you can also try using
> the StateFun Harness to restore and run your job in the IDE - in that case
> you should be able to see which code exactly is throwing this exception.
>
> Cheers,
> Gordon
>
> On Fri, May 28, 2021 at 12:39 PM Timothy Bess  wrote:
>
>> Hi,
>>
>> Just checking to see if anyone has experienced this error. Might just be
>> a Flink thing that's irrelevant to statefun, but my job keeps failing over
>> and over with this message:
>>
>> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (10/10) to produce into default topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
>> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer [] - Attempting to resume transaction feedback-union -> functions -> Sink: bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with producerId 31 and epoch 3088
>> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: lead-leads-ingress -> router (leads) (10/10) (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
>> java.lang.NullPointerException: null
>>
>> The null pointer doesn't come with any stack traces or anything. It's
>> really mystifying. Seems to just fail while restoring continuously.
>>
>> Thanks,
>>
>> Tim
>>
>


Re: Enable Multiple kafka Consumer sources for job

2021-05-28 Thread 刘建刚
For debugging, you can consume data from inputStream2 on its own and print it,
to verify that records are actually arriving from the second topic — see the
sketch below.
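
A minimal, self-contained sketch of that isolation test — the topic name,
bootstrap servers, and group id below are placeholders, not taken from your
setup:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class DebugRejectedTopic {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "debug-group");             // placeholder

        // consume ONLY the second topic, bypassing the union entirely
        DataStream<String> inputStream2 = env
                .addSource(new FlinkKafkaConsumer<>(
                        "rejected-topic",            // placeholder topic
                        new SimpleStringSchema(),
                        props))
                .name("rejected-source");

        // if nothing is printed here, the problem is the source/topic
        // configuration, not the union
        inputStream2.print();

        env.execute("debug-rejected-topic");
    }
}

If records do show up in isolation, the union itself is rarely the culprit;
double-check the second consumer's topic name, group id, and offset settings
first.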

sudhansu069 [via Apache Flink User Mailing List archive.] <
ml+s2336050n44010...@n4.nabble.com> wrote on Thu, May 27, 2021 at 11:22 PM:

> Hi Team ,
>
> We are trying to build a data pipeline where we have to set up two
> different Kafka consumers for two different Kafka topics, with a single
> SNS sink.
> Below is the sample code for the same, but it looks like events from one of
> the sources are not flowing into the cluster. We are using the union API to
> merge the two input sources here.
>
>
> DataStream> inputStream1 = 
> env.addSource(flinkKafkaConsumer)
> .uid(configParams.get(AppConstant.JOB_PUBLISHER_STATE_KAFKA_SOURCE_UUID))
> .name(AppConstant.FHIR_SOURCE);
>
> DataStream> inputStream2 = 
> env.addSource(flinkKafkaConsumerFromRejectedTopic)
> .uid("testUID")
> .name(AppConstant.FHIR_SOURCE_FOR_REJECTED_QUEUE);
>
> DataStream> allStreams = 
> inputStream1.union(inputStream2);
>
>
> In the above code snippet, allStreams is only pulling events
> from inputStream1, but the expectation is that allStreams should pull events
> from both inputStream1 and inputStream2. Could you please help us
> understand whether this is the right approach or whether we are missing something?
>
> Thanks,
> Sudhansu
>
>


Re: Configure Kafka ingress through property files in Stateful function 3.0.0

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Jessy,

I assume "consumer.properties" is a file you have included in your StateFun
application's image?

The ingress.spec.properties field in the module YAML specification file
expects a list of key-value pairs, not a properties file. See for example
[1].

I think it could make sense to support specifying property files directly
as well. Could you open a JIRA for this?

Thanks,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml#L36
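
For illustration, the shape used in the linked example is roughly the
following — the address and the concrete property keys/values here are
placeholders:

spec:
  address: kafka-broker:9092          # placeholder
  properties:
    - consumer.group.id: my-group     # inline key/value entry
    - auto.offset.reset: earliest     # inline key/value entry

rather than pointing at a file such as properties: consumer.properties.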





Flink in k8s operators list

2021-05-28 Thread Ilya Karpov
Hi there,

I’m doing a little research into the easiest way to deploy a Flink job to a k8s
cluster and manage its lifecycle with a k8s operator. The list of solutions is
below:
- https://github.com/fintechstudios/ververica-platform-k8s-operator
- https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
- https://kudo.dev/docs/examples/apache-flink.html
- https://github.com/wangyang0918/flink-native-k8s-operator 


If you are using something that is not listed above, please share! Any details
about how a specific solution works are greatly appreciated.

Thanks in advance

Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

It would indeed be hard to figure this out without any stack traces.

Have you tried changing to debug level logs? Maybe you can also try using
the StateFun Harness to restore and run your job in the IDE - in that case
you should be able to see which code exactly is throwing this exception.
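
For the log level, setting rootLogger.level = DEBUG in your image's log4j
configuration is usually enough (assuming the default Flink log4j2 setup).
For the Harness, a rough sketch — the method names are from memory of the
statefun-flink-harness module and the savepoint path is a placeholder, so
please verify against your version:

import org.apache.flink.statefun.flink.harness.Harness;

public class RestoreInIde {

    public static void main(String[] args) throws Exception {
        new Harness()
                // assumption: pointing the embedded cluster at the
                // checkpoint/savepoint to restore from
                .withConfiguration("execution.savepoint.path",
                        "file:///path/to/checkpoints/chk-123") // placeholder
                .start();
    }
}

Running that under the IDE's debugger should surface the full stack trace of
the NPE.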

Cheers,
Gordon

On Fri, May 28, 2021 at 12:39 PM Timothy Bess  wrote:

> Hi,
>
> Just checking to see if anyone has experienced this error. Might just be a
> Flink thing that's irrelevant to statefun, but my job keeps failing over
> and over with this message:
>
> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (10/10) to produce into default topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer [] - Attempting to resume transaction feedback-union -> functions -> Sink: bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with producerId 31 and epoch 3088
> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: lead-leads-ingress -> router (leads) (10/10) (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>
> The null pointer doesn't come with any stack traces or anything. It's
> really mystifying. Seems to just fail while restoring continuously.
>
> Thanks,
>
> Tim
>