Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-15 Thread Richard Deurwaarder
Thanks for picking this up quickly!

I saw you've made a second minor release to upgrade to Log4j 2.16, which is
perfect.

Just to clarify: will you also push new Docker images for these releases? In
particular Flink 1.11.6. (Sorry, we must upgrade soon! :()

On Tue, Dec 14, 2021 at 2:33 AM narasimha  wrote:

> Thanks Timo, that was helpful.
>
> On Mon, Dec 13, 2021 at 7:19 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Chesnay Thank you for the clarification.
>>
>> On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler 
>> wrote:
>>
>>> The flink-shaded-zookeeper jars do not contain log4j.
>>>
>>> On 13/12/2021 14:11, Prasanna kumar wrote:
>>>
>>> Does ZooKeeper have this vulnerable dependency? I see references to
>>> log4j in the shaded ZooKeeper jar included as part of the Flink distribution.
>>>
>>> On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:
>>>
>>>> While we are working to upgrade the affected dependencies of all
>>>> components, we recommend users follow the advisory of the Apache Log4j
>>>> Community. The Ververica Platform can also be patched with a similar
>>>> approach:
>>>>
>>>> To configure the JVMs used by Ververica Platform, you can pass custom
>>>> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
>>>> following to your platform values.yaml, or append to the existing value
>>>> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
>>>> the platform with Helm:
>>>> env:
>>>>   - name: JAVA_TOOL_OPTIONS
>>>>     value: -Dlog4j2.formatMsgNoLookups=true
>>>>
>>>>
>>>> For any questions, please contact us via our support portal.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 11.12.21 06:45, narasimha wrote:
>>>> > Folks, what about the Ververica platform? Is there any
>>>> mitigation around it?
>>>> >
>>>> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>> >
>>>> > I would recommend modifying your log4j configurations to set
>>>> > log4j2.formatMsgNoLookups to true.
>>>> >
>>>> > As far as I can tell this is equivalent to upgrading log4j, which
>>>> > just disables this lookup by default.
>>>> >
>>>> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
>>>> >> Hello,
>>>> >>
>>>> >> There has been a log4j2 vulnerability made public
>>>> >> https://www.randori.com/blog/cve-2021-44228/
>>>> >> <https://www.randori.com/blog/cve-2021-44228/> which is making
>>>> >> some waves :)
>>>> >> This post even explicitly mentions Apache Flink:
>>>> >>
>>>> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
>>>> >> <
>>>> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
>>>> >
>>>> >>
>>>> >> And fortunately, I saw this was already on your radar:
>>>> >> https://issues.apache.org/jira/browse/FLINK-25240
>>>> >> <https://issues.apache.org/jira/browse/FLINK-25240>
>>>> >>
>>>> >> What would the advice be for flink users? Do you expect to push a
>>>> >> minor to fix this? Or is it advisable to upgrade to the latest
>>>> >> log4j2 version manually for now?
>>>> >>
>>>> >> Thanks for any advice!
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > A.Narasimha Swamy
>>>>
>>>>
>>>
>
> --
> A.Narasimha Swamy
>


CVE-2021-44228 - Log4j2 vulnerability

2021-12-10 Thread Richard Deurwaarder
Hello,

There has been a log4j2 vulnerability made public
https://www.randori.com/blog/cve-2021-44228/ which is making some waves :)
This post even explicitly mentions Apache Flink:
https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/

And fortunately, I saw this was already on your radar:
https://issues.apache.org/jira/browse/FLINK-25240

What would the advice be for Flink users? Do you expect to push a minor
release to fix this? Or is it advisable to upgrade to the latest Log4j 2
version manually for now?

Thanks for any advice!
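The mitigation the replies in this thread converge on is disabling the message lookup via a JVM system property. One way to apply it cluster-wide is through flink-conf.yaml; a sketch, assuming no other JVM options are already set (env.java.opts applies to both JobManager and TaskManagers):

```yaml
# flink-conf.yaml -- passes the flag to both JobManager and TaskManager JVMs.
# If env.java.opts is already set, append the property instead of replacing it.
env.java.opts: -Dlog4j2.formatMsgNoLookups=true
```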


What happens when a job is rescaled

2020-11-13 Thread Richard Deurwaarder
Hello,

I have a question about what actually happens when a job is started from an
existing checkpoint, in particular when the parallelism has changed.

*Context:*
We have a flink 1.11.2 (DataStream API) job running on Kubernetes (GCP)
writing its state to GCS.
Normally we run with 12 TMs, each with 3 CPU cores and about 12 GB RAM. We
have quite a bit of state (stored with RocksDB): about 20-25 operators with
state ranging from 20 GB to 180 GB per operator. In total we have about
600 GB of state.

During normal operations this works fine; the only 'problem' we have is
that savepoints (both creating them and starting from them) are very slow.
Therefore we use external checkpoints to deploy new versions of our job.

*What is our problem?*
One of the things I am currently trying to investigate is why rescaling our
job is so slow. The way we rescale is by canceling the job and then
starting the job with a higher parallelism, whilst pointing to the previous
(external) checkpoint.

Without rescaling, for instance when deploying new code, starting a job
from a checkpoint would cause the first new checkpoint to complete in maybe
5 minutes.
However, if I double the parallelism, the first checkpoint takes an hour
or more to complete. This is troublesome because Kubernetes might sometimes
decide to restart a TM, causing a job restart and forcing us to redo all
the checkpoint work (very annoying if this happens when the checkpoint is
about to finish.. :) )

*What happens during a checkpoint:*
Looking at metrics we can see:
 * CPU being at 100%
 * RAM swinging up and down depending on what operator is currently
checkpointing.
 * Network traffic to GCS peaks at 100 MB/s per TM (tests indicate the
network should not be a bottleneck).
 * Disk (SSD) IOPS are in the order of 2,000-3,000, with spikes up to 10k
IOPS, not even close to capacity.

Now the obvious answer would be to increase the CPU. This does not really
seem to help though, plus we'd really like to prevent having to vertically
scale our job just to do parallelism changes, as during normal operations
our CPU usage is around 50-60%.

*Question:*
My question is:
What actually happens when Flink starts a new job from an existing
checkpoint? What extra work needs to be done because of a change in
parallelism? Is it 'normal' that we incur this penalty for scaling up
or down?
Do you have any pointers where we should look to get better performance?
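For readers wondering where the extra work comes from: Flink distributes keyed state in "key groups" (numbered from 0 up to the job's max parallelism), and each subtask owns a contiguous range of them. When the parallelism changes, those ranges shift, so each subtask has to locate and re-read state written by several old subtasks. A simplified sketch of the range arithmetic, modeled on Flink's KeyGroupRangeAssignment but not the real implementation (the real code also murmur-hashes the key before taking the modulo):

```java
// Sketch of Flink's key-group arithmetic (illustrative, not Flink source).
class KeyGroupSketch {
    // Key group a key hash falls into (real Flink murmur-hashes first).
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Subtask that owns a given key group at the current parallelism.
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive key-group range [start, end] owned by one subtask.
    static int[] rangeFor(int operatorIndex, int parallelism, int maxParallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int[] before = rangeFor(0, 2, maxParallelism);
        System.out.printf("parallelism 2, subtask 0 owns key groups [%d, %d]%n",
                before[0], before[1]); // [0, 63]
        for (int subtask = 0; subtask < 4; subtask++) {
            int[] r = rangeFor(subtask, 4, maxParallelism);
            System.out.printf("parallelism 4, subtask %d owns [%d, %d]%n",
                    subtask, r[0], r[1]);
        }
        // Subtask 0's old range [0, 63] is now split across subtasks 0 and 1,
        // so that state must be re-read and redistributed on restore.
    }
}
```

With RocksDB state this redistribution means scanning and rewriting a lot of on-disk data, which is consistent with the slow first checkpoint observed above.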

Thank you in advance :)

Richard


Re: Does flink support retries on checkpoint write failures

2020-02-01 Thread Richard Deurwaarder
Hi Till & others,

We enabled setFailOnCheckpointingErrors
(setTolerableCheckpointFailureNumber isn't available in 1.8) and this
indeed prevents the large number of restarts.
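The tolerance semantics can be illustrated with a toy model. This is not Flink code; it only mirrors the behavior described later in the thread, namely that more than n consecutive checkpoint failures trigger a restart and a successful checkpoint resets the count:

```java
// Toy model of CheckpointConfig.setTolerableCheckpointFailureNumber(n).
// Not Flink code -- just a sketch of the semantics discussed in this thread.
class TolerableFailureCounter {
    private final int tolerable;
    private int consecutiveFailures = 0;

    TolerableFailureCounter(int tolerable) {
        this.tolerable = tolerable;
    }

    /** Returns true if the job should restart after this checkpoint result. */
    boolean onCheckpoint(boolean succeeded) {
        if (succeeded) {
            consecutiveFailures = 0; // success wipes the slate clean
            return false;
        }
        return ++consecutiveFailures > tolerable;
    }

    public static void main(String[] args) {
        TolerableFailureCounter c = new TolerableFailureCounter(2);
        System.out.println(c.onCheckpoint(false)); // false -- 1st failure tolerated
        System.out.println(c.onCheckpoint(false)); // false -- 2nd failure tolerated
        System.out.println(c.onCheckpoint(true));  // false -- success resets the count
        System.out.println(c.onCheckpoint(false)); // false -- counting starts over
        System.out.println(c.onCheckpoint(false)); // false
        System.out.println(c.onCheckpoint(false)); // true  -- 3rd consecutive failure
    }
}
```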

Hopefully a solution for the reported issue[1] with google gets found but
for now this solved our immediate problem.

Thanks again!

[1] https://issuetracker.google.com/issues/137168102

Regards,

Richard

On Thu, Jan 30, 2020 at 11:40 AM Arvid Heise  wrote:

> If a checkpoint is not successful, it cannot be used for recovery.
> That means Flink will restart to the last successful checkpoint and hence
> not lose any data.
>
> On Wed, Jan 29, 2020 at 9:52 PM wvl  wrote:
>
>> Forgive my lack of knowledge - I'm a bit out of my league here.
>>
>> But I was wondering: if we allow e.g. 1 checkpoint to fail, and whatever
>> caused that failure also lost a record (e.g. a RocksDB exception, a
>> taskmanager crash, etc.), there would be no source rewind to the last
>> successful checkpoint and that record would be lost forever, correct?
>>
>> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder,  wrote:
>>
>>> Hi Till,
>>>
>>> I'll see if we can ask google to comment on those issues, perhaps they
>>> have a fix in the works that would solve the root problem.
>>> In the meanwhile
>>> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
>>> promising!
>>> Thank you for this. I'm going to try this tomorrow to see if that helps.
>>> I will let you know!
>>>
>>> Richard
>>>
>>> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> googling a bit indicates that this might actually be a GCS problem [1,
>>>> 2, 3]. The proposed solution/workaround so far is to retry the whole upload
>>>> operation as part of the application logic. Since I assume that you are
>>>> writing to GCS via Hadoop's file system this should actually fall into the
>>>> realm of the Hadoop file system implementation and not Flink.
>>>>
>>>> What you could do to mitigate the problem a bit is to set the number of
>>>> tolerable checkpoint failures to a non-zero value via
>>>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>>>> means that the job will only fail and then restart after `n` checkpoint
>>>> failures. Unfortunately, we do not support a failure rate yet.
>>>>
>>>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>>>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>>>> [3] https://issuetracker.google.com/issues/137168102
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We've got a Flink job running on 1.8.0 which writes its state
>>>>> (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a large
>>>>> amount of state (500gb range) are becoming *very* unstable. In the order 
>>>>> of
>>>>> restarting once an hour or even more.
>>>>>
>>>>> The reason for this instability is that we run into "410 Gone"[4]
>>>>> errors from Google Cloud Storage. This indicates an upload (write from
>>>>> Flink's perspective) took place and it wanted to resume the write[2] but
>>>>> could not find the file which it needed to resume. My guess is this is
>>>>> because the previous attempt either failed or perhaps it uploads in chunks
>>>>> of 67mb [3].
>>>>>
>>>>> The library logs this line when this happens:
>>>>>
>>>>> "Encountered status code 410 when accessing URL
>>>>> https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata=resumable_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>>>> Delegating to response handler for possible retry."
>>>>>
>>>>> We're kind of stuck on these questions:
>>>>> * Is Flink capable of doing these retries?
>>>>> * Does anyone successfully write their (RocksDB) state to Google Cloud
>>>>> Storage for bigger state sizes?
>>>>> * Is it possible Flink renames or deletes certain directories before
>>>>> all flushes have completed, relying on an atomicity guarantee provided
>>>>> by HDFS that does not hold for other implementations? A race condition
>>>>> of sorts.
>>>>>
>>>>> Basically does anyone recognize this behavior?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Richard Deurwaarder
>>>>>
>>>>> [1] We use an HDFS implementation provided by Google
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>>>> [2]
>>>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>>>> [3]
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
>>>>>  (see
>>>>> fs.gs.outputstream.upload.chunk.size)
>>>>> [4] Stacktrace:
>>>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>>>
>>>>


Does flink support retries on checkpoint write failures

2020-01-28 Thread Richard Deurwaarder
Hi all,

We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to
Google Cloud Storage[1]. We've noticed that jobs with a large amount of
state (500gb range) are becoming *very* unstable. In the order of
restarting once an hour or even more.

The reason for this instability is that we run into "410 Gone"[4] errors
from Google Cloud Storage. This indicates an upload (write from Flink's
perspective) took place and it wanted to resume the write[2] but could not
find the file which it needed to resume. My guess is this is because the
previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
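The ~67 MB figure appears to match the connector's default resumable-upload chunk size (64 MiB = 67,108,864 bytes). If you want to experiment with it, the property from [3] goes in core-site.xml; a sketch with an illustrative value:

```xml
<!-- core-site.xml: GCS connector resumable-upload chunk size, in bytes.
     64 MiB is the connector default; 8 MiB here is purely illustrative. -->
<property>
  <name>fs.gs.outputstream.upload.chunk.size</name>
  <value>8388608</value>
</property>
```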

The library logs this line when this happens:

"Encountered status code 410 when accessing URL
https://www.googleapis.com/upload/storage/v1/b//o?ifGenerationMatch=0=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata=resumable_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
Delegating to response handler for possible retry."

We're kind of stuck on these questions:
* Is Flink capable of doing these retries?
* Does anyone successfully write their (RocksDB) state to Google Cloud
Storage for bigger state sizes?
* Is it possible Flink renames or deletes certain directories before all
flushes have completed, relying on an atomicity guarantee provided by HDFS
that does not hold for other implementations? A race condition of sorts.

Basically does anyone recognize this behavior?

Regards,

Richard Deurwaarder

[1] We use an HDFS implementation provided by Google
https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
[2] https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
[3]
https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
(see
fs.gs.outputstream.upload.chunk.size)
[4] Stacktrace:
https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492


Re: PubSub source throwing grpc errors

2020-01-15 Thread Richard Deurwaarder
Hi Itamar and Till,

Yes this actually looks a lot worse than it is, fortunately.

From what I understand this means: something has not released or properly
shut down a gRPC client, and the library likes to inform you about this. I
would definitely expect to see this if the job crashes at the 'wrong' point.

As you can see in the issue, they did fix or change this at some point. Do
you have something to reproduce this, in particular how or when
serialization causes issues? I'll try updating the libraries and see if
that removes the verbose logs.

Regards,

Richard



On Wed, Jan 15, 2020 at 5:37 PM Till Rohrmann  wrote:

> Hi Itamar,
>
> could you share a bit more details about the serialization problem. Which
> class is not serializable and where does it originate from?
>
> Cheers,
> Till
>
> On Tue, Jan 14, 2020 at 9:47 PM Itamar Syn-Hershko <
> ita...@bigdataboutique.com> wrote:
>
>> Thanks!
>>
>> I was able to track this down. Essentially it was a deserialization error
>> which propagated and might have prevented the channel from closing down
>> properly. This could be considered a problem, but I'm now further down
>> the rabbit hole chasing a solution for the original deserialization
>> issue.
>>
>> Thanks for the help!
>>
>> On Tue, Jan 14, 2020 at 8:26 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Itamar,
>>>
>>> for further debugging it would be helpful to get the full logs of Flink
>>> and more information about your environment. Since I'm not too
>>> familiar with Flink's PubSub connector, I have pulled in Richard (original
>>> author), Becket and Robert (both helped with reviewing and merging this
>>> connector). They might know what's going on.
>>>
>>> The problem looks a bit similar to [1]. Maybe it would help to upgrade
>>> to a newer google-cloud-pubsub version than 1.62.0. I assume that the
>>> others might know more about it.
>>>
>>> [1] https://github.com/googleapis/google-cloud-java/issues/3648
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
>>> ita...@bigdataboutique.com> wrote:
>>>
 Hi all,

 We are trying to use the PubSub source with a very minimal and basic
 Flink application as a POC, and getting the following error consistently
 every couple of seconds. What am I missing?

 ```
 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
 cleanQueue
 SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
 pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
 Make sure to call shutdown()/shutdownNow() and wait until
 awaitTermination() returns true.
 java.lang.RuntimeException: ManagedChannel allocation site
 at
 io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.(ManagedChannelOrphanWrapper.java:103)
 at
 io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:53)
 at
 io.grpc.internal.ManagedChannelOrphanWrapper.(ManagedChannelOrphanWrapper.java:44)
 at
 io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
 at
 org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
 at
 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.base/java.lang.Thread.run(Thread.java:834)
 ```

 Thanks!

 --

 Itamar Syn-Hershko

 ita...@bigdataboutique.com
 https://bigdataboutique.com

>>>
>>
>> --
>>
>> Itamar Syn-Hershko
>> CTO, Founder
>> +972-54-2467860
>> ita...@bigdataboutique.com
>> https://bigdataboutique.com
>>
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-25 Thread Richard Deurwaarder
Hi Peter and Jiayi,

Thanks for the answers this worked perfectly, I just added

containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
and
containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz

to my flink config and they got picked up.

Do you know why this is missing from the docs? If it's not intentional it
might be nice to add it.

Richard

On Tue, Sep 24, 2019 at 5:53 PM Peter Huang 
wrote:

> Hi Richard,
>
> For the first question, I don't think you need to explicitly specify
> fs.hdfs.hadoopconf as each file in the ship folder is copied as a yarn
> local resource for containers. The configuration path is
> overridden internally in Flink.
>
> For the second question of setting TM environment variables, please use
> these two configurations in your flink conf.
>
> /**
>  * Prefix for passing custom environment variables to Flink's master process.
>  * For example for passing LD_LIBRARY_PATH as an env variable to the 
> AppMaster, set:
>  * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
>  * in the flink-conf.yaml.
>  */
> public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
> "containerized.master.env.";
>
> /**
>  * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration 
> prefix allows
>  * setting custom environment variables for the workers (TaskManagers).
>  */
> public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = 
> "containerized.taskmanager.env.";
>
>
>
> Best Regards
>
> Peter Huang
>
>
>
>
> On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder 
> wrote:
>
>> Hello,
>>
>> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with
>> yarn. We would like to add the GCS connector to use GCS rather than HDFS.
>> Following the documentation of the GCS connector[1] we have to specify
>> which credentials we want to use and there are two ways of doing this:
>>   * Edit core-site.xml
>>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>>
>> Because we're on a company shared hadoop cluster we do not want to change
>> the cluster wide core-site.xml.
>>
>> This leaves me with two options:
>>
>> 1. Create a custom core-site.xml and use --yarnship to send it to all the
>> taskmanager containers. If I do this, to what value should I set
>> fs.hdfs.hadoopconf[2] in flink-conf?
>> 2. The second option would be to set an environment variable, however
>> because the taskmanagers are started via yarn I'm having trouble figuring
>> out how to make sure this environment variable is set for each yarn
>> container / taskmanager.
>>
>> I would appreciate any help you can provide.
>>
>> Thank you,
>>
>> Richard
>>
>> [1]
>> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>>
>


Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread Richard Deurwaarder
Hello,

We have our flink job (1.8.0) running on our hadoop 2.7 cluster with yarn.
We would like to add the GCS connector to use GCS rather than HDFS.
Following the documentation of the GCS connector[1] we have to specify
which credentials we want to use and there are two ways of doing this:
  * Edit core-site.xml
  * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS

Because we're on a company shared hadoop cluster we do not want to change
the cluster wide core-site.xml.

This leaves me with two options:

1. Create a custom core-site.xml and use --yarnship to send it to all the
taskmanager containers. If I do this, to what value should I set
fs.hdfs.hadoopconf[2] in flink-conf?
2. The second option would be to set an environment variable, however
because the taskmanagers are started via yarn I'm having trouble figuring
out how to make sure this environment variable is set for each yarn
container / taskmanager.

I would appreciate any help you can provide.

Thank you,

Richard

[1]
https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
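The resolution that appears earlier in this thread uses Flink's containerized environment prefixes; in flink-conf.yaml this looks like the following (the credentials path is illustrative, and the key file must be readable inside each YARN container, e.g. shipped via --yarnship):

```yaml
# flink-conf.yaml -- environment variables injected into the YARN containers.
containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/service-account-key.json
containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/service-account-key.json
```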


Re: Job Manager becomes irresponsive if the size of the session cluster grows

2019-07-26 Thread Richard Deurwaarder
Hello,

We ran into the same problem. We've taken most of the same steps and made
the same observations:

   - increased memory
   - increased CPU
   - no noticeable increase in GC activity
   - little network I/O

Our current setup has the liveness probe disabled and we've increased the
(akka) timeouts; this seems to help a bit, but it still feels very
unresponsive.


On Fri, Jul 26, 2019 at 12:41 PM Biao Liu  wrote:

> Hi Prakhar,
>
> Sorry I don't have much experience on k8s. Maybe some other guys could
> help.
>
> On Fri, Jul 26, 2019 at 6:20 PM Prakhar Mathur 
> wrote:
>
>> Hi,
>>
>> So we were deploying our flink clusters on YARN earlier but then we moved
>> to kubernetes, but then our clusters were not this big. Have you guys seen
>> issues with job manager rest server becoming irresponsive on kubernetes
>> before?
>>
>> On Fri, Jul 26, 2019, 14:28 Biao Liu  wrote:
>>
>>> Hi Prakhar,
>>>
>>> Sorry I could not find any abnormal message from your GC log and stack
>>> trace.
>>> Have you ever tried deploying the cluster in other ways? Not on
>>> Kubernetes. Like on YARN or standalone. Just for narrowing down the scope.
>>>
>>>
>>> On Tue, Jul 23, 2019 at 12:34 PM Prakhar Mathur 
>>> wrote:
>>>

 On Mon, Jul 22, 2019, 16:08 Prakhar Mathur 
 wrote:

> Hi,
>
> We enabled GC logging, here are the logs
>
> [GC (Allocation Failure) [PSYoungGen: 6482015K->70303K(6776832K)]
> 6955827K->544194K(20823552K), 0.0591479 secs] [Times: user=0.09 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6587039K->38642K(6763008K)]
> 7060930K->512614K(20809728K), 0.0740978 secs] [Times: user=0.08 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6502858K->192077K(6734848K)]
> 6976829K->666144K(20781568K), 0.0841759 secs] [Times: user=0.17 sys=0.00,
> real=0.09 secs]
> [GC (Allocation Failure) [PSYoungGen: 6647378K->50108K(6759424K)]
> 7121446K->524248K(20806144K), 0.0622997 secs] [Times: user=0.08 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6501890K->60606K(6779904K)]
> 6976029K->534961K(20826624K), 0.0637955 secs] [Times: user=0.09 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6586046K->40411K(6768640K)]
> 7060401K->514839K(20815360K), 0.0729137 secs] [Times: user=0.08 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6543919K->51886K(6797824K)]
> 7018346K->526385K(20844544K), 0.0649143 secs] [Times: user=0.09 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6601690K->191832K(6754304K)]
> 7076190K->666427K(20801024K), 0.1029686 secs] [Times: user=0.18 sys=0.00,
> real=0.10 secs]
> [GC (Allocation Failure) [PSYoungGen: 6742947K->62693K(6781952K)]
> 7217543K->537361K(20828672K), 0.0639272 secs] [Times: user=0.09 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6589986K->66299K(6770688K)]
> 7064653K->541039K(20817408K), 0.0701853 secs] [Times: user=0.08 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6590742K->42995K(6800896K)]
> 7065481K->517798K(20847616K), 0.0595729 secs] [Times: user=0.08 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6608678K->66127K(6793728K)]
> 7083482K->541011K(20840448K), 0.0608270 secs] [Times: user=0.09 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6608886K->62063K(6822400K)]
> 7083769K->537027K(20869120K), 0.0675917 secs] [Times: user=0.10 sys=0.00,
> real=0.07 secs]
> [GC (Allocation Failure) [PSYoungGen: 6617146K->200674K(6812160K)]
> 7092110K->812325K(20858880K), 1.1685237 secs] [Times: user=3.53 sys=0.71,
> real=1.17 secs]
> [GC (Allocation Failure) [PSYoungGen: 6773610K->194848K(6633984K)]
> 7385261K->806700K(20680704K), 0.0858601 secs] [Times: user=0.19 sys=0.00,
> real=0.09 secs]
> [GC (Allocation Failure) [PSYoungGen: 6617888K->44002K(6723072K)]
> 7229740K->655854K(20769792K), 0.0647387 secs] [Times: user=0.09 sys=0.00,
> real=0.06 secs]
> [GC (Allocation Failure) [PSYoungGen: 6420043K->194672K(6702080K)]
> 7031895K->806604K(20748800K), 0.0833792 secs] [Times: user=0.18 sys=0.00,
> real=0.08 secs]
> [GC (Allocation Failure) [PSYoungGen: 6603376K->187059K(6596096K)]
> 7215308K->799063K(20642816K), 0.0906529 secs] [Times: user=0.17 sys=0.00,
> real=0.09 secs]
> [GC (Allocation Failure) [PSYoungGen: 6572690K->51850K(6715904K)]
> 7184694K->663990K(20762624K), 0.0837285 secs] [Times: user=0.11 sys=0.00,
> real=0.09 secs]
> [GC (Allocation Failure) [PSYoungGen: 6452380K->44766K(6708224K)]
> 7064519K->656993K(20754944K), 0.0809864 secs] [Times: user=0.10 sys=0.00,
> real=0.08 secs]
> [GC (Allocation Failure) [PSYoungGen: 6445790K->42654K(6730752K)]
> 7058017K->654961K(20777472K), 0.0686401 secs] [Times: user=0.08 sys=0.00,
> real=0.07 secs]
> 

Re: Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-23 Thread Richard Deurwaarder
Hi Fabian,

I followed the advice of another Flink user who mailed me directly; he had
the same problem and told me to remove the stale job graph node with
something like: rmr /flink/hunch/jobgraphs/1dccee15d84e1d2cededf89758ac2482,
which allowed us to start the job again.

It might be nice to investigate what went wrong, as it didn't feel good to
have our production cluster crippled like this.

Richard

On Tue, Jul 23, 2019 at 12:47 PM Fabian Hueske  wrote:

> Hi Richard,
>
> I hope you could resolve the problem in the meantime.
>
> Nonetheless, maybe Till (in CC) has an idea what could have gone wrong.
>
> Best, Fabian
>
> On Wed, Jul 17, 2019 at 7:50 PM Richard Deurwaarder <
> rich...@xeli.eu> wrote:
>
>> Hello,
>>
>> I've got a problem with our Flink cluster where the jobmanager is not
>> starting up anymore, because it tries to download a non-existent (blob)
>> file from the ZooKeeper storage dir.
>>
>> We're running flink 1.8.0 on a kubernetes cluster and use the google
>> storage connector [1] to store checkpoints, savepoints and zookeeper data.
>>
>> When I noticed the jobmanager was having problems, it was in a crashloop
>> throwing file not found exceptions [2]
>> Caused by: java.io.FileNotFoundException: Item not found:
>> some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
>> If you enabled STRICT generation consistency, it is possible that the live
>> version is still available but the intended generation is deleted.
>>
>> I looked in the blob directory and I can only find:
>> /recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
>> fiddle around in zookeeper to see if I could find anything [3], but I do
>> not really know what to look for.
>>
>> How could this have happened and how should I recover the job from this
>> situation?
>>
>> Thanks,
>>
>> Richard
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
>> [2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
>> [3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a
>>
>


Flink Zookeeper HA: FileNotFoundException blob - Jobmanager not starting up

2019-07-17 Thread Richard Deurwaarder
Hello,

I've got a problem with our Flink cluster where the jobmanager is not
starting up anymore, because it tries to download a non-existent (blob)
file from the ZooKeeper storage dir.

We're running flink 1.8.0 on a kubernetes cluster and use the google
storage connector [1] to store checkpoints, savepoints and zookeeper data.

When I noticed the jobmanager was having problems, it was in a crashloop
throwing file not found exceptions [2]
Caused by: java.io.FileNotFoundException: Item not found:
some-project-flink-state/recovery/hunch/blob/job_e6ad857af7f09b56594e95fe273e9eff/blob_p-486d68fa98fa05665f341d79302c40566b81034e-306d493f5aa810b5f4f7d8d63f5b18b5.
If you enabled STRICT generation consistency, it is possible that the live
version is still available but the intended generation is deleted.

I looked in the blob directory and I can only find:
/recovery/hunch/blob/job_1dccee15d84e1d2cededf89758ac2482 I've tried to
fiddle around in zookeeper to see if I could find anything [3], but I do
not really know what to look for.

How could this have happened and how should I recover the job from this
situation?

Thanks,

Richard

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/connectors.html#using-hadoop-file-system-implementations
[2] https://gist.github.com/Xeli/0321031655e47006f00d38fc4bc08e16
[3] https://gist.github.com/Xeli/04f6d861c5478071521ac6d2c582832a


Re: BigQuery source ?

2019-06-04 Thread Richard Deurwaarder
I looked into this briefly a while ago out of interest and read about how
Beam handles this. I've never actually implemented it, but the concept
sounds reasonable to me.

What I read from their code is that Beam exports the BigQuery data to
Google Storage. This export shards the data into files with a max size of
1GB, and these files are then processed by the 'source functions' in Beam.

I think implementing this in Flink would require the following:

* Before starting the Flink job run the BigQuery to Google Storage Export (
https://cloud.google.com/bigquery/docs/exporting-data)
* Start the flink job and point towards the Google storage files (using
https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage to
easily read from Google Storage buckets)

So the job might look something like this:

> List<String> files = doBigQueryExportJob();
> DataSet<Record> records = environment.fromCollection(files)
>     .flatMap(new ReadFromFile())
>     .map(doWork());


On Fri, May 31, 2019 at 10:15 AM Niels Basjes  wrote:

> Hi,
>
> Has anyone created a source to READ from BigQuery into Flink yet (we
> have Flink running on K8S in the Google cloud)?
> I would like to retrieve a DataSet in a distributed way (the data ...
> it's kinda big) and process that with Flink running on k8s (which we
> have running already).
>
> So far I have not been able to find anything yet.
> Any pointers/hints/code fragments are welcome.
>
> Thanks
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-11 Thread Richard Deurwaarder
Very nice! Thanks Aljoscha and all contributors!

I have one question, will the docker image for 1.8.0 be released soon as
well? https://hub.docker.com/_/flink has the versions up to 1.7.2.

Regards,

Richard

On Wed, Apr 10, 2019 at 4:54 PM Rong Rong  wrote:

> Congrats! Thanks Aljoscha for being the release manager and all for making
> the release possible.
>
> --
> Rong
>
>
> On Wed, Apr 10, 2019 at 4:23 AM Stefan Richter 
> wrote:
>
>> Congrats and thanks to Aljoscha for managing the release!
>>
>> Best,
>> Stefan
>>
>> > On 10. Apr 2019, at 13:01, Biao Liu  wrote:
>> >
>> > Great news! Thanks Aljoscha and all the contributors.
>> >
>> > Till Rohrmann mailto:trohrm...@apache.org>>
>> 于2019年4月10日周三 下午6:11写道:
>> > Thanks a lot to Aljoscha for being our release manager and to the
>> community making this release possible!
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Apr 10, 2019 at 12:09 PM Hequn Cheng > > wrote:
>> > Thanks a lot for the great release Aljoscha!
>> > Also thanks for the work by the whole community. :-)
>> >
>> > Best, Hequn
>> >
>> > On Wed, Apr 10, 2019 at 6:03 PM Fabian Hueske > > wrote:
>> > Congrats to everyone!
>> >
>> > Thanks Aljoscha and all contributors.
>> >
>> > Cheers, Fabian
>> >
>> > Am Mi., 10. Apr. 2019 um 11:54 Uhr schrieb Congxian Qiu <
>> qcx978132...@gmail.com >:
>> > Cool!
>> >
>> > Thanks Aljoscha a lot for being our release manager, and all the others
>> who make this release possible.
>> >
>> > Best, Congxian
>> > On Apr 10, 2019, 17:47 +0800, Jark Wu > imj...@gmail.com>>, wrote:
>> > > Cheers!
>> > >
>> > > Thanks Aljoscha and all others who make 1.8.0 possible.
>> > >
>> > > On Wed, 10 Apr 2019 at 17:33, vino yang > > wrote:
>> > >
>> > > > Great news!
>> > > >
>> > > > Thanks Aljoscha for being the release manager and thanks to all the
>> > > > contributors!
>> > > >
>> > > > Best,
>> > > > Vino
>> > > >
>> > > > Driesprong, Fokko  于2019年4月10日周三 下午4:54写道:
>> > > >
>> > > > > Great news! Great effort by the community to make this happen.
>> Thanks all!
>> > > > >
>> > > > > Cheers, Fokko
>> > > > >
>> > > > > Op wo 10 apr. 2019 om 10:50 schreef Shaoxuan Wang <
>> wshaox...@gmail.com >:
>> > > > >
>> > > > > > Thanks Aljoscha and all others who made contributions to FLINK
>> 1.8.0.
>> > > > > > Looking forward to FLINK 1.9.0.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Shaoxuan
>> > > > > >
>> > > > > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek <
>> aljos...@apache.org >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > The Apache Flink community is very happy to announce the
>> release of
>> > > > > > Apache
>> > > > > > > Flink 1.8.0, which is the next major release.
>> > > > > > >
>> > > > > > > Apache Flink® is an open-source stream processing framework
>> for
>> > > > > > > distributed, high-performing, always-available, and accurate
>> data
>> > > > > > streaming
>> > > > > > > applications.
>> > > > > > >
>> > > > > > > The release is available for download at:
>> > > > > > > https://flink.apache.org/downloads.html <
>> https://flink.apache.org/downloads.html>
>> > > > > > >
>> > > > > > > Please check out the release blog post for an overview of the
>> > > > > > improvements
>> > > > > > > for this bugfix release:
>> > > > > > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html <
>> https://flink.apache.org/news/2019/04/09/release-1.8.0.html>
>> > > > > > >
>> > > > > > > The full release notes are available in Jira:
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344274
>> <
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344274
>> >
>> > > > > > >
>> > > > > > > We would like to thank all contributors of the Apache Flink
>> community
>> > > > > who
>> > > > > > > made this release possible!
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Aljoscha
>> > > > > >
>> > > > >
>> > > >
>>
>>


Re: Submitting job to Flink on yarn times out on flip-6 1.5.x

2019-02-26 Thread Richard Deurwaarder
Hello Gary,

Thank you for your response.

I'd like to use the new mode, but it does not work for me; it seems I am
running into a firewall issue.

The rest.port is random when running on YARN [1]. The machine I use to
deploy the job can, in fact, start the Flink cluster, but it cannot
submit the job on the randomly chosen port because our firewall blocks it.

Do you know if this is still the case on 1.7 and if there is any way to
work around this?

Richard

[1]
https://stackoverflow.com/questions/54000276/flink-web-port-can-not-be-configured-correctly-in-yarn-mode
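For what it's worth, one possible workaround (hedged: whether YARN mode honors it depends on the Flink version, so treat this as something to verify rather than a guaranteed fix) is to pin the REST port in flink-conf.yaml so the firewall can be opened for exactly that port:

```yaml
# Hypothetical example: fix the REST endpoint to a known port.
# Some Flink versions override this with a random port on YARN;
# newer versions also accept a port range via rest.bind-port.
rest.port: 55505
```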

On Thu, Feb 21, 2019 at 3:41 PM Gary Yao  wrote:

> Hi,
>
> Beginning with Flink 1.7, you cannot use the legacy mode anymore [1][2]. I
> am
> currently working on removing references to the legacy mode in the
> documentation [3]. Is there any reason, you cannot use the "new mode"?
>
> Best,
> Gary
>
> [1] https://flink.apache.org/news/2018/11/30/release-1.7.0.html
> [2] https://issues.apache.org/jira/browse/FLINK-10392
> [3] https://issues.apache.org/jira/browse/FLINK-11713
>
> On Mon, Feb 18, 2019 at 12:00 PM Richard Deurwaarder 
> wrote:
>
>> Hello,
>>
>> I am trying to upgrade our job from flink 1.4.2 to 1.7.1 but I keep
>> running into timeouts after submitting the job.
>>
>> The flink job runs on our hadoop cluster and starts using Yarn.
>>
>> Relevant config options seem to be:
>>
>> jobmanager.rpc.port: 55501
>>
>> recovery.jobmanager.port: 55502
>>
>> yarn.application-master.port: 55503
>>
>> blob.server.port: 55504
>>
>>
>> I've seen the following behavior:
>>   - Using the same flink-conf.yaml as we used in 1.4.2: 1.5.6 / 1.6.3 /
>> 1.7.1 all versions timeout while 1.4.2 works.
>>   - Using 1.5.6 with "mode: legacy" (to switch off flip-6) works
>>   - Using 1.7.1 with "mode: legacy" gives timeout (I assume this option
>> was removed but the documentation is outdated?
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy
>> )
>>
>> When the timeout happens I get the following stacktrace:
>>
>> INFO class java.time.Instant does not contain a getter for field seconds
>> 2019-02-18T10:16:56.815+01:00
>> INFO class com.bol.fin_hdp.cm1.domain.Cm1Transportable does not contain
>> a getter for field globalId 2019-02-18T10:16:56.815+01:00
>> INFO Submitting job 5af931bcef395a78b5af2b97e92dcffe (detached: false).
>> 2019-02-18T10:16:57.182+01:00
>> INFO 
>> 2019-02-18T10:29:27.527+01:00
>> INFO The program finished with the following exception:
>> 2019-02-18T10:29:27.564+01:00
>> INFO org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error. 2019-02-18T10:29:27.601+01:00
>> INFO at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>> 2019-02-18T10:29:27.638+01:00
>> INFO at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> 2019-02-18T10:29:27.675+01:00
>> INFO at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> 2019-02-18T10:29:27.711+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
>> 2019-02-18T10:29:27.747+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
>> 2019-02-18T10:29:27.784+01:00
>> INFO at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
>> 2019-02-18T10:29:27.820+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
>> 2019-02-18T10:29:27.857+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
>> 2019-02-18T10:29:27.893+01:00
>> INFO at java.security.AccessController.doPrivileged(Native Method)
>> 2019-02-18T10:29:27.929+01:00
>> INFO at javax.security.auth.Subject.doAs(Subject.java:422)
>> 2019-02-18T10:29:27.968+01:00
>> INFO at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> 2019-02-18T10:29:28.004+01:00
>> INFO at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> 2019-02-18T10:29:28.040+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
>> 2019-02-18T10:29:28.075+01:00
>> INFO Caused by: java.lang.RuntimeException:
>> org.apache.flink.client.program.ProgramInvoca

Re: Share broadcast state between multiple operators

2019-02-26 Thread Richard Deurwaarder
Hello Till,

So if I understand correctly, when messages get broadcast to multiple
operators, each operator will execute the processBroadcast() function and
store the state under a sort of operator scope? Even if they use the same
MapStateDescriptor?

And is that replication of the state between subtasks what makes the
broadcast state different from an operator state with union redistribution?

Thanks for any clarification, very interesting to learn about :)

Richard

On Tue, Feb 26, 2019 at 11:57 AM Till Rohrmann  wrote:

> Hi Richard,
>
> Flink does not support sharing state between multiple operators.
> Technically also the broadcast state is not shared but replicated between
> subtasks belonging to the same operator. So what you can do is to send the
> broadcast input to different operators, but they will all keep their own
> copy of the state.
>
> Cheers,
> Till
>
> On Mon, Feb 25, 2019 at 11:45 AM Richard Deurwaarder 
> wrote:
>
>> Hi All,
>>
>> Due to the way our code is structured, we would like to use the broadcast
>> state at multiple points of our pipeline. So not only share it between
>> multiple instances of the same operator but also between multiple
>> operators. See the image below for a simplified example.
>>
>> Flink does not seem to have any problems with this at runtime but I
>> wonder:
>>
>>- Is this a good pattern and was it designed with something like this
>>in mind?
>>- If we use the same MapStateDescriptor in both operators, does the
>>state only get stored once? And does it also only get written once?
>>
>>
>> [image: broadcast-state.png]
>>
>> Thanks!
>>
>
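For readers following along, the pattern being discussed looks roughly like this (a minimal sketch against the DataStream broadcast state API; the stream names and the two `BroadcastProcessFunction` subclasses are made up):

```java
MapStateDescriptor<String, String> desc =
    new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

BroadcastStream<String> broadcast = configStream.broadcast(desc);

// Both operators consume the same broadcast stream...
eventsA.connect(broadcast).process(new OperatorA());
eventsB.connect(broadcast).process(new OperatorB());

// ...but, per Till's explanation, the state written in
// processBroadcastElement() lives per operator (replicated across
// that operator's subtasks), even with an identical descriptor.
```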


Share broadcast state between multiple operators

2019-02-25 Thread Richard Deurwaarder
Hi All,

Due to the way our code is structured, we would like to use the broadcast
state at multiple points of our pipeline. So not only share it between
multiple instances of the same operator but also between multiple
operators. See the image below for a simplified example.

Flink does not seem to have any problems with this at runtime but I wonder:

   - Is this a good pattern and was it designed with something like this in
   mind?
   - If we use the same MapStateDescriptor in both operators, does the
   state only get stored once? And does it also only get written once?


[image: broadcast-state.png]

Thanks!


Submitting job to Flink on yarn times out on flip-6 1.5.x

2019-02-18 Thread Richard Deurwaarder
Hello,

I am trying to upgrade our job from flink 1.4.2 to 1.7.1 but I keep running
into timeouts after submitting the job.

The flink job runs on our hadoop cluster and starts using Yarn.

Relevant config options seem to be:

jobmanager.rpc.port: 55501

recovery.jobmanager.port: 55502

yarn.application-master.port: 55503

blob.server.port: 55504


I've seen the following behavior:
  - Using the same flink-conf.yaml as we used in 1.4.2: 1.5.6 / 1.6.3 /
1.7.1 all time out, while 1.4.2 works.
  - Using 1.5.6 with "mode: legacy" (to switch off flip-6) works
  - Using 1.7.1 with "mode: legacy" gives timeout (I assume this option was
removed but the documentation is outdated?
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy
)

When the timeout happens I get the following stacktrace:

INFO class java.time.Instant does not contain a getter for field seconds
2019-02-18T10:16:56.815+01:00
INFO class com.bol.fin_hdp.cm1.domain.Cm1Transportable does not contain a
getter for field globalId 2019-02-18T10:16:56.815+01:00
INFO Submitting job 5af931bcef395a78b5af2b97e92dcffe (detached: false).
2019-02-18T10:16:57.182+01:00
INFO 
2019-02-18T10:29:27.527+01:00
INFO The program finished with the following exception:
2019-02-18T10:29:27.564+01:00
INFO org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error. 2019-02-18T10:29:27.601+01:00
INFO at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
2019-02-18T10:29:27.638+01:00
INFO at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
2019-02-18T10:29:27.675+01:00
INFO at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
2019-02-18T10:29:27.711+01:00
INFO at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
2019-02-18T10:29:27.747+01:00
INFO at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
2019-02-18T10:29:27.784+01:00
INFO at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
2019-02-18T10:29:27.820+01:00
INFO at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
2019-02-18T10:29:27.857+01:00
INFO at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
2019-02-18T10:29:27.893+01:00
INFO at java.security.AccessController.doPrivileged(Native Method)
2019-02-18T10:29:27.929+01:00
INFO at javax.security.auth.Subject.doAs(Subject.java:422)
2019-02-18T10:29:27.968+01:00
INFO at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
2019-02-18T10:29:28.004+01:00
INFO at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
2019-02-18T10:29:28.040+01:00
INFO at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
2019-02-18T10:29:28.075+01:00
INFO Caused by: java.lang.RuntimeException:
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result. 2019-02-18T10:29:28.110+01:00
INFO at
com.bol.fin_hdp.job.starter.IntervalJobStarter.startJob(IntervalJobStarter.java:43)
2019-02-18T10:29:28.146+01:00
INFO at
com.bol.fin_hdp.job.starter.IntervalJobStarter.startJobWithConfig(IntervalJobStarter.java:32)
2019-02-18T10:29:28.182+01:00
INFO at com.bol.fin_hdp.Main.main(Main.java:8) 2019-02-18T10:29:28.217+01:00
INFO at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2019-02-18T10:29:28.253+01:00
INFO at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2019-02-18T10:29:28.289+01:00
INFO at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2019-02-18T10:29:28.325+01:00
INFO at java.lang.reflect.Method.invoke(Method.java:498)
2019-02-18T10:29:28.363+01:00
INFO at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
2019-02-18T10:29:28.400+01:00
INFO ... 12 more 2019-02-18T10:29:28.436+01:00
INFO Caused by: org.apache.flink.client.program.ProgramInvocationException:
Could not retrieve the execution result. 2019-02-18T10:29:28.473+01:00
INFO at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
2019-02-18T10:29:28.509+01:00
INFO at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
2019-02-18T10:29:28.544+01:00
INFO at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
2019-02-18T10:29:28.581+01:00
INFO at com.bol.fin_hdp.cm1.job.Job.execute(Job.java:54)
2019-02-18T10:29:28.617+01:00
INFO at
com.bol.fin_hdp.job.starter.IntervalJobStarter.startJob(IntervalJobStarter.java:41)
2019-02-18T10:29:28.654+01:00
INFO ... 19 more 2019-02-18T10:29:28.693+01:00
INFO Caused by: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit JobGraph. 2019-02-18T10:29:28.730+01:00

Implementation error: Unhandled exception.

2018-11-07 Thread Richard Deurwaarder
Hello,

We have a flink job / cluster running in kubernetes. Flink 1.6.2 (but the
same happens in 1.6.0 and 1.6.1) To upgrade our job we use the REST API.

Every so often the jobmanager seems to be stuck in a crashing state and the
logs show me this stack trace:

2018-11-07 18:43:05,815 [flink-scheduler-1] ERROR
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler -
Implementation error: Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#1016927511]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.Implementation error: Unhandled
exception.".
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

If I restart the jobmanager everything is fine afterwards, but the
jobmanager will not restart by itself.

What might've caused this and is this something we can prevent?

Richard
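
For reference, the timeout in the trace above is governed by Flink's Akka ask timeout. Raising it in flink-conf.yaml can keep REST handlers such as ClusterOverviewHandler from timing out while the dispatcher is busy, though it only papers over whatever is actually blocking the dispatcher:

```yaml
# Hedged example: raise the ask timeout (default is 10 s) and the
# matching web/REST timeout (in milliseconds).
akka.ask.timeout: 60 s
web.timeout: 60000
```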