Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-26 Thread Yang Wang
I think your attached exception has been fixed via FLINK-22597 [1]. Could
you please try again with the latest version?

Moreover, it is not the desired Flink behavior that the TaskManager could not
retrieve the new JobManager address and re-register successfully. I think
you need to share
the logs of the stalled TaskManager so that we can move the debugging forward.


[1]. https://issues.apache.org/jira/browse/FLINK-22597

Best,
Yang

On Thu, May 27, 2021 at 4:54 AM, Jerome Li wrote:

> Hi Yang,
>
>
>
> Thanks for getting back to me.
>
>
>
> By “restart master node”, I mean: run “kubectl get nodes” to find the nodes
> whose role is master, “ssh” into one of the master nodes as the ubuntu user, and
> then run “sudo /sbin/reboot -f” to restart the master node.
>
>
>
> It looks like the JobManager cancels the running job and logs this
> after that.
>
> 2021-05-26 18:28:37,997 [INFO]
> org.apache.flink.runtime.executiongraph.ExecutionGraph   - Discarding
> the results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.
>
> 2021-05-26 18:28:37,999 [INFO]
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore -
> Suspending
>
> 2021-05-26 18:28:37,999 [INFO]
> org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter
> - Shutting down.
>
> 2021-05-26 18:28:38,000 [INFO]
> org.apache.flink.runtime.executiongraph.ExecutionGraph   - Job
> 74fc5c858e50f5efc91db9ee16c17a8c has been suspended.
>
> 2021-05-26 18:28:38,007 [INFO]
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending
> SlotPool.
>
> 2021-05-26 18:28:38,007 [INFO]
> org.apache.flink.runtime.jobmaster.JobMaster - Close
> ResourceManager connection 5bac86fb0b5c984ef429225b8de82cc0: JobManager is
> no longer the leader..
>
> 2021-05-26 18:28:38,019 [INFO]
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl  - JobManager
> runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted
> leadership with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at
> akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.
>
> 2021-05-26 18:28:38,292 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,292 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,292 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,293 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> org.apache.flink.runtime.rpc.messages.LocalFencedMessage until processing
> is started.
>
> 2021-05-26 18:28:38,295 [INFO]
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc
> endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started
> yet. Discarding message
> o

Re: yarn ship from s3

2021-05-26 Thread Vijayendra Yadav
Thank you, Xintong. I will look for these updates in the near future.

Regards,
Vijay

On Wed, May 26, 2021 at 6:40 PM Xintong Song  wrote:

> Hi Vijay,
>
> Currently, Flink only supports shipping files from the local machine where
> the job is submitted.
>
> There are tickets [1][2][3] tracking the efforts to support shipping files
> from remote paths, e.g., http, hdfs, etc. Once those efforts are done, adding
> s3 as an additional supported scheme should be straightforward.
>
> Unfortunately, these efforts are still in progress, and have more or less
> stalled recently.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20681
> [2] https://issues.apache.org/jira/browse/FLINK-20811
> [3] https://issues.apache.org/jira/browse/FLINK-20867
>
> On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav 
> wrote:
>
>> Hi Pohl,
>>
>> I tried to ship my property file. Example: *-yarn.ship-files
>> s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*
>>
>>
>> *Error:*
>>
>> 6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend -
>> Invalid command line arguments.
>> org.apache.flink.client.cli.CliArgsException: Could not build the program
>> from JAR file: JAR file does not exist: -yarn.ship-files
>> at
>> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at java.security.AccessController.doPrivileged(Native Method)
>> ~[?:1.8.0_292]
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> [?:1.8.0_292]
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>> [hadoop-common-2.10.0-amzn-0.jar:?]
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> [flink-dist_2.11-1.11.0.jar:1.11.0]
>> Caused by: java.io.FileNotFoundException: JAR file does not exist:
>> -yarn.ship-files
>> at
>> org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> ... 8 more
>> Could not build the program from JAR file: JAR file does not exist:
>> -yarn.ship-files
>>
>>
>> *Thanks,*
>>
>> *Vijay*
>>
>> On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Vijay,
>>> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
>>> that's what you're looking for...
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>>>
>>> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
>>> wrote:
>>>
 Hi Piotr,

 I have been doing the same process as you mentioned so far; now I am
 migrating the deployment process to AWS CDK and AWS Step Functions, kind
 of like a CI/CD process.
 I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3
 using command-runner.jar (AWS Step); it loaded them onto one of the master
 nodes (out of 3). In the next step, when I launched the Flink job, it would not
 find the build because the job is launched on some other YARN node.

 I was hoping that, just like in *Apache Spark*, where whatever files we provide
 in *--files* are shipped to YARN (S3 to the YARN working directory), Flink
 would also have a solution.

 Thanks,
 Vijay


 On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
 wrote:

> Hi Vijay,
>
> I'm not sure if I understand your question correctly. You have jar and
> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
> those? Can you simply download those things (whole directory containing
> those) to the machine that will be starting the Flink job?
>
> Best, Piotrek
>
> On Tue, 25 May 2021 at 07:50, Vijayendra Yadav wrote:
>
>> Hi Team,
>>
>> I am trying to find a way to ship files from aws s3 for a flink
>> streaming job, I am running on AWS EMR. What i need to ship are 
>> following:
>> 1) application jar
>> 2) application property file

Re: Time needed to read from Kafka source

2021-05-26 Thread B.B.
Hi,
I forgot to mention that we are using Flink 1.12.0. This is a job that has
only the minimum components: reading from the source and printing it.
Profiling was my next step. Regarding memory, I didn't see any bottlenecks.
I guess I will have to do some investigating in the metrics part of Flink.

BR,
BB

On Tue, 25 May 2021 at 17:12, Piotr Nowojski  wrote:

> Hi,
>
> That's a throughput of 700 records/second, which should be well below the
> theoretical limits of any deserializer (from hundreds of thousands up to tens
> of millions of records/second per single operator), unless your records are
> huge or very complex.
>
> Long story short, I don't know of a magic bullet to help you solve your
> problem. As always you have two options, either optimise/speed up your
> code/job, or scale up.
>
> If you choose the former, think about Flink as just another Java
> application. Check metrics and resource usage, and understand what resource
> is the problem (cpu? memory? machine is swapping? io?). You might be able
> to guess what's your bottleneck (reading from kafka? deserialisation?
> something else? Flink itself?) by looking at some of the metrics
> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
> you can also simplify your job to bare minimum and test performance of
> independent components. Also you can always attach a code profiler and
> simply look at what's happening. First identify what's the source of the
> bottleneck and then try to understand what's causing it.
>
> Best,
> Piotrek
>
> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
> in the job graph based on busy/back pressured status and Flamegraph
> support)
>
> On Tue, 25 May 2021 at 15:44, B.B. wrote:
>
>> Hi,
>>
>> I am in the process of optimizing my job, which we currently think is too
>> slow.
>>
>> We are deploying the job in Kubernetes with one JobManager (1 GB RAM, 1 CPU)
>> and one TaskManager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a parallelism
>> of two).
>>
>> The main problem is one Kafka source that has 3.8 million events that we
>> have to process.
>> As a test we made a simple job that connects to Kafka using a custom
>> implementation of KafkaDeserializationSchema. There we are using an
>> ObjectMapper that maps input values, e.g.:
>>
>> *var event = objectMapper.readValue(consumerRecord.value(),
>> MyClass.class);*
>>
>> This is then validated with the Hibernate validator, and the output of this
>> source is printed to the console.
>>
>> The time needed for the job to consume all the events was one and a half
>> hours, which seems a bit long.
>> Is there a way we can speed up this process?
>>
>> Would more CPU cores or memory be the solution?
>> Should we switch to an Avro deserialization schema?
>>
>>
>>
>> --
Everybody wants to be a winner
Nobody wants to lose their game
It's insane for me
It's insane for you
It's insane
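
As a first isolation step along the lines Piotrek suggests, the deserialization
path can be timed outside Flink entirely. The sketch below assumes a MyClass
POJO and a sample JSON payload (both are placeholders for the real record type
and data); it measures only Jackson throughput, nothing Flink-specific:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.nio.charset.StandardCharsets;

// Standalone throughput check for the deserialization step only.
public class DeserThroughputCheck {
    public static void main(String[] args) throws Exception {
        // A preconfigured ObjectReader avoids per-record type resolution.
        ObjectReader reader = new ObjectMapper().readerFor(MyClass.class);
        byte[] payload = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);

        int n = 1_000_000;
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            MyClass event = reader.readValue(payload);
            if (event == null) throw new IllegalStateException();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("%d records in %d ms (%.0f records/s)%n",
                n, elapsedMs, n * 1000.0 / elapsedMs);
    }
}
```

If this loop alone is far faster than 700 records/second, the bottleneck is
likely elsewhere in the pipeline rather than in the deserializer itself.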


Re: Time needed to read from Kafka source

2021-05-26 Thread B.B.
Hi,
I forgot to mention that we are running Flink 1.12.0.

This is the main function (some parts of the code are abbreviated; this is
the main part). As you can see, the job was simplified to the minimum: just
reading from the source and printing.


[image: Screenshot 2021-05-26 at 08.05.53.png]


And this is deserializer:

[image: Screenshot 2021-05-26 at 07.49.17.png]

BR,

BB


On Tue, 25 May 2021 at 17:51, Arvid Heise  wrote:

> Could you share your KafkaDeserializationSchema? We might be able to spot
> some optimization potential. You could also try out enableObjectReuse [1],
> which avoids copying data between tasks (not sure if you have any
> non-chained tasks).
>
> If you are on 1.13, you could check out the flamegraph to see where the
> bottleneck occurs. [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> That's a throughput of 700 records/second, which should be well below the
>> theoretical limits of any deserializer (from hundreds of thousands up to tens
>> of millions of records/second per single operator), unless your records are
>> huge or very complex.
>>
>> Long story short, I don't know of a magic bullet to help you solve your
>> problem. As always you have two options, either optimise/speed up your
>> code/job, or scale up.
>>
>> If you choose the former, think about Flink as just another Java
>> application. Check metrics and resource usage, and understand what resource
>> is the problem (cpu? memory? machine is swapping? io?). You might be able
>> to guess what's your bottleneck (reading from kafka? deserialisation?
>> something else? Flink itself?) by looking at some of the metrics
>> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
>> you can also simplify your job to bare minimum and test performance of
>> independent components. Also you can always attach a code profiler and
>> simply look at what's happening. First identify what's the source of the
>> bottleneck and then try to understand what's causing it.
>>
>> Best,
>> Piotrek
>>
>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>> in the job graph based on busy/back pressured status and Flamegraph
>> support)
>>
>> On Tue, 25 May 2021 at 15:44, B.B. wrote:
>>
>>> Hi,
>>>
>>> I am in the process of optimizing my job, which we currently think is too
>>> slow.
>>>
>>> We are deploying the job in Kubernetes with one JobManager (1 GB RAM, 1 CPU)
>>> and one TaskManager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a parallelism
>>> of two).
>>>
>>> The main problem is one Kafka source that has 3.8 million events that we
>>> have to process.
>>> As a test we made a simple job that connects to Kafka using a custom
>>> implementation of KafkaDeserializationSchema. There we are using an
>>> ObjectMapper that maps input values, e.g.:
>>>
>>> *var event = objectMapper.readValue(consumerRecord.value(),
>>> MyClass.class);*
>>>
>>> This is then validated with the Hibernate validator, and the output of this
>>> source is printed to the console.
>>>
>>> The time needed for the job to consume all the events was one and a half
>>> hours, which seems a bit long.
>>> Is there a way we can speed up this process?
>>>
>>> Would more CPU cores or memory be the solution?
>>> Should we switch to an Avro deserialization schema?
>>>
>>>
>>>
>>> --
Everybody wants to be a winner
Nobody wants to lose their game
It's insane for me
It's insane for you
It's insane
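
For comparison, a minimal KafkaDeserializationSchema along these lines might
look like the sketch below. MyClass again stands in for the real record type,
and the validation step is deliberately left out so that its cost can be
measured separately:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch: one ObjectReader per task instance, created lazily because
// the schema object itself must be serializable.
public class MyClassDeserializer implements KafkaDeserializationSchema<MyClass> {

    private transient ObjectReader reader;

    @Override
    public MyClass deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (reader == null) {
            reader = new ObjectMapper().readerFor(MyClass.class);
        }
        return reader.readValue(record.value());
    }

    @Override
    public boolean isEndOfStream(MyClass nextElement) {
        return false; // consume indefinitely
    }

    @Override
    public TypeInformation<MyClass> getProducedType() {
        return TypeInformation.of(MyClass.class);
    }
}
```

Combined with env.getConfig().enableObjectReuse() from Arvid's [1], this keeps
the per-record work down to a single Jackson call; running with and without the
Hibernate validation step should show how much of the 90 minutes it accounts for.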


multiple streams joining

2021-05-26 Thread Lian Jiang
Hi,

Imagine I have one class with 4 fields: ID, A, B, C. There are three
data sources providing data in the form of (ID, A), (ID, B), (ID, C)
respectively. I want to join these three data sources to get the final (ID, A,
B, C) without any window. For example, (ID, A) could come one month after
(ID, B). Such joining needs global state. There are two designs in my mind.

1. Stream connect with separated kafka topic
streamA_B = DataSourceA connect DataSourceB
streamA_B_C = streamA_B connect DataSourceC

Each data source is ingested via a dedicated Kafka topic. This design does not
seem scalable because I need N stream connect operations for N+1 data
sources. Each stream connect needs to maintain a global state. For example,
streamA_B needs a global state for maintaining (ID, A, B) and streamA_B_C
needs another for maintaining (ID, A, B, C).

2. Shared Kafka topic
All data sources are ingested via a shared Kafka topic (using a union event
type or schema references). Then one Flink job can handle all events from
these data sources by maintaining one global state. This design seems more
scalable than solution 1.

Which one is recommended? Is there a better way that I have missed? Any hints
would be very much appreciated!
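
To make design 2 concrete, here is a rough sketch of the single keyed operator
it implies. Envelope (a union of the three event shapes) and Record (the
assembled ID, A, B, C with merge/isComplete helpers) are hypothetical types,
not anything prescribed by Flink:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// One global state entry per ID, regardless of how many source types exist.
public class Assembler extends KeyedProcessFunction<String, Envelope, Record> {

    private transient ValueState<Record> partial;

    @Override
    public void open(Configuration parameters) {
        partial = getRuntimeContext().getState(
                new ValueStateDescriptor<>("partial", Record.class));
    }

    @Override
    public void processElement(Envelope e, Context ctx, Collector<Record> out)
            throws Exception {
        Record r = partial.value();
        if (r == null) {
            r = new Record(ctx.getCurrentKey());
        }
        r.merge(e); // copies whichever of A/B/C this envelope carries
        if (r.isComplete()) {
            out.collect(r);
            partial.clear(); // drop the state once (ID, A, B, C) is emitted
        } else {
            partial.update(r);
        }
    }
}
```

Adding a fourth source type then only touches Envelope and Record, not the
topology, which is the scalability argument for design 2. Note that keys whose
events never complete will hold state forever unless a timer or state TTL
cleans them up.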


Re: How can I use different user run flink

2021-05-26 Thread Jake
Hi igyu:

You can submit the job using these arguments, like this:

```
-m yarn-cluster \
-yqu root.realtime \
-ynm “test" \
-yjm 2g \
-ytm 2g \
-n \
-d \
-ys 1 \
-yD security.kerberos.login.principal=xxx...@x.com \
-yD security.kerberos.login.keytab=/tmp/xx.keytab \
...
```



> On May 27, 2021, at 08:34, igyu  wrote:
> 
> I use CDH 6.3.2
> flink-1.12.3
> 
> I enabled Kerberos.
> 
> I want to use different users with different keytabs, because I created many
> queues in YARN, and different users use different queues.
> 
> 
> igyu



Re: yarn ship from s3

2021-05-26 Thread Xintong Song
Hi Vijay,

Currently, Flink only supports shipping files from the local machine where
the job is submitted.

There are tickets [1][2][3] tracking the efforts to support shipping files
from remote paths, e.g., http, hdfs, etc. Once those efforts are done, adding
s3 as an additional supported scheme should be straightforward.

Unfortunately, these efforts are still in progress, and have more or less
stalled recently.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-20681
[2] https://issues.apache.org/jira/browse/FLINK-20811
[3] https://issues.apache.org/jira/browse/FLINK-20867

On Thu, May 27, 2021 at 12:23 AM Vijayendra Yadav 
wrote:

> Hi Pohl,
>
> I tried to ship my property file. Example: *-yarn.ship-files
> s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*
>
>
> *Error:*
>
> 6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend - Invalid
> command line arguments.
> org.apache.flink.client.cli.CliArgsException: Could not build the program
> from JAR file: JAR file does not exist: -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_292]
> at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_292]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
> [hadoop-common-2.10.0-amzn-0.jar:?]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.0.jar:1.11.0]
> Caused by: java.io.FileNotFoundException: JAR file does not exist:
> -yarn.ship-files
> at
> org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> ... 8 more
> Could not build the program from JAR file: JAR file does not exist:
> -yarn.ship-files
>
>
> *Thanks,*
>
> *Vijay*
>
> On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
> wrote:
>
>> Hi Vijay,
>> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
>> that's what you're looking for...
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>>
>> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Piotr,
>>>
>>> I have been doing the same process as you mentioned so far; now I am
>>> migrating the deployment process to AWS CDK and AWS Step Functions, kind
>>> of like a CI/CD process.
>>> I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3 using
>>> command-runner.jar (AWS Step); it loaded them onto one of the master nodes
>>> (out of 3). In the next step, when I launched the Flink job, it would not find
>>> the build because the job is launched on some other YARN node.
>>>
>>> I was hoping that, just like in *Apache Spark*, where whatever files we provide
>>> in *--files* are shipped to YARN (S3 to the YARN working directory), Flink
>>> would also have a solution.
>>>
>>> Thanks,
>>> Vijay
>>>
>>>
>>> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi Vijay,

 I'm not sure if I understand your question correctly. You have jar and
 configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
 those? Can you simply download those things (whole directory containing
 those) to the machine that will be starting the Flink job?

 Best, Piotrek

 On Tue, 25 May 2021 at 07:50, Vijayendra Yadav wrote:

> Hi Team,
>
> I am trying to find a way to ship files from aws s3 for a flink
> streaming job, I am running on AWS EMR. What i need to ship are following:
> 1) application jar
> 2) application property file
> 3) custom flink-conf.yaml
> 4) log4j application specific
>
> Please let me know options.
>
> Thanks,
> Vijay




How can I use different user run flink

2021-05-26 Thread igyu
I use CDH 6.3.2
flink-1.12.3

I enabled Kerberos.

I want to use different users with different keytabs, because I created many
queues in YARN, and different users use different queues.




igyu


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-26 Thread Jerome Li
Hi Yang,

Thanks for getting back to me.

By “restart master node”, I mean: run “kubectl get nodes” to find the nodes whose
role is master, “ssh” into one of the master nodes as the ubuntu user, and then
run “sudo /sbin/reboot -f” to restart the master node.

It looks like the JobManager cancels the running job and logs this after
that.

2021-05-26 18:28:37,997 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Discarding the 
results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - 
Shutting down.

2021-05-26 18:28:38,000 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Job 
74fc5c858e50f5efc91db9ee16c17a8c has been suspended.

2021-05-26 18:28:38,007 [INFO] 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending 
SlotPool.

2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster 
- Close ResourceManager connection 
5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..

2021-05-26 18:28:38,019 [INFO] 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl  - JobManager 
runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership 
with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at 
akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.LocalFencedMessage 
until processing is started.

2021-05-26 18:28:38,295 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.

Re: KafkaSource metrics

2021-05-26 Thread Alexey Trenikhun
Found https://issues.apache.org/jira/browse/FLINK-22766


From: Alexey Trenikhun 
Sent: Tuesday, May 25, 2021 3:25 PM
To: Ardhani Narasimha ; 陳樺威 
; Flink User Mail List 
Subject: Re: KafkaSource metrics

It looks like when KafkaSource is used instead of FlinkKafkaConsumer, the metrics 
listed below are not available. Bug? Work in progress?


Thanks,
Alexey

From: Ardhani Narasimha 
Sent: Monday, May 24, 2021 9:08 AM
To: 陳樺威 
Cc: user 
Subject: Re: KafkaSource metrics

Use the following, respectively:

flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - 
Consumer rate
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer lag
flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit 
latency

Unsure if reactive mode makes any difference.
On Mon, May 24, 2021 at 7:44 PM 陳樺威 <oscar8492...@gmail.com> wrote:
Hello,

Our team is trying out reactive mode and replacing FlinkKafkaConsumer with the 
new KafkaSource.
But we can't find a list of the KafkaSource metrics. Does anyone have any idea? In 
our case, we want to know the Kafka consumption delay and consumption rate.

Thanks,
Oscar



Error restarting job from Savepoint

2021-05-26 Thread Yashwant Ganti
Hello,

We are facing an error restarting a job from a savepoint. We believe it is
because one of the common classes used across all of our jobs was changed,
but no *serialVersionUID* was assigned to the class. The error we
are facing is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:254)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:272)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:425)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:535)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:525)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_
> (2/4) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:160)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:345)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:163)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:115)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:559)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:101)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:181)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:328)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> ... 11 more
> Caused by: java.io.InvalidClassException: com..**.***; local
> class incompatible: stream classdesc serialVersionUID = -
> 7317586767482317266, local class serialVersionUID = -8797204481428423223
> at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
> at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source
> )
> at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown
> Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(
> SerializableCoder.java:194)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(
> SerializableCoder.java:54)
> at org.apache.beam.sdk.io.
> Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(
> Read.java:669)
> at org.apache.beam.sdk.io.
> Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(
> Read.java:642)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
> .deserialize(CoderTypeSerializer.java:118)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
> .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.
> KeyGroupPartitioner$PartitioningResultKeyGroupReader
> .readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation
> .readKeyGroupS
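
A common guard against this failure mode is to pin the serialVersionUID on
every serializable class that can end up inside state, so that recompiling the
class does not change its implicit UID. A minimal sketch (class and fields are
placeholders):

```
import java.io.Serializable;

public class MySharedEvent implements Serializable {
    // Pin the UID so that recompiled versions of this class remain
    // binary-compatible with objects already written into savepoints.
    private static final long serialVersionUID = 1L;

    private String id;
    private long timestamp;
}
```

For the savepoint at hand, one possible way out, assuming the field changes are
otherwise compatible, is to set the UID to the value recorded in the stream
(-7317586767482317266 in the stack trace above), restore, and then move to an
explicit, stable UID going forward.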

Re: yarn ship from s3

2021-05-26 Thread Vijayendra Yadav
Hi Pohl,

I tried to ship my property file. Example: *-yarn.ship-files
s3://applib/xx/xx/1.0-SNAPSHOT/application.properties  \*


*Error:*

6:21:37.163 [main] ERROR org.apache.flink.client.cli.CliFrontend - Invalid
command line arguments.
org.apache.flink.client.cli.CliArgsException: Could not build the program
from JAR file: JAR file does not exist: -yarn.ship-files
at
org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:244)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:223)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_292]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_292]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
[hadoop-common-2.10.0-amzn-0.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.io.FileNotFoundException: JAR file does not exist:
-yarn.ship-files
at
org.apache.flink.client.cli.CliFrontend.getJarFile(CliFrontend.java:740)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:717)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:242)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
... 8 more
Could not build the program from JAR file: JAR file does not exist:
-yarn.ship-files


*Thanks,*

*Vijay*

On Tue, May 25, 2021 at 11:58 PM Matthias Pohl 
wrote:

> Hi Vijay,
> have you tried yarn-ship-files [1] or yarn-ship-archives [2]? Maybe,
> that's what you're looking for...
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-files
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#yarn-ship-archives
>
> On Tue, May 25, 2021 at 5:56 PM Vijayendra Yadav 
> wrote:
>
>> Hi Piotr,
>>
>> I have been doing the same process as you mentioned so far; now I am
>> migrating the deployment process to AWS CDK and AWS Step Functions, kind
>> of like a CI/CD process.
>> I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3 using
>> command-runner.jar (AWS Step); it loaded them onto one of the master nodes
>> (out of 3). In the next step, when I launched the Flink job, it would not find
>> the build because the job is launched on some other YARN node.
>>
>> I was hoping that, just like in *Apache Spark*, where whatever files we provide in
>> *--files* are shipped to YARN (S3 to the YARN working directory), Flink would
>> also have a solution.
>>
>> Thanks,
>> Vijay
>>
>>
>> On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> I'm not sure if I understand your question correctly. You have jar and
>>> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
>>> those? Can you simply download those things (whole directory containing
>>> those) to the machine that will be starting the Flink job?
>>>
>>> Best, Piotrek
>>>
>>> On Tue, 25 May 2021 at 07:50, Vijayendra Yadav wrote:
>>>
 Hi Team,

 I am trying to find a way to ship files from aws s3 for a flink
 streaming job, I am running on AWS EMR. What i need to ship are following:
 1) application jar
 2) application property file
 3) custom flink-conf.yaml
 4) log4j application specific

 Please let me know options.

 Thanks,
 Vijay
>>>
>>>


Re: Customer operator in BATCH execution mode

2021-05-26 Thread 陳昌倬
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> Hi,
> 
> No, there is no API in the operator to know which mode it works in. We
> aim to have separate operators for both modes if required. You can check,
> e.g., how we do it in KeyedBroadcastStateTransformationTranslator [1].

Thanks for the information. We will implement this according to Piotrek's
suggestion.

> 
> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> you want to apply a transformation at the end of each key. You could
> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

According to [0], the timer's timestamp is irrelevant since timers will be
triggered at the end of input, right? If that is the case, we can use the same
code for both streaming and batch mode.

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/


> 
> A side note, I don't fully get what you mean by "build state for our
> streaming application". Bear in mind though you cannot take a savepoint
> from a job running in the BATCH execution mode. Moreover it uses a
> different kind of StateBackend. Actually a dummy one, which just
> imitates a real state backend.

What we plan to do here is:

1. Load configuration from a broadcast event (custom source backed by a REST
   API).
2. Load historical events as batch-mode input (from GCS).
3. Use a timer to trigger output so that the following happens:
   a. Serialize the keyed states into JSON.
   b. Output to Kafka.
   c. The streaming application consumes the data from Kafka and updates its
      keyed states accordingly.

We hope that in this way, we can rebuild our states with almost the same
code as in streaming.
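
A rough sketch of step 3, assuming a simple per-key count as the state (the
real job would accumulate its actual keyed state and serialize it to JSON
before writing to Kafka):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EmitOnEndOfKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Long.MAX_VALUE means "when the MAX_WATERMARK arrives", i.e. at the
        // end of input in BATCH mode; re-registering the same timestamp for
        // the same key is deduplicated, so doing this per record is safe.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        Long c = count.value();
        count.update(c == null ? 1L : c + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // Fires once per key after all of that key's input has been processed.
        out.collect(ctx.getCurrentKey() + "=" + count.value());
    }
}
```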


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Query related to Minimum scrape interval for Prometheus and fetching metrics of all vertices in a job through Flink Rest API

2021-05-26 Thread Ashutosh Uttam
Hi team,

I have two queries as mentioned below:

*Query1:*
I am using the PrometheusReporter to expose metrics to a Prometheus server.
What is the minimum recommended scrape interval to configure on the
Prometheus server?
Is there a fixed interval at which Flink reports metrics?

*Query2:*
Is there any way I can fetch the metrics of all vertices (including
subtasks) of a job through a single Flink Monitoring REST API call?

As of now what I have tried is first finding the vertices and then querying
individual vertex for metrics as below:

*Step 1:* Finding jobId (http://:/jobs)
*Step 2:* Finding vertices Id (http://:/jobs/)
*Step 3:* Finding aggregated metrics (including parallelism) of a vertex
(http://:/jobs//vertices//subtasks/metrics?get=,)


So, likewise, I have to invoke multiple REST APIs, one for each vertex ID. Is
there any optimised way to get the metrics of all vertices?


Thanks & Regards,
Ashutosh
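
There is, as far as I know, no single endpoint that returns the metrics of
every vertex at once, but the walk can at least be scripted. Below is a sketch
using the JDK HTTP client and Jackson; the host, port, and metric names are
placeholders:

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Walks /jobs -> /jobs/<id> -> per-vertex aggregated subtask metrics.
public class VertexMetrics {
    private static final String BASE = "http://localhost:8081";
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    public static void main(String[] args) throws Exception {
        for (JsonNode job : get("/jobs").get("jobs")) {
            String jobId = job.get("id").asText();
            for (JsonNode vertex : get("/jobs/" + jobId).get("vertices")) {
                String vertexId = vertex.get("id").asText();
                JsonNode metrics = get("/jobs/" + jobId + "/vertices/" + vertexId
                        + "/subtasks/metrics?get=numRecordsIn,numRecordsOut");
                System.out.println(vertexId + " -> " + metrics);
            }
        }
    }

    private static JsonNode get(String path) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(BASE + path)).GET().build();
        return MAPPER.readTree(
                CLIENT.send(req, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```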


Re: Managing Jobs entirely with Flink Monitoring API

2021-05-26 Thread Piotr Nowojski
Glad to hear it!  Thanks for confirming that it works.

Piotrek

On Wed, 26 May 2021 at 12:59, Barak Ben Nathan wrote:

>
>
> Hi Piotrek,
>
>
>
> This is exactly what I was searching for. Thanks!
>
>
>
> Barak
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, May 26, 2021 9:59 AM
> *To:* Barak Ben Nathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Managing Jobs entirely with Flink Monitoring API
>
>
>
>
> Hi Barak,
>
>
>
> Before starting the JobManager, I don't think there is any API running at
> all. If you want to be able to submit/stop multiple jobs to the same
> cluster, session mode is indeed the way to go. But first you need to
> start the cluster (start-cluster.sh) [1].
>
>
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> 
>
>
>
> wt., 25 maj 2021 o 14:10 Barak Ben Nathan 
> napisał(a):
>
>
>
> I want to manage the execution of Flink jobs programmatically through the
> Flink Monitoring API.
>
>
>
> I.e. I want to run/delete jobs ONLY with the
>  POST /jars/:jarid/run
>  POST /jobs/:jobid/stop
> API commands.
>
>
>
> Now, it seems that the Session Mode may fit my needs: “Session Mode: one
> JobManager instance manages multiple jobs sharing the same cluster of
> TaskManagers” (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/
> 
> )
>
> However, I couldn’t find a way to start the API server (i.e. a JobManager)
> that didn’t already include submitting a JAR file for a job execution.
>
> Any suggestions?
>
>
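
For reference, driving such a session cluster purely over the REST API can look
like the following sketch (JDK HTTP client; BASE, the jar id, and the job id
are placeholders, and the jar is assumed to have been uploaded beforehand via
POST /jars/upload):

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobControl {
    private static final String BASE = "http://localhost:8081";
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Starts the job; the response body carries the new job id as JSON.
    static String run(String jarId) throws Exception {
        return post("/jars/" + jarId + "/run");
    }

    // Triggers stop-with-savepoint; the response carries a trigger id.
    static String stop(String jobId) throws Exception {
        return post("/jobs/" + jobId + "/stop");
    }

    private static String post(String path) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(BASE + path))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        return CLIENT.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```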


Re: Customer operator in BATCH execution mode

2021-05-26 Thread Dawid Wysakowicz
Hi,

No, there is no API in the operator to know which mode it works in. We
aim to have separate operators for both modes if required. You can check,
e.g., how we do it in KeyedBroadcastStateTransformationTranslator [1].

Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
you want to apply a transformation at the end of each key. You could
also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

A side note, I don't fully get what you mean by "build state for our
streaming application". Bear in mind though you cannot take a savepoint
from a job running in the BATCH execution mode. Moreover it uses a
different kind of StateBackend. Actually a dummy one, which just
imitates a real state backend.

Best,

Dawid


[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/KeyedBroadcastStateTransformationTranslator.java

On 25/05/2021 17:04, ChangZhuo Chen (陳昌倬) wrote:
> Hi,
>
> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application. Due to differences between
> batch & streaming mode, we want to check the current execution mode in a
> custom operator. So our questions are:
>
>
> * Is there any API for a custom operator to know the current execution mode
>   (batch or streaming)?
>
> * If we want to output after all elements of one specific key are
>   processed, can we just use a timer, since timers are triggered at the end
>   of input [0]?
>
>
> [0] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>





Re: Job recovery issues with state restoration

2021-05-26 Thread Peter Westermann
/mnt/data is a local disk, so there shouldn’t be any additional latency. I’ll 
provide more information when/if this happens again.

Peter

From: Roman Khachatryan 
Date: Tuesday, May 25, 2021 at 6:54 PM
To: Peter Westermann 
Cc: user@flink.apache.org 
Subject: Re: Job recovery issues with state restoration
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again.

Thanks, please post any results here.

> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).

Those files on /mnt could still be checked against the ones in
checkpoint directories (on S3/DFS), the size should match.

I'm also curious: why do you place local recovery files on a remote FS?
(I assume /mnt/data/tmp is a remote FS or a persistent volume).
Currently, if a TM is lost (e.g. process dies) then those files can
not be used - and recovery will fallback to S3/DFS. So this probably
incurs some IO/latency unnecessarily.

Regards,
Roman

On Tue, May 25, 2021 at 2:16 PM Peter Westermann
 wrote:
>
> Hi Roman,
>
>
>
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again. We need incremental 
> checkpoints for our workload.
>
> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).
>
>
>
> Thanks,
>
> Peter
>
>
>
>
>
> From: Roman Khachatryan 
> Date: Thursday, May 20, 2021 at 4:54 PM
> To: Peter Westermann 
> Cc: user@flink.apache.org 
> Subject: Re: Job recovery issues with state restoration
>
> Hi Peter,
>
> Do you experience this issue if running without local recovery or
> incremental checkpoints enabled?
> Or have you maybe compared local (on TM) and  remove (on DFS) SST files?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 5:54 PM Peter Westermann
>  wrote:
> >
> > Hello,
> >
> >
> >
> > I’ve reported issues around checkpoint recovery in case of a job failure 
> > due to zookeeper connection loss in the past. I am still seeing issues 
> > occasionally.
> >
> > This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
> > incremental checkpoints, and task-local recovery enabled.
> >
> >
> >
> > Here’s what happened: A zookeeper instance was terminated as part of a 
> > deployment for our zookeeper service, this caused a new jobmanager leader 
> > election (so far so good). A leader was elected and the job was restarted 
> > from the latest checkpoint but never became healthy. The root exception and 
> > the logs show issues reading state:
> >
> > o.r.RocksDBException: Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst.
> >  Size recorded in manifest 36718, actual size 2570\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst.
> >  Size recorded in manifest 13756, actual size 1307\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst.
> >  Size recorded in manifest 16278, actual size 1138\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst.
> >  Size recorded in manifest 23108, actual size 1267\
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst.
> >  Size recorded in manifest 148089, actual size 1293\
> > \
> > \\tat org.rocksdb.RocksDB.open(RocksDB.java)\
> > \\tat org.rocksdb.RocksDB.open(RocksDB.java:286)\
> > \\tat 
> > o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)\
> > \\t... 22 common frames omitted\
> > Wrapped by: java.io.IOException: Error while opening RocksDB instance.\
> > \\tat 
> > o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)\
> > \\tat 
> > o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)\
> > \\tat 
> > o.a.f.c.s.s.r.RocksDBIncrementalRestoreOper

Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-26 Thread Matthias Pohl
Hi Dawid,
+1 (non-binding)

Thanks for driving this release. I checked the following things:
- downloaded and built the source code
- verified checksums
- double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
- did a visual check of the release blog post
- started cluster and ran jobs (WindowJoin and WordCount); nothing
suspicious found in the logs
- manually verified that the issue in FLINK-22866 is fixed

Best,
Matthias

On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz 
wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version 1.13.1,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 31D2DD10BFC15A2D [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.13.1-rc1" [5],
> * website pull request listing the new release and adding announcement
> blog post [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Best,
> Dawid
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1422/
> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
> [6] https://github.com/apache/flink-web/pull/448
>


Re: Flink 1.12.4 docker image

2021-05-26 Thread Arvid Heise
Just FYI https://hub.docker.com/_/flink is updated now as well.

On Wed, May 26, 2021 at 9:57 AM Nikola Hrusov  wrote:

> Hello Arvid,
>
> Thank you for your fast response
>
> Regards
> ,
> Nikola
>
>
> On Tue, May 25, 2021 at 7:11 PM Arvid Heise  wrote:
>
>> Hi Nikola,
>>
>> https://hub.docker.com/r/apache/flink now contains the images. It takes
>> a few days until https://hub.docker.com/_/flink is updated though.
>>
>> Sorry for the hassle.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, May 25, 2021 at 3:08 PM Arvid Heise  wrote:
>>
>>> Hi Nikola,
>>>
>>> I'm looking into it. I might have missed a step during release.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov 
>>> wrote:
>>>
 Hello,

 I saw that flink 1.12.4 just got released. However I am struggling to
 find the docker image.

 I checked both:
 - https://hub.docker.com/_/flink
 - https://hub.docker.com/r/apache/flink

 but on both 1.12.4 is not available.

 Are there plans to publish it as a docker image?

 Regards
 ,
 Nikola




Fwd: Getting error in pod template

2021-05-26 Thread Priyanka Manickam
-- Forwarded message -
From: Priyanka Manickam 
Date: Tue, 25 May 2021, 21:13
Subject: Fwd: Getting error in pod template
To: user , Yang Wang 




-- Forwarded message -
From: Priyanka Manickam 
Date: Tue, 25 May 2021, 21:11
Subject: Re: Getting error in pod template
To: Yang Wang 


Hi team,

Now I am able to run Flink with the pod template. Thanks for the input.


Problem 1:
But I am not able to pull the jar from the blob storage through the command
used in the pod template file.

Problem 2:
Also, we are trying to write events from one topic to another topic,
with parallelism 8, task slots 8, -Djobmanager.memory.process.size=1g,
-Dtaskmanager.memory.process.size=2g,
-Dkubernetes.jobmanager.cpu=0.5,
-Dtaskmanager.cpu=2.

Kafka (Event Hub) partitions = 3; we planned for 1 lakh (100,000) messages per
second.

But I was only able to get a throughput of 555 messages per second.

I have also tried increasing the parallelism; it does not work.

Could you please help me out here?

Thanks,
Priyanka Manickam.

On Fri, 14 May 2021, 21:00 Priyanka Manickam, 
wrote:

> Hi Yang,
>
> I was using a pod template to fetch the logs into a particular repository.
>
> But while deploying I got an error that says "jobmanager-pod-template" is
> invalid: spec.containers(0).image: required value.
>
> And if I try to add an image for the flink-main-container, it gives an
> ImagePullBackOff error.
>
> Am I proceeding in the correct way? Because on the official Flink
> website, no image is added for the flink-main-container.
>
> Could you please help with this? I have also searched for demo videos
> on using the pod template with Flink native Kubernetes, but I was not
> able to find any. If you could share any demo videos on the website, it
> would be very useful for everyone.
>
> Good year ahead..
>
> Thanks,
> Priyanka Manickam.
>
>
>


Re: Flink 1.12.4 docker image

2021-05-26 Thread Nikola Hrusov
Hello Arvid,

Thank you for your fast response

Regards
,
Nikola


On Tue, May 25, 2021 at 7:11 PM Arvid Heise  wrote:

> Hi Nikola,
>
> https://hub.docker.com/r/apache/flink now contains the images. It takes a
> few days until https://hub.docker.com/_/flink is updated though.
>
> Sorry for the hassle.
>
> Best,
>
> Arvid
>
> On Tue, May 25, 2021 at 3:08 PM Arvid Heise  wrote:
>
>> Hi Nikola,
>>
>> I'm looking into it. I might have missed a step during release.
>>
>> Best,
>>
>> Arvid
>>
>> On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov  wrote:
>>
>>> Hello,
>>>
>>> I saw that flink 1.12.4 just got released. However I am struggling to
>>> find the docker image.
>>>
>>> I checked both:
>>> - https://hub.docker.com/_/flink
>>> - https://hub.docker.com/r/apache/flink
>>>
>>> but on both 1.12.4 is not available.
>>>
>>> Are there plans to publish it as a docker image?
>>>
>>> Regards
>>> ,
>>> Nikola
>>>
>>>


Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize class

2021-05-26 Thread Piotr Nowojski
Hi,

Maybe before deleting the pods, you could look inside them and inspect your
job's jar? What classes does it have inside it? The job's jar should be in
a local directory. Or maybe even first inspect the jar before submitting it?

Best, Piotrek

On Tue, 25 May 2021 at 17:40, Georgi Stoyanov wrote:

> Hi Piotr, thank you for the fast reply.
>
>
>
> The job is restarting in the same Flink session and fails with that
> exception. When I delete the pods (we are using the Google CRD, so I just
> kubectl delete FlinkCluster …) and the yaml is applied again, it's working
> as expected. It looks to me like it's a jar problem, since I just noticed it
> started to fail with a class from an internal common library, not only the
> jobs
>
> java.lang.NoClassDefFoundError: Could not initialize
> com.my.organization.core.cfg.PropertiesConfigurationClass
> at
> com.my.organization.core.CassandraSink$1.buildCluster(CassandraSink.java:162)
> at
> org.apache.flink.streaming.connectors.cassandra.ClusterBuilder.getCluster(ClusterBuilder.java:32)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:86)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:106)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, May 25, 2021 6:18 PM
> *To:* Georgi Stoyanov 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize
> class
>
>
>
> Hi Georgi,
>
>
>
> I don't think it's a bug in Flink. It sounds like some problem with
> dependencies or jars in your job. Can you explain a bit more what do you
> mean by:
>
>
>
> > that some of them are constantly restarting with the following
> exception. After restart, everything is working as expected
>
>
>
> constantly restarting, but after a restart everything is working?
>
>
>
> Best,
>
> Piotrek
>
>
>
> On Tue, 25 May 2021 at 16:12, Georgi Stoyanov wrote:
>
> Hi all,
>
>
> We have several Flink jobs running on k8s with Flink 1.11.3, and recently
> we noticed that some of them are constantly restarting with the following
> exception. After a restart, everything works as expected.
> Could this be a bug?
> 2021-05-25 17:04:42
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
> at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>