Re: [QUERY] Multiple elastic search sinks for a single flink pipeline

2020-10-14 Thread Chesnay Schepler
Are the number of sinks fixed? If so, then you can just take the output 
of your map function and apply multiple filters, writing the output of 
each filter into a sync. You could also use a process function with 
side-outputs, and apply a source to each output.


On 10/14/2020 6:05 PM, Vignesh Ramesh wrote:


My requirement is to send the data to a different ES sink (based on 
the data). Ex: If the data contains a particular info send it to sink1 
else send it to sink2 etc(basically send it dynamically to any one 
sink based on the data). I also want to set parallelism separately for 
ES sink1, ES sink2, Es sink3 etc.


|-> Es sink1 (parallelism 4) Kafka -> Map(Transformations) -> ES sink2 
(parallelism 2) -> Es sink3 (parallelism 2)|


Is there any simple way to achieve the above in flink ?

*My solution: (but not satisfied with it)*

I could come up with a solution but there are intermediate kafka 
topics which i write to (topic1,topic2,topic3) and then have separate 
pipelines for Essink1,Essink2 and ESsink3. I want to avoid writing to 
these intermediate kafka topics.


|kafka -> Map(Transformations) -> Kafka topics (Insert into 
topic1,topic2,topic3 based on the data) Kafka topic1 -> 
Essink1(parallelism 4) Kafka topic2 -> Essink2(parallelism 2) Kafka 
topic3 -> |Essink3(parallelism 2)

Regards,
Vignesh





Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-23 Thread Chesnay Schepler
If you do did not specify a different job or cluster id, then yes it 
will read the graph from Zookeeper.

Differentiating different submissions is the very purpose of job ids.

On 23/08/2020 16:38, Alexey Trenikhun wrote:
Let’s say HA is enabled, so this part works. Now we want to upgrade 
job jar, we stop job with save point sp2, change manifest to specify 
“-s sp2” and newer image, and create K8s job again, on start will 
HAServices still read job graph from Zookeeper?



*From:* Chesnay Schepler 
*Sent:* Sunday, August 23, 2020 7:25:11 AM
*To:* Alexey Trenikhun ; Piotr Nowojski 


*Cc:* Flink User Mail List 
*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
If HA is enabled the the cluster will continue from the latest 
externalized checkpoint.

Without HA it still start from the savepoint.

On 23/08/2020 16:18, Alexey Trenikhun wrote:


Let’s say job cluster was submitted as job from save point sp1, so 
spec includes “-s sp1”, job run for days, takin externalized 
checkpoints every 5 minute, then suddenly pod failed, Kubernetes job 
controller restarts job pod using original job spec, which has “-s 
sp1”, so Flink job will start from sp1 rather than from latest 
externalized checkpoint. Is my understanding correct?




*From:* Chesnay Schepler  <mailto:ches...@apache.org>
*Sent:* Sunday, August 23, 2020 1:46:45 AM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade

A job cluster is submitted as a job, not a deployment.

The built-in Job controller of Kubernetes ensures that this job 
finishes successfully, and if required starts new pods.




On 23/08/2020 06:43, Alexey Trenikhun wrote:
Since it is necessary to use cancel with save point/resume from save 
point, then it is not possible to use Deployment (otherwise 
JobManager pod will restart on crash from same save point), so we 
need to use Job, but in that case ifJob pod is crashed who will 
start new instance of Job pod ? Sounds like currently HA with 
kubernetes is not achievable unless some controller is used to 
manage JobManager. Am I right?


--------
*From:* Chesnay Schepler  
<mailto:ches...@apache.org>

*Sent:* Saturday, August 22, 2020 12:58 AM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
If, and only if, the cluster-id and JobId are identical then the 
JobGraph will be recovered from ZooKeeper.


On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure I that I understand your statement about "the HaServices 
are only being given the JobGraph", seems 
HighAvailabilityServices#getJobGraphStore provides JobGraphStore, 
and potentially implementation of 
JobGraphStore#recoverJobGraph(JobID jobId) for this store could 
build new graph for jar rather than read stored graph from ZooKeeper?


Also, if there is single job with same job-id (job cluster), 
jobgraph of failed job will be over written by new one which will 
have same job-id?


--------
*From:* Chesnay Schepler  
<mailto:ches...@apache.org>

*Sent:* Friday, August 21, 2020 12:16 PM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
The HaServices are only being given the JobGraph, to this is not 
possible.


Actually I have to correct myself. For a job cluster the state in 
HA should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected 
to the previous job; they will be treated as separate things.


However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate 
JobGraph from jar instead of reading it from ZK state. Any hints? 
I have feeling that reading JobGraph from jar is more resilient 
approach, less chances of mistakes during upgrade


Thanks,
Alexey


*From:* Piotr Nowojski  
<mailto:pnowoj...@apache.org>

*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler  
<mailto:ches...@apache.org>
*Cc:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Flink User Mail List  
<mailto:user@flink.apache.org>

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-23 Thread Chesnay Schepler
If HA is enabled the the cluster will continue from the latest 
externalized checkpoint.

Without HA it still start from the savepoint.

On 23/08/2020 16:18, Alexey Trenikhun wrote:


Let’s say job cluster was submitted as job from save point sp1, so 
spec includes “-s sp1”, job run for days, takin externalized 
checkpoints every 5 minute, then suddenly pod failed, Kubernetes job 
controller restarts job pod using original job spec, which has “-s 
sp1”, so Flink job will start from sp1 rather than from latest 
externalized checkpoint. Is my understanding correct?




*From:* Chesnay Schepler 
*Sent:* Sunday, August 23, 2020 1:46:45 AM
*To:* Alexey Trenikhun ; Piotr Nowojski 


*Cc:* Flink User Mail List 
*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade

A job cluster is submitted as a job, not a deployment.

The built-in Job controller of Kubernetes ensures that this job 
finishes successfully, and if required starts new pods.




On 23/08/2020 06:43, Alexey Trenikhun wrote:
Since it is necessary to use cancel with save point/resume from save 
point, then it is not possible to use Deployment (otherwise 
JobManager pod will restart on crash from same save point), so we 
need to use Job, but in that case ifJob pod is crashed who will start 
new instance of Job pod ? Sounds like currently HA with kubernetes is 
not achievable unless some controller is used to manage JobManager. 
Am I right?



*From:* Chesnay Schepler  <mailto:ches...@apache.org>
*Sent:* Saturday, August 22, 2020 12:58 AM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
If, and only if, the cluster-id and JobId are identical then the 
JobGraph will be recovered from ZooKeeper.


On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure I that I understand your statement about "the HaServices 
are only being given the JobGraph", seems 
HighAvailabilityServices#getJobGraphStore provides JobGraphStore, 
and potentially implementation of 
JobGraphStore#recoverJobGraph(JobID jobId) for this store could 
build new graph for jar rather than read stored graph from ZooKeeper?


Also, if there is single job with same job-id (job cluster), 
jobgraph of failed job will be over written by new one which will 
have same job-id?


--------
*From:* Chesnay Schepler  
<mailto:ches...@apache.org>

*Sent:* Friday, August 21, 2020 12:16 PM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
The HaServices are only being given the JobGraph, to this is not 
possible.


Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to 
the previous job; they will be treated as separate things.


However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
from jar instead of reading it from ZK state. Any hints? I have 
feeling that reading JobGraph from jar is more resilient approach, 
less chances of mistakes during upgrade


Thanks,
Alexey


*From:* Piotr Nowojski  
<mailto:pnowoj...@apache.org>

*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler  <mailto:ches...@apache.org>
*Cc:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
Thank you for the clarification Chesney and sorry for the incorrect 
previous answer.


Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <mailto:ches...@apache.org>> napisał(a):


This is incorrect; we do store the JobGraph in ZooKeeper. If
you just delete the deployment the cluster will recover the
previous JobGraph (assuming you aren't changing the Zookeeper
configuration).

If you wish to update the job, then you should cancel it (along
with creating a savepoint), which will clear the Zookeeper
state, and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well),
but as far as I know JobGraph is never stored in the ZK. It's
always recreated from the job's JAR. So you s

Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-25 Thread Chesnay Schepler

+1 to remove both the 1.10 and 1.11 connectors.

The connectors have not been actively developed for some time. They are 
basically just sitting around causing noise by causing test 
instabilities and eating CI time.
It would  also allow us to really simplify the module structure of the 
Kafka connectors.


Users may continue to use the 1.11 version of the connectors with future 
Flink versions, and we may even provide critical bug fixes in a 1.11 
bugfix release (albeit unlikely).


While ultimately this is a separate topic I would also be in favor of 
removing any migration paths we have from 0.11 to the universal connector;
as these are already present in 1.11 users may migrate to the universal 
connector before jumping to Flink 1.12+.


On 25/08/2020 18:49, Konstantin Knauf wrote:

Hi Aljoscha,

I am assuming you're asking about dropping the 
flink-connector-kafka-0.10/0.11 modules, right? Or are you talking 
about removing support for Kafka 0.10/0.11 from the universal connector?


I am in favor of removing flink-connector-kafka-0.10/0.11 in the next 
release. These modules would still be available in Flink 1.11- as a 
reference, and could be used with Flink 1.12+ with small or no 
modifications. To my knowledge, you also use the universal Kafka 
connector with 0.10 brokers, but there might be a performance 
penalty if I remember correctly. In general, I find it important 
to continuously reduce baggage that accumulates over time and this 
seems like a good opportunity.


Best,

Konstantin

On Tue, Aug 25, 2020 at 4:59 AM Paul Lam > wrote:


Hi Aljoscha,

I'm lightly leaning towards keeping the 0.10 connector, for Kafka
0.10 still has a steady user base in my observation.

But if we drop 0.10 connector, can we ensure the users would be
able to smoothly migrate to 0.11 connector/universal connector?

If I remember correctly, the universal connector is compatible
with 0.10 brokers, but I want to double check that.

Best,
Paul Lam


2020年8月24日 22:46,Aljoscha Krettek mailto:aljos...@apache.org>> 写道:

Hi all,

this thought came up on FLINK-17260 [1] but I think it would be a
good idea in general. The issue reminded us that Kafka didn't
have an idempotent/fault-tolerant Producer before Kafka 0.11.0.
By now we have had the "modern" Kafka connector that roughly
follows new Kafka releases for a while and this one supports
Kafka cluster versions as far back as 0.10.2.0 (I believe).

What are your thoughts on removing support for older Kafka
versions? And yes, I know that we had multiple discussions like
this in the past but I'm trying to gauge the current sentiment.

I'm cross-posting to the user-ml since this is important for both
users and developers.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-17260




--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk





Re: Loading FlinkKafkaProducer fails with LinkError

2020-08-25 Thread Chesnay Schepler
The simplest answer is that they are in fact not equal; maybe it is a 
jar of an older version of your setup?


Can you give some details on the NoSuchMethodException? Specifically 
whether it tries to access something from the Kafka connector, or from 
your own user code.


On 25/08/2020 21:27, Yuval Itzchakov wrote:
OK, I think I figured it out. It looks like the uber-jar is also being 
placed under `lib`, which is probably the cause of the problem.


Question is, why does it identify it as two different versions? It's 
exactly the same JAR.


On Tue, Aug 25, 2020 at 10:22 PM Yuval Itzchakov <mailto:yuva...@gmail.com>> wrote:


I'm afraid it's not being printed out due to different log levels :(

Yes, I build the image myself. It takes the tar file from
https://archive.apache.org/dist/flink/flink-1.9.0/
<https://archive.apache.org/dist/flink/flink-1.9.1/> and unpacks
it into the image.
I've ran:

find . -iname "*.jar" | xargs -n 1 jar tf | grep -i producerrecord
find . -iname "*.jar" | xargs -n 1 jar tf | grep -i kafka

Both from within /lib, they both produce no results.

    On Tue, Aug 25, 2020 at 10:07 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

The NoSuchMethodException shows that the class is still on the
classpath, but with a different version than your code is
expecting. Otherwise you would've gotten a different error.
This implies that there are 2 versions of the kafka
dependencies on the classpath in your original run; it
suddenly working with parent-first classloading reinforces the
suspicion that they are present in the distribution.

As Arvid mentioned, the classpath log entry (at the very start
of the log file) would be interesting.

Did you build the Flink yourself distribution, or are you
relying on one of the existing Flink binaries/images?

On 25/08/2020 20:51, Yuval Itzchakov wrote:

Hi Arvid,
I'm running Flink in a job cluster on k8s using the Lyft
Operator.

The flink image that I'm building does not have the
flink-connector-kafka library in it's JAR, I've made sure of
this using `jar -tf`. Additionally, once I removed the
dependency from my uber jar, it failed with a
"NoSuchMethodException" at runtime for one of the arbitrary
methods.

I used classloader.resolve-order: parent-first and it
resolved the issue somehow. I still don't know why though.

On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise
mailto:ar...@ververica.com>> wrote:

Hi Yuval,

How do you execute Flink? Can you show us the log entry
with the classpath?

I'm guessing that you have Kafka bundled in your uber-jar
and additionally also have the connector in
flink-dist/lib. If so, you simply need to remove it in
one place. In general, if you use flink-dist, you'd not
bundle any Flink dependencies in your uber-jar (use
provided scope for them).

If you have everything bundled in one uber-jar and
execute it somehow without flink-dist, then I don't
immediately see a solution. Then the log with the
classpath would help.

Best,

Arvid


On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov
mailto:yuva...@gmail.com>> wrote:

Hi,
I'm trying to load a FlinkKafkaProducer sink
alongside another custom sink. While trying to restore
a running Flink app from the previous state, I get
the error message below.
I am running Flink 1.9.0 with the following SBT
dependency added:
"org.apache.flink" %% "flink-connector-kafka" % 1.9.0
And the app is deployed via a standard uber jar with
all the dependencies. W
Would appreciate the help
java.lang.LinkageError: loader constraint violation:
loader (instance of
org/apache/flink/util/ChildFirstClassLoader)
previously initiated loading for a different type
with name
"org/apache/kafka/clients/producer/ProducerRecord"
at java.lang.ClassLoader.defineClass1(Native Method)
at
java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at

java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net

<http://java.net>.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net

<http://java.net>.URLClassL

Re: Loading FlinkKafkaProducer fails with LinkError

2020-08-25 Thread Chesnay Schepler
The NoSuchMethodException shows that the class is still on the 
classpath, but with a different version than your code is expecting. 
Otherwise you would've gotten a different error.
This implies that there are 2 versions of the kafka dependencies on the 
classpath in your original run; it suddenly working with parent-first 
classloading reinforces the suspicion that they are present in the 
distribution.


As Arvid mentioned, the classpath log entry (at the very start of the 
log file) would be interesting.


Did you build the Flink yourself distribution, or are you relying on one 
of the existing Flink binaries/images?


On 25/08/2020 20:51, Yuval Itzchakov wrote:

Hi Arvid,
I'm running Flink in a job cluster on k8s using the Lyft Operator.

The flink image that I'm building does not have the 
flink-connector-kafka library in it's JAR, I've made sure of this 
using `jar -tf`. Additionally, once I removed the dependency from my 
uber jar, it failed with a "NoSuchMethodException" at runtime for one 
of the arbitrary methods.


I used classloader.resolve-order: parent-first and it resolved the 
issue somehow. I still don't know why though.


On Tue, Aug 25, 2020 at 6:13 PM Arvid Heise > wrote:


Hi Yuval,

How do you execute Flink? Can you show us the log entry with the
classpath?

I'm guessing that you have Kafka bundled in your uber-jar and
additionally also have the connector in flink-dist/lib. If so, you
simply need to remove it in one place. In general, if you use
flink-dist, you'd not bundle any Flink dependencies in your
uber-jar (use provided scope for them).

If you have everything bundled in one uber-jar and execute it
somehow without flink-dist, then I don't immediately see a
solution. Then the log with the classpath would help.

Best,

Arvid


On Sun, Aug 23, 2020 at 1:37 PM Yuval Itzchakov mailto:yuva...@gmail.com>> wrote:

Hi,
I'm trying to load a FlinkKafkaProducer sink alongside another
custom sink. While trying to restore
a running Flink app from the previous state, I get the error
message below.
I am running Flink 1.9.0 with the following SBT dependency added:
"org.apache.flink" %% "flink-connector-kafka" % 1.9.0
And the app is deployed via a standard uber jar with all the
dependencies. W
Would appreciate the help
java.lang.LinkageError: loader constraint violation: loader
(instance of org/apache/flink/util/ChildFirstClassLoader)
previously initiated loading for a different type with name
"org/apache/kafka/clients/producer/ProducerRecord"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net
.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net
.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net
.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net
.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net
.URLClassLoader.findClass(URLClassLoader.java:362)
at

org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io

.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io

.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io
.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io
.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io
.ObjectStreamClass.(ObjectStreamClass.java:494)
at java.io
.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io

.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
at java.io

.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io

.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io


Re: The frequency flink push metrics to pushgateway?

2020-08-28 Thread Chesnay Schepler
You can configure the reporter interval; please see this example 
.


On 28/08/2020 08:49, wangl...@geekplus.com wrote:


Using prometheus and pushgateway to monitor my flink cluster.

For self defined  metrics, counter.inc() will be called for every 
invoke method


I want to know when the actual  counter number is pushed to pushgateway?
Is is a fixed frequency?  If i can set the frequency?

Thanks,
Lei

wangl...@geekplus.com 





Re: Default Flink Metrics Graphite

2020-08-26 Thread Chesnay Schepler
metrics.reporter.grph.class: 
org.apache.flink.metrics.graphite.GraphiteReporter


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter

On 26/08/2020 16:40, Vijayendra Yadav wrote:

Hi Dawid,

I have 1.10.0 version of flink. What is alternative for this version ?

Regards,
Vijay



On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz 
 wrote:




Hi Vijay,

I think the problem might be that you are using a wrong version of 
the reporter.


You say you used flink-metrics-graphite-1.10.0.jar from 1.10 as a 
plugin, but it was migrated to plugins in 1.11 only[1].


I'd recommend trying it out with the same 1.11 version of Flink and 
Graphite reporter.


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-16965

On 26/08/2020 08:04, Vijayendra Yadav wrote:

Hi Nikola,

To rule out any other cluster issues, I have tried it in my local 
now. Steps as follows, but don't see any metrics yet.


1) Set up local Graphite

|docker run -d\ --name graphite\ --restart=always\ -p 80:80\ -p 
2003-2004:2003-2004\ -p 2023-2024:2023-2024\ -p 8125:8125/udp\ -p 
8126:8126\ graphiteapp/graphite-statsd|



  Mapped Ports

HostContainer   Service
80  80  nginx 
2003 	2003 	carbon receiver - plaintext 
 

2004 	2004 	carbon receiver - pickle 
 

2023 	2023 	carbon aggregator - plaintext 
 

2024 	2024 	carbon aggregator - pickle 
 


80808080Graphite internal gunicorn port (without Nginx proxying).
8125 	8125 	statsd 

8126 	8126 	statsd admin 



2) WebUI:





3) Run Flink example Job.
./bin/flink run 
./examples/flink-examples-streaming_2.11-1.11-SNAPSHOT-SocketWindowWordCount.jar 
--port 


with conf/flink-conf.yaml set as:

metrics.reporter.grph.factory.class: 
org.apache.flink.metrics.graphite.GraphiteReporterFactory

metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 1 SECONDS

and graphite jar:

plugins/flink-metrics-graphite/flink-metrics-graphite-1.10.0.jar


4) Can't see any activity in webui graphite.


Could you review and let me know what is wrong here ? any other way 
you suggest to be able to view the raw metrics data ?
Also, do you have sample metrics raw format, you can share from any 
other project.


Regards,
Vijay




On Sun, Aug 23, 2020 at 9:26 PM Nikola Hrusov > wrote:


Hi Vijay,

Your steps look correct to me.
Perhaps you can double check that the graphite port you are
sending is correct? THe default carbon port is 2003 and if you
use the aggregator it is 2023.

You should be able to see in both flink jobmanager and
taskmanager that the metrics have been initialized with the
config you have pasted.

Regards
,
Nikola Hrusov


On Mon, Aug 24, 2020 at 5:00 AM Vijayendra Yadav
mailto:contact@gmail.com>> wrote:

Hi Team,

I am trying  to export Flink stream default metrics using
Graphite, but I can't find it in the Graphite metrics
console.  Could you confirm the steps below are correct?

*1) Updated flink-conf.yaml*

metrics.reporter.grph.factory.class:
org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: port
metrics.reporter.grph.port: 9109
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 30 SECONDS

2) Added Graphite jar in plugin folder :

ll */usr/lib/flink/plugins/metric/*
*flink-metrics-graphite-1.10.0.jar*

3) Looking metrics in graphite server:

http://port:8080/metrics 

Note: No code change is done.

Regards,
Vijay






Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-21 Thread Chesnay Schepler

The HaServices are only being given the JobGraph, to this is not possible.

Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the 
previous job; they will be treated as separate things.


However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
from jar instead of reading it from ZK state. Any hints? I have 
feeling that reading JobGraph from jar is more resilient approach, 
less chances of mistakes during upgrade


Thanks,
Alexey


*From:* Piotr Nowojski 
*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler 
*Cc:* Alexey Trenikhun ; Flink User Mail List 


*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
Thank you for the clarification Chesney and sorry for the incorrect 
previous answer.


Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <mailto:ches...@apache.org>> napisał(a):


This is incorrect; we do store the JobGraph in ZooKeeper. If you
just delete the deployment the cluster will recover the previous
JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along
with creating a savepoint), which will clear the Zookeeper state,
and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but
as far as I know JobGraph is never stored in the ZK. It's always
recreated from the job's JAR. So you should be able to upgrade
the job by replacing the JAR with a newer version, as long as the
operator UIDs are the same before and after the upgrade (for
operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun mailto:yen...@msn.com>> napisał(a):

Hello,

Let's say I run Flink Job cluster with persistent storage and
Zookeeper HA on k8s with single  JobManager and use
externalized checkpoints. When JM crashes, k8s will restart
JM pod, and JM will read JobId and JobGraph from ZK and
restore from latest checkpoint. Now let's say I want to
upgrade job binary, I delete deployments, create new
deployments referring to newer image, will JM still read
JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey







Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-22 Thread Chesnay Schepler
If, and only if, the cluster-id and JobId are identical then the 
JobGraph will be recovered from ZooKeeper.


On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure I that I understand your statement about "the HaServices are 
only being given the JobGraph", seems 
HighAvailabilityServices#getJobGraphStore provides JobGraphStore, and 
potentially implementation of JobGraphStore#recoverJobGraph(JobID 
jobId) for this store could build new graph for jar rather than read 
stored graph from ZooKeeper?


Also, if there is single job with same job-id (job cluster), jobgraph 
of failed job will be over written by new one which will have same job-id?


----
*From:* Chesnay Schepler 
*Sent:* Friday, August 21, 2020 12:16 PM
*To:* Alexey Trenikhun ; Piotr Nowojski 


*Cc:* Flink User Mail List 
*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
The HaServices are only being given the JobGraph, to this is not possible.

Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to 
the previous job; they will be treated as separate things.


However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
from jar instead of reading it from ZK state. Any hints? I have 
feeling that reading JobGraph from jar is more resilient approach, 
less chances of mistakes during upgrade


Thanks,
Alexey


*From:* Piotr Nowojski  
<mailto:pnowoj...@apache.org>

*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler  <mailto:ches...@apache.org>
*Cc:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
Thank you for the clarification Chesney and sorry for the incorrect 
previous answer.


Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <mailto:ches...@apache.org>> napisał(a):


This is incorrect; we do store the JobGraph in ZooKeeper. If you
just delete the deployment the cluster will recover the previous
JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along
with creating a savepoint), which will clear the Zookeeper state,
and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well),
but as far as I know JobGraph is never stored in the ZK. It's
always recreated from the job's JAR. So you should be able to
upgrade the job by replacing the JAR with a newer version, as
long as the operator UIDs are the same before and after the
upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun mailto:yen...@msn.com>> napisał(a):

Hello,

Let's say I run Flink Job cluster with persistent storage
and Zookeeper HA on k8s with single  JobManager and use
externalized checkpoints. When JM crashes, k8s will restart
JM pod, and JM will read JobId and JobGraph from ZK and
restore from latest checkpoint. Now let's say I want to
upgrade job binary, I delete deployments, create new
deployments referring to newer image, will JM still read
JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey









Re: AWS EMR deployment error : NoClassDefFoundError org/apache/flink/api/java/typeutils/ResultTypeQueryable

2020-08-21 Thread Chesnay Schepler
If this class cannot be found on the classpath then chances are Flink is 
completely missing from the classpath.


I haven't worked with EMR, but my guess is that you did not submit 
things correctly.


From the EMR documentation I could gather that the submission should 
work without the submitted jar bundling all of Flink;


given that you jar works in a local cluster that part should not be the 
problem.



On 21/08/2020 08:16, Manas Kale wrote:

Hi,
I am trying to deploy a Flink jar on AWS EMR service. I have ensured 
that Flink v1.10.0 is used in my pom file as that's the 
version supported by EMR. However, I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/api/java/typeutils/ResultTypeQueryable
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.util.RunJar.run(RunJar.java:232)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.api.java.typeutils.ResultTypeQueryable
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more
Also, if I deploy this on my local Flink cluster (v1.10.1) it works.
I'm not sure what could be the cause. Could it be because of 
misconfigured classes bundled in the final JAR file or something that 
was patched in v 1.10.1?






Re: Debezium Flink EMR

2020-08-21 Thread Chesnay Schepler

@Jark Would it be possible to use the 1.11 debezium support in 1.10?

On 20/08/2020 19:59, Rex Fenley wrote:

Hi,

I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, 
however, EMR only supports Flink 1.10.0, whereas Debezium Connector 
arrived in Flink 1.11.0, from looking at the documentation.


https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

I'm wondering what alternative solutions are available for connecting 
Debezium to Flink? Is there an open source Debezium connector that 
works with Flink 1.10.0? Could I potentially pull the code out for the 
1.11.0 Debezium connector and compile it in my project using Flink 
1.10.0 api?


For context, I plan on doing some fairly complicated long lived 
stateful joins / materialization using the Table API over data 
ingested from Postgres and possibly MySQL.


Appreciate any help, thanks!

--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 







Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-20 Thread Chesnay Schepler
This is incorrect; we do store the JobGraph in ZooKeeper. If you just 
delete the deployment the cluster will recover the previous JobGraph 
(assuming you aren't changing the Zookeeper configuration).


If you wish to update the job, then you should cancel it (along with 
creating a savepoint), which will clear the Zookeeper state, and then 
create a new deployment


On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as 
far as I know JobGraph is never stored in the ZK. It's always 
recreated from the job's JAR. So you should be able to upgrade the job 
by replacing the JAR with a newer version, as long as the operator 
UIDs are the same before and after the upgrade (for operator state to 
match before and after the upgrade).


Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun > napisał(a):


Hello,

Let's say I run Flink Job cluster with persistent storage and
Zookeeper HA on k8s with single  JobManager and use externalized
checkpoints. When JM crashes, k8s will restart JM pod, and JM will
read JobId and JobGraph from ZK and restore from latest
checkpoint. Now let's say I want to upgrade job binary, I delete
deployments, create new deployments referring to newer image, will
JM still read JobGraph from ZK or will create new one from new job
jar?

Thanks,
Alexey





Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler

That should work as well.

On 20/08/2020 22:46, Vishwas Siravara wrote:

Thank you Chesnay.
Yes but I could change the staging directory by adding 
-Djava.io.tmpdir=/data/flink-1.7.2/tmp to /env.java.opts /in the 
flink-conf.yaml file. Do you see any problem with that?


Best,
Vishwas

On Thu, Aug 20, 2020 at 2:01 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


Could you try adding this to your flink-conf.yaml?

s3.staging-directory:/usr/mware/flink/tmp

On 20/08/2020 20:50, Vishwas Siravara wrote:

Hi Piotr,
I did some analysis and realised that the temp files for s3
checkpoints are staged in /tmp although the /io.tmp.dirs /is set
to a different directory.

ls -lrth
drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 
presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 
presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$
flink-conf.yaml configuration
io.tmp.dirs: /usr/mware/flink/tmp
The /tmp has only 2GB, is it possible to change the staging
directory for s3 checkpoints ?
Best,
Vishwas

On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara
mailto:vsirav...@gmail.com>> wrote:

Hi Piotr,
Thank you for your suggestion. I will try that, are the
temporary files created in the directory set in
/io.tmp.dirs/ in the flink-conf.yaml ? Would these files be
the same size as checkpoints ?


Thanks,
Vishwas

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski
mailto:pnowoj...@apache.org>> wrote:

Hi,

As far as I know when uploading a file to S3, the writer
needs to first create some temporary files on the local
disks. I would suggest to double check all of the
partitions on the local machine and monitor available
disk space continuously while the job is running. If you
are just checking the free space manually, you can
easily miss a point of time when you those temporary
files are too big and approaching the available disk
space usage, as I'm pretty sure those temporary files are
cleaned up immediately after throwing this exception that
you see.

Piotrek

czw., 20 sie 2020 o 00:56 Vishwas Siravara
mailto:vsirav...@gmail.com>>
napisał(a):

Hi guys,
I have a deduplication job that runs on flink 1.7,
that has some state which uses FsState backend. My TM
heap size is 16 GB. I see the below error while
trying to checkpoint a state of size 2GB. There is
enough space available in s3, I tried to upload
larger files and they were all successful. There is
also enough disk space in the local file system, the
disk utility tool does not show anything suspicious.
Whenever I try to start my job from the last
successful checkpoint , it runs into the same error.
Can someone tell me what is the cause of this issue?
Many thanks.


Note: This error goes away when I delete io.tmp.dirs
and restart the job from last checkpoint , but the
disk utility tool does not show much usage before
deletion, so I am not able to figure out what
the problem is.

2020-08-19 21:12:01,909 WARN

org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory
- Could not close the state stream for
s3p://featuretoolkit.c

heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
1363 java.io.IOException: No space left on device
1364 at java.io.FileOutputStream.writeBytes(Native
Method)
1365 at
java.io.FileOutputStream.write(FileOutputStream.java:326)
1366 at

java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1367 at

java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1368 at
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
1369 at
java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1370 at

org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
1371 at

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutpu

Re: No space left on device exception

2020-08-20 Thread Chesnay Schepler

Could you try adding this to your flink-conf.yaml?

s3.staging-directory:/usr/mware/flink/tmp

On 20/08/2020 20:50, Vishwas Siravara wrote:

Hi Piotr,
I did some analysis and realised that the temp files for s3 
checkpoints are staged in /tmp although the /io.tmp.dirs /is set to a 
different directory.


ls -lrth
drwxr-xr-x. 2 was  was 32 Aug 20 17:52 hsperfdata_was
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-8158855975833379228.tmp
-rw---. 1 was  was   505M Aug 20 18:45 presto-s3-7048419193714606532.tmp
drwxr-xr--. 2 root root 6 Aug 20 18:46 hsperfdata_root
[was@sl73rspapd031 tmp]$
flink-conf.yaml configuration
io.tmp.dirs: /usr/mware/flink/tmp
The /tmp has only 2GB, is it possible to change the staging directory 
for s3 checkpoints ?

Best,
Vishwas

On Thu, Aug 20, 2020 at 10:27 AM Vishwas Siravara > wrote:


Hi Piotr,
Thank you for your suggestion. I will try that, are the temporary
files created in the directory set in /io.tmp.dirs/ in the
flink-conf.yaml ? Would these files be the same size as checkpoints ?


Thanks,
Vishwas

On Thu, Aug 20, 2020 at 8:35 AM Piotr Nowojski
mailto:pnowoj...@apache.org>> wrote:

Hi,

As far as I know when uploading a file to S3, the writer needs
to first create some temporary files on the local disks. I
would suggest to double check all of the partitions on the
local machine and monitor available disk space continuously
while the job is running. If you are just checking the free
space manually, you can easily miss a point of time when you
those temporary files are too big and approaching the
available disk space usage, as I'm pretty sure those temporary
files are cleaned up immediately after throwing this exception
that you see.

Piotrek

czw., 20 sie 2020 o 00:56 Vishwas Siravara
mailto:vsirav...@gmail.com>> napisał(a):

Hi guys,
I have a deduplication job that runs on flink 1.7, that
has some state which uses FsState backend. My TM heap size
is 16 GB. I see the below error while trying to checkpoint
a state of size 2GB. There is enough space available in
s3, I tried to upload larger files and they were all
successful. There is also enough disk space in the local
file system, the disk utility tool does not show anything
suspicious. Whenever I try to start my job from the last
successful checkpoint , it runs into the same error. Can
someone tell me what is the cause of this issue? Many thanks.


Note: This error goes away when I delete io.tmp.dirs and
restart the job from last checkpoint , but the disk
utility tool does not show much usage before deletion, so
I am not able to figure out what the problem is.

2020-08-19 21:12:01,909 WARN
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory
- Could not close the state stream for
s3p://featuretoolkit.c

heckpoints/dev_dedup/9b64aafadcd6d367cfedef84706abcba/chk-189/f8e668dd-8019-4830-ab12-d48940ff5353.
1363 java.io.IOException: No space left on device
1364 at java.io.FileOutputStream.writeBytes(Native Method)
1365 at
java.io.FileOutputStream.write(FileOutputStream.java:326)
1366 at

java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
1367 at
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
1368 at
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
1369 at
java.io.FilterOutputStream.close(FilterOutputStream.java:158)
1370 at

org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:986)
1371 at

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
1372 at

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
1373 at

org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
1374 at

org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
1375 at

org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:269)
1376 at

org.apache.flink.runtime.state.CheckpointStreamWithResultProvider.close(CheckpointStreamWithResultProvider.java:58)
1377 at

Re: Flink Job cluster in HA mode - recovery vs upgrade

2020-08-23 Thread Chesnay Schepler

A job cluster is submitted as a job, not a deployment.

The built-in Job controller of Kubernetes ensures that this job finishes 
successfully, and if required starts new pods.




On 23/08/2020 06:43, Alexey Trenikhun wrote:
Since it is necessary to use cancel with save point/resume from save 
point, then it is not possible to use Deployment (otherwise JobManager 
pod will restart on crash from same save point), so we need to use 
Job, but in that case ifJob pod is crashed who will start new instance 
of Job pod ? Sounds like currently HA with kubernetes is not 
achievable unless some controller is used to manage JobManager. Am I 
right?



*From:* Chesnay Schepler 
*Sent:* Saturday, August 22, 2020 12:58 AM
*To:* Alexey Trenikhun ; Piotr Nowojski 


*Cc:* Flink User Mail List 
*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
If, and only if, the cluster-id and JobId are identical then the 
JobGraph will be recovered from ZooKeeper.


On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure I that I understand your statement about "the HaServices are 
only being given the JobGraph", seems 
HighAvailabilityServices#getJobGraphStore provides JobGraphStore, and 
potentially implementation of JobGraphStore#recoverJobGraph(JobID 
jobId) for this store could build new graph for jar rather than read 
stored graph from ZooKeeper?


Also, if there is single job with same job-id (job cluster), jobgraph 
of failed job will be over written by new one which will have same 
job-id?


----
*From:* Chesnay Schepler  <mailto:ches...@apache.org>
*Sent:* Friday, August 21, 2020 12:16 PM
*To:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Piotr Nowojski  <mailto:pnowoj...@apache.org>
*Cc:* Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
The HaServices are only being given the JobGraph, to this is not 
possible.


Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to 
the previous job; they will be treated as separate things.


However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).


On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
from jar instead of reading it from ZK state. Any hints? I have 
feeling that reading JobGraph from jar is more resilient approach, 
less chances of mistakes during upgrade


Thanks,
Alexey


*From:* Piotr Nowojski  
<mailto:pnowoj...@apache.org>

*Sent:* Thursday, August 20, 2020 7:04 AM
*To:* Chesnay Schepler  <mailto:ches...@apache.org>
*Cc:* Alexey Trenikhun  <mailto:yen...@msn.com>; 
Flink User Mail List  
<mailto:user@flink.apache.org>

*Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
Thank you for the clarification Chesney and sorry for the incorrect 
previous answer.


Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <mailto:ches...@apache.org>> napisał(a):


This is incorrect; we do store the JobGraph in ZooKeeper. If you
just delete the deployment the cluster will recover the previous
JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along
with creating a savepoint), which will clear the Zookeeper
state, and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:

Hi Alexey,

I might be wrong (I don't know this side of Flink very well),
but as far as I know JobGraph is never stored in the ZK. It's
always recreated from the job's JAR. So you should be able to
upgrade the job by replacing the JAR with a newer version, as
long as the operator UIDs are the same before and after the
upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun mailto:yen...@msn.com>> napisał(a):

Hello,

Let's say I run Flink Job cluster with persistent storage
and Zookeeper HA on k8s with single  JobManager and use
externalized checkpoints. When JM crashes, k8s will restart
JM pod, and JM will read JobId and JobGraph from ZK and
restore from latest checkpoint. Now let's say I want to
upgrade job binary, I delete deployments, create new
deployments referring to newer image, will JM still read
JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey











Re: Flink S3 Hadoop dependencies

2020-08-14 Thread Chesnay Schepler
Filesystems are supposed to be used as plugins (by putting the jars 
under plugins/ instead of lib/), in which case they are loaded 
separately from other classes, specifically user-code.


https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/plugins.html

On 14/08/2020 20:25, Satish Saley wrote:

Hi team,

Was there a reason for not shading hadoop-common 
https://github.com/apache/flink/commit/e1e7d7f7ecc080c850a264021bf1b20e3d27d373#diff-e7b798a682ee84ab804988165e99761cR38-R44 
? This is leaking lots of classes such as guava and causing issues in 
our flink application.
I see that hadoop-common classes were shaded in earlier versions 
https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.9.0


Stacktrace :
Caused by: java.lang.NoSuchMethodError: 
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V

at io.grpc.Metadata$Key.validateName(Metadata.java:742)
at io.grpc.Metadata$Key.(Metadata.java:750)
at io.grpc.Metadata$Key.(Metadata.java:668)
at io.grpc.Metadata$AsciiKey.(Metadata.java:959)
at io.grpc.Metadata$AsciiKey.(Metadata.java:954)
at io.grpc.Metadata$Key.of(Metadata.java:705)
at io.grpc.Metadata$Key.of(Metadata.java:701)
at io.grpc.internal.GrpcUtil.(GrpcUtil.java:80)
at 
io.grpc.internal.AbstractManagedChannelImplBuilder.(AbstractManagedChannelImplBuilder.java:90)
at 
io.grpc.netty.shaded.io.grpc.netty.NettyChannelProvider.builderForTarget(NettyChannelProvider.java:42)
at 
io.grpc.netty.shaded.io.grpc.netty.NettyChannelProvider.builderForTarget(NettyChannelProvider.java:23)

at io.grpc.ManagedChannelBuilder.forTarget(ManagedChannelBuilder.java:76)





Re: Maximum query and refresh rate for metrics from REST API

2020-09-17 Thread Chesnay Schepler
By default metrics are only updated every 10 seconds; this can be 
controlled via 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#metrics-fetcher-update-interval.


On 9/17/2020 12:22 AM, Piper Piper wrote:

Hello,

What is the recommended way to get metrics (such as CPU, Memory and 
user defined meters and gauges) at the highest frequency rate (i.e. 
with the highest/fastest refresh rate) such as every 500 milliseconds 
or less?


Is there any rate limiting by default on querying the REST API for 
metrics? I am querying the REST API every second but not seeing any 
change in the CPU load for every second, so I was wondering if there 
is any maximum frequency at which I can query it.


Thanks,

Piper





Re: Blobserver dying mid-application

2020-10-01 Thread Chesnay Schepler

It would also be good to know how many slots you have on each task executor.

On 10/1/2020 11:21 AM, Till Rohrmann wrote:

Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a 
new connection to the BlobServer. This depends a bit on how large your 
TaskInformation is and whether this information is being offloaded to 
the BlobServer. What you can definitely try to do is to increase the 
blob.fetch.backlog in order to see whether this solves the problem.


How many jobs and in with what timeline do you submit them to the 
Flink cluster? Maybe you can share a bit more details about the 
application you are running.


Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas > wrote:


Hello folks, I’m seeing application failures where our Blobserver
is refusing connections mid application:

2020-09-30 13:56:06,227 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor -
Un-registering task and sending final execution state FINISHED to
JobManager for task DataSink (TextOutputFormat

(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
- UTF-8) 3d1890b47f4398d805cf0c1b54286f71.

2020-09-30 13:56:06,423 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free
slot TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647,
managedMemoryInMB=3046}, allocationId:
e8be16ec74f16c795d95b89cd08f5c37, jobId:
e808de0373bd515224434b7ec1efe249).

2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove
job e808de0373bd515224434b7ec1efe249 from job leader monitoring.

2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.

2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.

2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot
reconnect to job e808de0373bd515224434b7ec1efe249 because it is
not registered.

2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 (retry 3)

2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 and store it
under

/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-0004
Retrying...

2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 (retry 4)

2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) 

Re: Blobserver dying mid-application

2020-10-01 Thread Chesnay Schepler

All jobs running in a Flink session cluster talk to the same blob server.

The time when tasks are submitted depends on the job; for streaming jobs 
all tasks are deployed when the job starts running; in case of batch 
jobs the submission can be staggered.


I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run  (this is what 
you see in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job 
is quite large, but let's assume it does.


For a) there should be at most numberOfSlots * numberOfTaskExecutors 
concurrent connections, in the worst case of each slot working on a 
different job, as each would download the jars for their respective job. 
If multiple slots are used for the same job at the same time, then the 
job jar is only retrieved once.


For b) the limit should also be numberOfSlots * numberOfTaskExecutors; 
it is done once per task, and there are only so many tasks that can run 
at the same time.


Thus from what I can tell there should be at most 104 (26 task executors 
* 2 slots * 2) concurrent attempts, of which only 54 should land in the 
backlog.


Did you run into this issue before?
If not, is this application different than your existing applications? 
Is the jar particularly big, jobs particularly short running or more 
complex than others.


One thing to note is that the backlog relies entirely on OS 
functionality, which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore 
it; it appears that 128 is not an uncommon upper limit, but maybe there 
are lower settings out there.

You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?

It may also be worthwhile to monitor the number of such 
connections.(|netstat -ant | grep -c SYN_REC)|


@Nico Do you have any ideas?

On 10/1/2020 6:26 PM, Hailu, Andreas wrote:


Hi Chesnay, Till, thanks for responding.

@Chesnay:

Apologies, I said cores when I meant slots JSo a total of 26 Task 
managers with 2 slots each for a grand total of 52 parallelism.


@Till:

For this application, we have a grand total of 78 jobs, with some of 
them demanding more parallelism than others. Each job has multiple 
operators – depending on the size of the data we’re operating on, we 
could submit 1 whopper with 52 parallelism, or multiple smaller jobs 
submitted in parallel with a sum of 52 parallelism. When does a task 
submission to a `TaskExecutor` take place? Is that on job submission 
or something else? I’m just curious as a parallelism of 52 seems on 
the lower side to breach 1K connections in the queue, unless 
interactions with the Blobserver are much more frequent than I think. 
Is it possible that separate Flink jobs share the same Blobserver? 
Because we have thousands of Flink applications running concurrently 
in our YARN cluster.


*// *ah**

*From:*Chesnay Schepler 
*Sent:* Thursday, October 1, 2020 5:42 AM
*To:* Till Rohrmann ; Hailu, Andreas 
[Engineering] 

*Cc:* user@flink.apache.org
*Subject:* Re: Blobserver dying mid-application

It would also be good to know how many slots you have on each task 
executor.


On 10/1/2020 11:21 AM, Till Rohrmann wrote:

Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can
trigger a new connection to the BlobServer. This depends a bit on
how large your TaskInformation is and whether this information is
being offloaded to the BlobServer. What you can definitely try to
do is to increase the blob.fetch.backlog in order to see whether
this solves the problem.

How many jobs and in with what timeline do you submit them to the
Flink cluster? Maybe you can share a bit more details about the
application you are running.

Cheers,

Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas
mailto:andreas.ha...@gs.com>> wrote:

Hello folks, I’m seeing application failures where our
Blobserver is refusing connections mid application:

2020-09-30 13:56:06,227 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor -
Un-registering task and sending final execution state FINISHED
to JobManager for task DataSink (TextOutputFormat

(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
- UTF-8) 3d1890b47f4398d805cf0c1b54286f71.

2020-09-30 13:56:06,423 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable -
Free slot TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
nativeMemo

Re: sideOutputLateData doesn't work with map()

2020-09-17 Thread Chesnay Schepler

This is working as intended, but is admittedly inconvenient.
The reason why the original version does not work is that the 
side-output is scoped to the DataStream that the process function 
creates; the Map function creates another DataStream though that does 
not retain the side-output of the previous DataStream.


On 9/17/2020 3:21 PM, Ori Popowski wrote:


Turns out that this is the way to solve this problem:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
  (1 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, 
TimeWindow] {
    override def process(key: Int, context: Context, elements: 
Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {

  out.collect(elements.map(_._1).toList)
    }
  })
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()
stream
  .map(list => list :+ 42)
  .print()

senv.execute()

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski > wrote:


Hi,

I have this simple flow:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
  (1 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int,
TimeWindow] {
    override def process(key: Int, context: Context, elements:
Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
  out.collect(elements.map(_._1).toList)
    }
  })
stream
  .print()
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()

senv.execute()

This is a simple stream which uses a session window on integers
and then uses process(…) to just collect them into a list. There's
also side output for late data.
When I run this job I can see printing to stdout of the late
messages without any problem.

However, when I add a map(…) after process(…), the late data isn't
getting into the sideoutput and I cannot see the printing to stdout:
…
.sideOutputLateData(tag)
.process(…)
.map(list => list :+ 42)
…

Is this a bug or is it working as intended? If it's not a bug -
does it mean I cannot add any operator after process(…)?

Thanks





Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-25 Thread Chesnay Schepler

Please double-check that your distribution and application jar were
built against the same Flink version.

This looks related to a binary-compatibility issues reporter in
FLINK-13586 .



Re: Query Rest API from IDE during runtime

2020-05-25 Thread Chesnay Schepler
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config) does 
not actually create any resources yet, this only happens when you run a 
job. Upon execute() the Flink cluster is started, the job is run, and 
once the job finishes (and execute() returns) the cluster shuts down.
So, you can only query the REST API if the cluster is running; you will 
have to call execute() and in a separate thread query the REST API.


FYI: You should never query 6123 via HTTP, since this is what Akka runs on.

On 25/05/2020 12:46, Annemarie Burger wrote:

Hi,

Thanks for your response!
I can't seem to get past a "java.net.ConnectException: Connection refused"
though. Below is the relevant code and exception, any idea what I'm doing
wrong?



Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 1000);
config.setString(RestOptions.ADDRESS,"localhost");

// Unsure if I need this
ExecutorService ex =
WebMonitorEndpoint.createExecutorService(config.getInteger(RestOptions.SERVER_NUM_THREADS),

config.getInteger(RestOptions.SERVER_THREAD_PRIORITY),"name");


StreamExecutionEnvironment env =

StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);


CloseableHttpClient httpClient = HttpClients.createDefault();

// Same problem when trying with port 6123
HttpGet httpget = new HttpGet("http://localhost:8081/jobs;);
HttpResponse httpresponse = httpClient.execute(httpget);


Exception in thread "main"
org.apache.flink.hadoop.shaded.org.apache.http.conn.HttpHostConnectException:
Connect to localhost:8081 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1]
failed: Connection refused: connect
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at gellyStreaming.gradoop.model.GraphState.(GraphState.java:113)
at
gellyStreaming.gradoop.model.SimpleTemporalEdgeStream.buildState(SimpleTemporalEdgeStream.java:473)
at gellyStreaming.gradoop.model.Tests.incrementalState(Tests.java:261)
at gellyStreaming.gradoop.model.Tests.main(Tests.java:417)
Caused by: java.net.ConnectException: Connection refused: connect
at java.base/java.net.PlainSocketImpl.connect0(Native Method)
at
java.base/java.net.PlainSocketImpl.socketConnect(PlainSocketImpl.java:101)
at
java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at
java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at
java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
at java.base/java.net.Socket.connect(Socket.java:609)
at
org.apache.flink.hadoop.shaded.org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
at
org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
... 13 more




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Query Rest API from IDE during runtime

2020-05-25 Thread Chesnay Schepler
If you set DeploymentOptions.ATTACHED to false then execute() does not 
block until the job finishes, and returns a DetachedJobExecutionResult 
from which you can retrieve the Job ID.
If you need to know when the job finishes you will have to continuously 
query the REST API.


This is the only way to do so using the StreamExecutionEnvironment that 
I'm aware of.


On 25/05/2020 14:34, Annemarie Burger wrote:

Hi,

Thanks for your reply and explanation!
Do you know of any way to have a job retrieve its own jobID while it's still
running?

Best,
Annemarie



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
If it were a class-loading issue I would think that we'd see an 
exception of some kind. Maybe double-check that flink-shaded-hadoop is 
not in the lib directory. (usually I would ask for the full classpath 
that the HS is started with, but as it turns out this isn't getting 
logged :( (FLINK-18008))


The fact that overview.json and jobs/overview.json are missing indicates 
that something goes wrong directly on startup. What is supposed to 
happens is that the HS starts, fetches all currently available archives 
and then creates these files.

So it seems like the download gets stuck for some reason.

Can you use jstack to create a thread dump, and see what the 
Flink-HistoryServer-ArchiveFetcher is doing?


I will also file a JIRA for adding more logging statements, like when 
fetching starts/stops.


On 27/05/2020 20:57, Hailu, Andreas wrote:


Hi Chesney, apologies for not getting back to you sooner here. So I 
did what you suggested - I downloaded a few files from my 
jobmanager.archive.fs.dir HDFS directory to a locally available 
directory named 
/local/scratch/hailua_p2epdlsuat/historyserver/archived/. I then 
changed my historyserver.archive.fs.dir to 
file:///local/scratch/hailua_p2epdlsuat/historyserver/archived/ and 
that seemed to work. I’m able to see the history of the applications I 
downloaded. So this points to a problem with sourcing the history from 
HDFS.


Do you think this could be classpath related? This is what we use for 
our HADOOP_CLASSPATH var:


//gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/lib/*:/gns/software/ep/da/dataproc/dataproc-prod/lakeRmProxy.jar:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/bin::/gns/mw/dbclient/postgres/jdbc/pg-jdbc-9.3.v01/postgresql-9.3-1100-jdbc4.jar/

//

You can see we have references to Hadoop mapred/yarn/hdfs libs in there.

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Sunday, May 3, 2020 6:00 PM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

yes, exactly; I want to rule out that (somehow) HDFS is the problem.

I couldn't reproduce the issue locally myself so far.

On 01/05/2020 22:31, Hailu, Andreas wrote:

Hi Chesnay, yes – they were created using Flink 1.9.1 as we’ve
only just started to archive them in the past couple weeks. Could
you clarify on how you want to try local filesystem archives? As
in changing jobmanager.archive.fs.dir and historyserver.web.tmpdir
to the same local directory?

*// *ah

*From:*Chesnay Schepler 
<mailto:ches...@apache.org>
*Sent:* Wednesday, April 29, 2020 8:26 AM
*To:* Hailu, Andreas [Engineering] 
<mailto:andreas.ha...@ny.email.gs.com>; user@flink.apache.org
<mailto:user@flink.apache.org>
*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

hmm...let's see if I can reproduce the issue locally.

Are the archives from the same version the history server runs on?
(Which I supposed would be 1.9.1?)

Just for the sake of narrowing things down, it would also be
interesting to check if it works with the archives residing in the
local filesystem.

On 27/04/2020 18:35, Hailu, Andreas wrote:

bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/

total 8

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43
flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22
flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don’t see cache
directories from my attempts today, which is interesting.
Looking a little deeper into them:

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

total 1756

drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs

total 0

-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS – I’ve included some
in my initial mail, but here they are again just for reference:

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 44282 items

-rw-r- 3 delp datalake_admin_dev  50569 2020-03-21
23:17
/user/p

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop 
in /lib then you basically don't know what Hadoop version is actually 
being used,

which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being 
used and runs into HDFS-7005.


On 28/05/2020 16:27, Hailu, Andreas wrote:


Just created a dump, here’s what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 
os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]


java.lang.Thread.State: RUNNABLE

    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

    at 
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)


    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

    - locked <0x0005df986960> (a sun.nio.ch.Util$2)

    - locked <0x0005df986948> (a 
java.util.Collections$UnmodifiableSet)


    - locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)

    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

    at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)


    at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)


    at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)


    at 
org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)


    at 
org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)


    - locked <0x0005ceade5e0> (a 
org.apache.hadoop.hdfs.RemoteBlockReader2)


    at 
org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:781)


    at 
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:837)


    - eliminated <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


    at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:897)


    - locked <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


   at 
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:945)


    - locked <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


    at java.io.DataInputStream.read(DataInputStream.java:149)

    at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)


    at java.io.InputStream.read(InputStream.java:101)

    at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:69)

    at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:91)

    at 
org.apache.flink.runtime.history.FsJobArchivist.getArchivedJsons(FsJobArchivist.java:110)


    at 
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:169)


    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


    at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)


    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)


    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)


    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)


    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)


    at java.lang.Thread.run(Thread.java:745)

What problems could the flink-shaded-hadoop jar being included introduce?

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Thursday, May 28, 2020 9:26 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

If it were a class-loading issue I would think that we'd see an 
exception of some kind. Maybe double-check that flink-shaded-hadoop is 
not in the lib directory. (usually I would ask for the full classpath 
that the HS is started with, but as it turns out thi

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-29 Thread Chesnay Schepler

oh I'm not using the HistoryServer; I just wrote it ;)
Are these archives all in the same location? So we're roughly looking at 
5 GB of archives then?


That could indeed "just" be a resource problem. The HistoryServer 
eagerly downloads all archives, and not on-demand.
The next step would be to move some of the archives into a separate HDFS 
directory and try again.


(Note that by configuring "historyserver.web.tmpdir" to some permanent 
directory subsequent (re)starts of the HistorySserver can re-use this 
directory; so you only have to download things once)


On 29/05/2020 00:43, Hailu, Andreas wrote:


May I also ask what version of flink-hadoop you’re using and the 
number of jobs you’re storing the history for? As of writing we have 
roughly 101,000 application history files. I’m curious to know if 
we’re encountering some kind of resource problem.


*// *ah**

*From:*Hailu, Andreas [Engineering]
*Sent:* Thursday, May 28, 2020 12:18 PM
*To:* 'Chesnay Schepler' ; user@flink.apache.org
*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we’re mistakenly using a version 
that’s pre-2.6.0. However, I don’t see flink-shaded-hadoop in my /lib 
directory for flink-1.9.1.


flink-dist_2.11-1.9.1.jar

flink-table-blink_2.11-1.9.1.jar

flink-table_2.11-1.9.1.jar

log4j-1.2.17.jar

slf4j-log4j12-1.7.15.jar

Are the files within /lib.

*// *ah**

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Thursday, May 28, 2020 11:00 AM
*To:* Hailu, Andreas [Engineering] <mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org 
<mailto:user@flink.apache.org>

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:

https://issues.apache.org/jira/browse/HDFS-6999 
<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D6999=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0ZmaazccTuKjDKzJjF0WZSIso=wtWbBz9FrMlr29HibXGZvdcsFC1wqyVPulrYiTewpoQ=>


https://issues.apache.org/jira/browse/HDFS-7005 
<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D7005=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0ZmaazccTuKjDKzJjF0WZSIso=0KgRQHmW0Xj6NToNVzoi9iAGh1SIbfe8cnCqj1TXuW8=>


https://issues.apache.org/jira/browse/HDFS-7145 
<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D7145=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0ZmaazccTuKjDKzJjF0WZSIso=oy8z5gRd6dNDURDDH20f2yiplIuJ9qnYZeVpTIrHMwc=>


It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and 
flink-shaded-hadoop in /lib then you basically don't know what Hadoop 
version is actually being used,


which could lead to incompatibilities and dependency clashes.

If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is 
being used and runs into HDFS-7005.


On 28/05/2020 16:27, Hailu, Andreas wrote:

Just created a dump, here’s what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5
os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable
[0x7f934a0d3000]

java.lang.Thread.State: RUNNABLE

    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

    at
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

    at
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)

    at
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

    - locked <0x0005df986960> (a sun.nio.ch.Util$2)

    - locked <0x0005df986948> (a
java.util.Collections$UnmodifiableSet)

    - locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)

    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

    at

org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)

    at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)

    at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)

    at

org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)

    at
  

Re: History Server Not Showing Any Jobs - File Not Found?

2020-06-02 Thread Chesnay Schepler
1) It downloads all archives and stores them on disk; the only thing 
stored in memory is the job ID or the archive. There is no hard upper 
limit; it is mostly constrained by disk space / memory. I say mostly, 
because I'm not sure how well the WebUI handles 100k jobs being loaded 
into the overview.


2) No, there is no retention policy. It is currently expected that an 
external process cleans up archives. If an archive was deleted (from the 
archive directory) the HistoryServer does notice that and also delete 
the local copy.


On 01/06/2020 23:05, Hailu, Andreas wrote:


So I created a new HDFS directory with just 1 archive and pointed the 
server to monitor that directory, et voila – I’m able to see the 
applications in the UI. So it must have been really churning trying to 
fetch all of those initial archives J


I have a couple of follow up questions if you please:

1.What is the upper limit of the number of archives the history server 
can support? Does it attempt to download every archive and load them 
all into memory?


2.Retention: we have on the order of 100K applications per day in our 
production environment. Is there any native retention of policy? E.g. 
only keep the latest X archives in the dir - or is this something we 
need to manage ourselves?


Thanks.

*// *ah**

*From:*Hailu, Andreas [Engineering]
*Sent:* Friday, May 29, 2020 8:46 AM
*To:* 'Chesnay Schepler' ; user@flink.apache.org
*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Yes, these are all in the same directory, and we’re at 67G right now. 
I’ll try with incrementally smaller directories and let you know what 
I find.


*// *ah**

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Friday, May 29, 2020 3:11 AM
*To:* Hailu, Andreas [Engineering] <mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org 
<mailto:user@flink.apache.org>

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

oh I'm not using the HistoryServer; I just wrote it ;)

Are these archives all in the same location? So we're roughly looking 
at 5 GB of archives then?


That could indeed "just" be a resource problem. The HistoryServer 
eagerly downloads all archives, and not on-demand.


The next step would be to move some of the archives into a separate 
HDFS directory and try again.


(Note that by configuring "historyserver.web.tmpdir" to some permanent 
directory subsequent (re)starts of the HistorySserver can re-use this 
directory; so you only have to download things once)


On 29/05/2020 00:43, Hailu, Andreas wrote:

May I also ask what version of flink-hadoop you’re using and the
number of jobs you’re storing the history for? As of writing we
have roughly 101,000 application history files. I’m curious to
know if we’re encountering some kind of resource problem.

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Thursday, May 28, 2020 12:18 PM
*To:* 'Chesnay Schepler' 
<mailto:ches...@apache.org>; user@flink.apache.org
<mailto:user@flink.apache.org>
*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we’re mistakenly using a
version that’s pre-2.6.0. However, I don’t see flink-shaded-hadoop
in my /lib directory for flink-1.9.1.

flink-dist_2.11-1.9.1.jar

flink-table-blink_2.11-1.9.1.jar

flink-table_2.11-1.9.1.jar

log4j-1.2.17.jar

slf4j-log4j12-1.7.15.jar

    Are the files within /lib.

*// *ah

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Thursday, May 28, 2020 11:00 AM
*To:* Hailu, Andreas [Engineering] mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org
<mailto:user@flink.apache.org>
*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar
instances:

https://issues.apache.org/jira/browse/HDFS-6999

<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D6999=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0ZmaazccTuKjDKzJjF0WZSIso=wtWbBz9FrMlr29HibXGZvdcsFC1wqyVPulrYiTewpoQ=>

https://issues.apache.org/jira/browse/HDFS-7005

<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D7005=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0ZmaazccTuKjDKzJjF0WZSIso=0KgRQHmW0Xj6NToNVzoi9iAGh1SIbfe8cnCqj1TXuW8=>

https://issues.apache.org/jira/browse/HDFS-7145

<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_HDFS-2D7145=DwMD-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM=b1rFpuaq4HMshPx-d-0Zmaa

Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler

how can we know the expected size for which it is failing?


If you did not configure akka.framesize yourself then it is set to the 
documented default value. See the configuration documentation for the 
release you are using.


> Does the operator state have any impact on the expected Akka frame size?

If you are using the MemoryStateBackend, yes. Otherwise, the impact of 
using any form of state on the framesize should be negligible.


> What is the impact of increasing it?

Increase in memory consumption, probably around 1-2x the increased 
amount. (So, increase it my 1mb, memory usages goes up by 1-2 mb)


On 9/18/2020 9:50 AM, shravan wrote:

Hi,

This is in continuation to an already raised request, (had replied to the
same thread but couldn't get any response yet, hence posting a new request)
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-rpc-invocation-size-exceeds-the-maximum-akka-framesize-when-the-job-was-re-submitted-td37507.html

We are observing the same error as well with regard to "The rpc invocation
size exceeds the maximum akka framesize.", and have follow-up questions on
the same.

Why we face this issue, how can we know the expected size for which it is
failing? The error message does not indicate that. Does the operator state
have any impact on the expected Akka frame size? What is the impact of
increasing it?

Awaiting a response.

Regards,
Shravan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler
If you use 1.10.0 or above the framesize for which it failed is part of 
the exception message, see FLINK-14618.


If you are using older version, then I'm afraid there is no way to tell.

On 9/18/2020 12:11 PM, shravan wrote:

Thanks for the quick response.

I might have wrongly phrased one of the questions.

/"> how can we know the expected size for which it is failing?

If you did not configure akka.framesize yourself then it is set to the
documented default value. See the configuration documentation for the
release you are using."/

We found out the default size from the configuration but we are unable to
identify the size for which it fails. Could you help out on this?

Awaiting a response.

Regards,
Shravan




Chesnay Schepler wrote

how can we know the expected size for which it is failing?

If you did not configure akka.framesize yourself then it is set to the
documented default value. See the configuration documentation for the
release you are using.

  > Does the operator state have any impact on the expected Akka frame
size?

If you are using the MemoryStateBackend, yes. Otherwise, the impact of
using any form of state on the framesize should be negligible.

  > What is the impact of increasing it?

Increase in memory consumption, probably around 1-2x the increased
amount. (So, increase it my 1mb, memory usages goes up by 1-2 mb)

On 9/18/2020 9:50 AM, shravan wrote:

Hi,

This is in continuation to an already raised request, (had replied to the
same thread but couldn't get any response yet, hence posting a new
request)
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-rpc-invocation-size-exceeds-the-maximum-akka-framesize-when-the-job-was-re-submitted-td37507.html

We are observing the same error as well with regard to "The rpc
invocation
size exceeds the maximum akka framesize.", and have follow-up questions
on
the same.

Why we face this issue, how can we know the expected size for which it is
failing? The error message does not indicate that. Does the operator
state
have any impact on the expected Akka frame size? What is the impact of
increasing it?

Awaiting a response.

Regards,
Shravan



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler

There are quite a few reason why the framesize could be exceeded.

The most common one we see is due to the parallelism being so high that 
tasks can't be deployed in the first place. When a task is deployed the 
RPC payload also contains information about all downstream tasks this 
task sends data to; when those are a few thousand (usually in case of a 
shuffle) the amount of data can quickly add up.


Other causes could be tasks having thousands of accumulators or there 
being too many metrics on one TaskExecutor (which would result in 
metrics not being queryable from the WebUI/REST API).


Overall though, the documentation is pretty accurate. The framesize 
being exceeded is usually not because the user did anything wrong, but 
just operating at a scale that the default framesize cannot support. The 
only solution to that is to increase the framesize.


On 9/18/2020 12:34 PM, shravan wrote:

Thanks again for the quick response.

In that case, could you tell me what are the possible factors that warrant a
framesize increase? I see the official documentation and it simply states
"If Flink fails because messages exceed this limit, then you should increase
it", which isn't very convincing.

Regards,
M S Shravan
Chesnay Schepler wrote

If you use 1.10.0 or above the framesize for which it failed is part of
the exception message, see FLINK-14618.

If you are using older version, then I'm afraid there is no way to tell.

On 9/18/2020 12:11 PM, shravan wrote:

Thanks for the quick response.

I might have wrongly phrased one of the questions.

/"> how can we know the expected size for which it is failing?

If you did not configure akka.framesize yourself then it is set to the
documented default value. See the configuration documentation for the
release you are using."/

We found out the default size from the configuration but we are unable to
identify the size for which it fails. Could you help out on this?

Awaiting a response.

Regards,
Shravan




Chesnay Schepler wrote

how can we know the expected size for which it is failing?

If you did not configure akka.framesize yourself then it is set to the
documented default value. See the configuration documentation for the
release you are using.

   > Does the operator state have any impact on the expected Akka frame
size?

If you are using the MemoryStateBackend, yes. Otherwise, the impact of
using any form of state on the framesize should be negligible.

   > What is the impact of increasing it?

Increase in memory consumption, probably around 1-2x the increased
amount. (So, increase it my 1mb, memory usages goes up by 1-2 mb)

On 9/18/2020 9:50 AM, shravan wrote:

Hi,

This is in continuation to an already raised request, (had replied to
the
same thread but couldn't get any response yet, hence posting a new
request)
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-rpc-invocation-size-exceeds-the-maximum-akka-framesize-when-the-job-was-re-submitted-td37507.html

We are observing the same error as well with regard to "The rpc
invocation
size exceeds the maximum akka framesize.", and have follow-up questions
on
the same.

Why we face this issue, how can we know the expected size for which it
is
failing? The error message does not indicate that. Does the operator
state
have any impact on the expected Akka frame size? What is the impact of
increasing it?

Awaiting a response.

Regards,
Shravan



--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: mvn clean verify - testConfigurePythonExecution failing

2020-10-22 Thread Chesnay Schepler
try naming it PythonProgramOptionsITCase; it apparently needs a jar to 
be created first, which happens after unit tests (tests suffixed with 
Test) are executed.


On 10/22/2020 1:48 PM, Juha Mynttinen wrote:

Hello there,

The PR https://github.com/apache/flink/pull/13322 lately added the 
test method testConfigurePythonExecution in 
org.apache.flink.client.cli.PythonProgramOptionsTest.


"mvn clean verify" fails for me in testConfigurePythonExecution:

...
INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 0.433 s <<< FAILURE! - in 
org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] 
testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest) 
 Time elapsed: 0.019 s  <<< ERROR!

java.nio.file.NoSuchFileException: target/dummy-job-jar
at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at 
java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at 
java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)

at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
at 
java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)

at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2716)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
at 
org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
at 
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

...
[ERROR] Errors:
[ERROR] PythonProgramOptionsTest.testConfigurePythonExecution:131 » 
NoSuchFile target/...




The command "find . -name dummy-job-jar" doesn't find anything. I 
didn't check any deeper why 

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler
In the normal Flink distribution these jars were moved from opt/ to 
plugins/ so that they are available by default without having to mess 
around with any jars.
I don't think anyone was aware that the plugin directory is not 
populated on EMR.


On 10/27/2020 9:53 PM, Vijayendra Yadav wrote:
Perfect after downloading it into the plugin, it is working well. I am 
wondering why these jars have been removed from opt/ folder, earlier I 
was able to copy from opt/ to lib/ folder for 1.10.

For now I just downloaded from Maven for 1.11 and copied in plugin/.

Regards,
Vijay

On Tue, Oct 27, 2020 at 11:18 AM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


So the plugins directory is completely empty?

In that case, please download the flink-metrics-graphite jar

<https://mvnrepository.com/artifact/org.apache.flink/flink-metrics-graphite/1.11.0>
and also copy it into the plugins directory.

On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:

Also, you are right that the plugin did not have anything by
default when we created EMR 5.31 with Flink 1.11.

In opt/ I see:

[hadoop@ip-10-223-71-70 flink]$ pwd
/usr/lib/flink
[hadoop@ip-10-223-71-70 flink]$ ll opt/
total 172860
-rw-r--r-- 1 root root 24029243 Sep 19 03:08
flink-azure-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root   185395 Sep 19 03:11
flink-cep_2.11-1.11.0.jar
-rw-r--r-- 1 root root    53473 Sep 19 03:17
flink-cep-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   640604 Sep 19 03:16
flink-gelly_2.11-1.11.0.jar
-rw-r--r-- 1 root root   764049 Sep 19 03:16
flink-gelly-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
-rw-r--r-- 1 root root 22316430 Sep 19 03:08
flink-oss-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 37228704 Sep 19 03:17
flink-python_2.11-1.11.0.jar
-rw-r--r-- 1 root root    22155 Sep 19 03:16
flink-queryable-state-runtime_2.11-1.11.0.jar
-rw-r--r-- 1 root root 19985454 Sep 19 03:08
flink-s3-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 36173428 Sep 19 03:08
flink-s3-fs-presto-1.11.0.jar
-rw-r--r-- 1 root root   194834 Aug 28 16:51
flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
-rw-r--r-- 1 root root  8028165 Aug 28 17:04
flink-shaded-zookeeper-3.5.6.jar
-rw-r--r-- 1 root root   544183 Sep 19 03:17
flink-sql-client_2.11-1.11.0.jar
-rw-r--r-- 1 root root   103766 Sep 19 03:17
flink-state-processor-api_2.11-1.11.0.jar
-rw-r--r-- 1 root root 26428976 Sep 19 03:08
flink-swift-fs-hadoop-1.11.0.jar
drwxr-xr-x 2 root root      134 Oct 13 18:01 python

in lib/ I see:

[hadoop@ip-10-223-71-70 flink]$ ll lib/
total 190304
-rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
-rw-r--r-- 1 root root 114256876 Sep 19 03:17
flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
-rw-r--r-- 1 root root   7712156 Aug 28 16:51
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root  33325748 Sep 19 03:17
flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 root root  37330514 Sep 19 03:17
flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 root root     67114 Aug 28 16:50
log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
-rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
-rw-r--r-- 1 root root     23518 Aug 28 16:50
log4j-slf4j-impl-2.12.1.jar

Regards,
Vijay

On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav
mailto:contact@gmail.com>> wrote:

Hi Chesnay,

Steps to upgrade are as follows:

1) Created EMR 5.31 Cluster which comes with Flink 1.11
2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for
application.

cd /usr/lib/flink/

mkdir -p ./plugins/s3-fs-hadoop

cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/

3) Recompiled Application with Flink 1.11 dependency.
4) Updated Graphite plugin class in config

That is all I did.

Regards,
Vijay


On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How exactly did you do the upgrade? Did you copy some
files from 1.11 into an existing 1.10 distribution?

The configuration is correct, but it appears as if the
entire plugins directory is either a) empty or b) not
shipped.

On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:

Hi Robert and Chesnay,

Only  thing changed is I upgraded from Flink 1.10 to
1.11 and to support that updated conf yaml with factory
class.

Here is attached Full Log with classpath etc.   (log.txt)

Regards,
Vijay


On T

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler

So the plugins directory is completely empty?

In that case, please download the flink-metrics-graphite jar 
<https://mvnrepository.com/artifact/org.apache.flink/flink-metrics-graphite/1.11.0> 
and also copy it into the plugins directory.


On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:
Also, you are right that the plugin did not have anything by default 
when we created EMR 5.31 with Flink 1.11.


In opt/ I see:

[hadoop@ip-10-223-71-70 flink]$ pwd
/usr/lib/flink
[hadoop@ip-10-223-71-70 flink]$ ll opt/
total 172860
-rw-r--r-- 1 root root 24029243 Sep 19 03:08 
flink-azure-fs-hadoop-1.11.0.jar

-rw-r--r-- 1 root root   185395 Sep 19 03:11 flink-cep_2.11-1.11.0.jar
-rw-r--r-- 1 root root    53473 Sep 19 03:17 
flink-cep-scala_2.11-1.11.0.jar

-rw-r--r-- 1 root root   640604 Sep 19 03:16 flink-gelly_2.11-1.11.0.jar
-rw-r--r-- 1 root root   764049 Sep 19 03:16 
flink-gelly-scala_2.11-1.11.0.jar

-rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
-rw-r--r-- 1 root root 22316430 Sep 19 03:08 
flink-oss-fs-hadoop-1.11.0.jar

-rw-r--r-- 1 root root 37228704 Sep 19 03:17 flink-python_2.11-1.11.0.jar
-rw-r--r-- 1 root root    22155 Sep 19 03:16 
flink-queryable-state-runtime_2.11-1.11.0.jar

-rw-r--r-- 1 root root 19985454 Sep 19 03:08 flink-s3-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 36173428 Sep 19 03:08 flink-s3-fs-presto-1.11.0.jar
-rw-r--r-- 1 root root   194834 Aug 28 16:51 
flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
-rw-r--r-- 1 root root  8028165 Aug 28 17:04 
flink-shaded-zookeeper-3.5.6.jar
-rw-r--r-- 1 root root   544183 Sep 19 03:17 
flink-sql-client_2.11-1.11.0.jar
-rw-r--r-- 1 root root   103766 Sep 19 03:17 
flink-state-processor-api_2.11-1.11.0.jar
-rw-r--r-- 1 root root 26428976 Sep 19 03:08 
flink-swift-fs-hadoop-1.11.0.jar

drwxr-xr-x 2 root root      134 Oct 13 18:01 python

in lib/ I see:

[hadoop@ip-10-223-71-70 flink]$ ll lib/
total 190304
-rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
-rw-r--r-- 1 root root 114256876 Sep 19 03:17 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
-rw-r--r-- 1 root root   7712156 Aug 28 16:51 
flink-shaded-zookeeper-3.4.14.jar

-rw-r--r-- 1 root root  33325748 Sep 19 03:17 flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 root root  37330514 Sep 19 03:17 
flink-table-blink_2.11-1.11.0.jar

-rw-r--r-- 1 root root     67114 Aug 28 16:50 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
-rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
-rw-r--r-- 1 root root     23518 Aug 28 16:50 log4j-slf4j-impl-2.12.1.jar

Regards,
Vijay

On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav 
mailto:contact@gmail.com>> wrote:


Hi Chesnay,

Steps to upgrade are as follows:

1) Created EMR 5.31 Cluster which comes with Flink 1.11
2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for
application.

cd /usr/lib/flink/

mkdir -p ./plugins/s3-fs-hadoop

cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/

3) Recompiled Application with Flink 1.11 dependency.
4) Updated Graphite plugin class in config

That is all I did.

Regards,
Vijay


On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How exactly did you do the upgrade? Did you copy some files
from 1.11 into an existing 1.10 distribution?

The configuration is correct, but it appears as if the entire
plugins directory is either a) empty or b) not shipped.

On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:

Hi Robert and Chesnay,

Only  thing changed is I upgraded from Flink 1.10 to 1.11 and
to support that updated conf yaml with factory class.

Here is attached Full Log with classpath etc.  (log.txt)

Regards,
Vijay


On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Are you writing a test? (otherwise the ReporterSetupTest
reporters wouldn't be around)
Do you have a dependency on the graphite reporter?

On 10/27/2020 8:27 AM, Robert Metzger wrote:

Hi Vijayendra,
can you post or upload the entire logs, so that we can
see the Classpath logged on startup, as well as the
effective configuration parameters?

On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav
mailto:contact@gmail.com>>
wrote:

Hi Chesnay,

Another log message:

2020-10-26 23:33:08,516 WARN
org.apache.flink.runtime.metrics.ReporterSetup - The
reporter factory
(org.apache.flink.metrics.graphite.GraphiteReporterFactory)
could not be found for reporter grph. Avail

Re: FLINK 1.11 Graphite Metrics

2020-10-25 Thread Chesnay Schepler
Ah wait, in 1.11 it should not longer be necessary to explicitly copy 
the reporter jar.


Please update your reporter configuration to this:

|metrics.reporter.grph.factory.class: 
org.apache.flink.metrics.graphite.GraphiteReporterFactory|


On 10/25/2020 4:00 PM, Chesnay Schepler wrote:

Have you followed the documentation, specifically this bit?

> In order to use this reporter you must copy 
|/opt/flink-metrics-influxdb-1.11.2.jar| into the |plugins/influxdb| 
folder of your Flink distribution.


On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:

Hi Team,

for Flink 1.11 Graphite Metrics. I see the following Error in the log.
Any suggestions?

020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup 
   - Could not instantiate metrics reporter grph. Metrics might not be 
exposed/reported.
java.lang.ClassNotFoundException: 
org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
at 
org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
at 
org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)

Regards,
Vijay







Re: FLINK 1.11 Graphite Metrics

2020-10-25 Thread Chesnay Schepler

Have you followed the documentation, specifically this bit?

> In order to use this reporter you must copy 
|/opt/flink-metrics-influxdb-1.11.2.jar| into the |plugins/influxdb| 
folder of your Flink distribution.


On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:

Hi Team,

for Flink 1.11 Graphite Metrics. I see the following Error in the log.
Any suggestions?

020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup 
   - Could not instantiate metrics reporter grph. Metrics might not be 
exposed/reported.
java.lang.ClassNotFoundException: 
org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
at 
org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
at 
org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)

Regards,
Vijay





Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler

@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think your jar does not contain the 
classes, or does not exist at all on the machine your application is run on.


On 10/28/2020 12:08 PM, Kostas Kloudas wrote:

Hi all,

I will have a look in the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:

Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

 flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:

hmm..it appears as if PackagedProgramUtils#createJobGraph does some things 
outside the usercode classlodaer (getPipelineFromProgram()), specifically the 
call to the main method.

@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to createJobGraph 
like this:

final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
try {

Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
// do tstuff
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:

Any help here?  How can I understand why the classes inside the jar are not 
found when creating the PackagedProgram?

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
wrote:

In the logs I see that the jar is the classpath (I'm trying to debug the 
program from the IDE)..isn'it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the RestClusterClient, so I 
assume that either your jar is not packaged correctly or you your JobExecutor 
is putting it on the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm 
trying to use as a client towards the Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
   public static BatchEnv getBatchEnv() {
 // TODO use the following when ready to convert from/to datastream
 // return 
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment ret = BatchTableEnvironment.create(env);
 customizeEnv(ret);
 return new BatchEnv(env, ret);
   }

   private static void customizeEnv(TableEnvironment ret) {
 final Configuration conf = ret.getConfig().getConfiguration();
 // 
conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
2);
 conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
 conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
 // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
 // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
0.4f);//NOSONAR
 // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 
2);//NOSONAR
 // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 
2);// NOSONAR
 conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
 conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
 conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
 final List kryoSerializers = new ArrayList<>();
 kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
JodaDateTimeSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
TBaseSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, 
TBaseSerializer.class));
 con

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
The alternative could also be to use a different argument than "no one 
uses it", e.g., we are fine with removing it at the cost of friction for 
some users because there are better alternatives.


On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing the up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabalized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown

Handling --

and

in fact, the StreamingFileSink is mentioned in that FLIP as a

motivating

use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.

Thanks

for the reminder, we shou

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
Then we can't remove it, because there is no way for us to ascertain 
whether anyone is still using it.


Sure, the user ML is the best we got, but you can't argue that we don't 
want any users to be affected and then use an imperfect mean to find users.
If you are fine with relying on the user ML, then you _are_ fine with 
removing it at the cost of friction for some users.


To be clear, I, personally, don't have a problem with removing it (we 
have removed other connectors in the past that did not have a migration 
plan), I just reject he argumentation.


On 10/28/2020 12:21 PM, Kostas Kloudas wrote:

No, I do not think that "we are fine with removing it at the cost of
friction for some users".

I believe that this can be another discussion that we should have as
soon as we establish that someone is actually using it. The point I am
trying to make is that if no user is using it, we should remove it and
not leave unmaintained code around.

On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  wrote:

The alternative could also be to use a different argument than "no one
uses it", e.g., we are fine with removing it at the cost of friction for
some users because there are better alternatives.

On 10/28/2020 10:46 AM, Kostas Kloudas wrote:

I think that the mailing lists is the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:

If the conclusion is that we shouldn't remove it if _anyone_ is using
it, then we cannot remove it because the user ML obviously does not
reach all users.

On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing the up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabalized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Chesnay Schepler
If the conclusion is that we shouldn't remove it if _anyone_ is using 
it, then we cannot remove it because the user ML obviously does not 
reach all users.


On 10/28/2020 9:28 AM, Kostas Kloudas wrote:

Hi all,

I am bringing the up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:

@Seth: Earlier in this discussion it was said that the BucketingSink
would not be usable in 1.12 .

On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabalized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I

can't

exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 

wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown

Handling --

and

in fact, the StreamingFileSink is mentioned in that FLIP as a

motivating

use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.

Thanks

for the reminder, we should close FLIP-46 now with an explanatory
message to avoid confusion.

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng






Re: RestClusterClient and classpath

2020-10-28 Thread Chesnay Schepler
hmm..it appears as if PackagedProgramUtils#createJobGraph does some 
things outside the usercode classlodaer (getPipelineFromProgram()), 
specifically the call to the main method.


@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is wrap the call to 
createJobGraph like this:


final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader(); try {
   
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
 // do tstuff}finally {
   Thread.currentThread().setContextClassLoader(contextClassLoader); }


On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
Any help here?  How can I understand why the classes inside the jar 
are not found when creating the PackagedProgram?


On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier 
mailto:pomperma...@okkam.it>> wrote:


In the logs I see that the jar is the classpath (I'm trying to
debug the program from the IDE)..isn'it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

Well it happens on the client before you even hit the
RestClusterClient, so I assume that either your jar is not
packaged correctly or you your JobExecutor is putting it on
the classpath.

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

Sure. Here it is (org.apache.flink.client.cli.JobExecutor is
my main class I'm trying to use as a client towards the
Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to
datastream
    // return
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret =
BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf =
ret.getConfig().getConfiguration();
    //

conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
FLINK_TEST_TMP_DIR);
    //
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
4); //NOSONAR
    //
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
0.4f);//NOSONAR
    //
conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX,
32768 * 2);//NOSONAR
    //
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
32768 * 2);// NOSONAR
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT,
0);// NOSONAR
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT,
Duration.ofMinutes(10));// NOSONAR
    final List kryoSerializers = new ArrayList<>();
kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
JodaDateTimeSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
TBaseSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
TBaseSerializer.class));
conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS,
kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at

org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at

org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at

org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at

org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at
org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at

it.okkam.datalinks.flink.DatalinksExecutionEnvironment.cus

Re: RestClusterClient and classpath

2020-10-27 Thread Chesnay Schepler

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
Well it happens on the client before you even hit the 
RestClusterClient, so I assume that either your jar is not packaged 
correctly or you your JobExecutor is putting it on the classpath.


On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main 
class I'm trying to use as a client towards the Flink cluster - 
session mode).

it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return 
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // 
conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
2);

    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); 
//NOSONAR
    // 
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 
32768 * 2);// NOSONAR

conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// 
NOSONAR

    final List kryoSerializers = new ArrayList<>();
kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
JodaDateTimeSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
TBaseSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, 
TBaseSerializer.class));

    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at 
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)

at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at 
it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)

... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <mailto:rmetz...@apache.org>> wrote:


Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering
if the error happens on the client or server side (among other
questions 

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler
Are you writing a test? (otherwise the ReporterSetupTest reporters 
wouldn't be around)

Do you have a dependency on the graphite reporter?

On 10/27/2020 8:27 AM, Robert Metzger wrote:

Hi Vijayendra,
can you post or upload the entire logs, so that we can see the 
Classpath logged on startup, as well as the effective configuration 
parameters?


On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav 
mailto:contact@gmail.com>> wrote:


Hi Chesnay,

Another log message:

2020-10-26 23:33:08,516 WARN
org.apache.flink.runtime.metrics.ReporterSetup - The reporter
factory
(org.apache.flink.metrics.graphite.GraphiteReporterFactory) could
not be found for reporter grph. Available factories:

[org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,

org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
2020-10-26 23:33:08,517 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics
reporter configured, no metrics will be exposed/reported.
Regards,
Vijay

On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav
mailto:contact@gmail.com>> wrote:

Hi Chesnay,

I have the same, and I am exporting the flinkconf like below,
where i have flink-conf.yaml with configuration you have
given.What else can I try ?

export FLINK_CONF_DIR=${app_install_path}/flinkconf/

regards,
Vijay

On Sun, Oct 25, 2020 at 8:03 AM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Ah wait, in 1.11 it should not longer be necessary to
explicitly copy the reporter jar.

Please update your reporter configuration to this:

|metrics.reporter.grph.factory.class:
org.apache.flink.metrics.graphite.GraphiteReporterFactory|

On 10/25/2020 4:00 PM, Chesnay Schepler wrote:

Have you followed the documentation, specifically this bit?

> In order to use this reporter you must copy
|/opt/flink-metrics-influxdb-1.11.2.jar| into the
|plugins/influxdb| folder of your Flink distribution.

On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:

Hi Team,

for Flink 1.11 Graphite Metrics. I see the following
Error in the log.
Any suggestions?

020-10-23 21:55:14,652 ERROR 
org.apache.flink.runtime.metrics.ReporterSetup- Could not 
instantiate metrics reporter grph. Metrics might not be exposed/reported.
java.lang.ClassNotFoundException: 
org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
at 
org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
at 
org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)

Regards,
Vijay









Re: RestClusterClient and classpath

2020-10-27 Thread Chesnay Schepler
Well it happens on the client before you even hit the RestClusterClient, 
so I assume that either your jar is not packaged correctly or you your 
JobExecutor is putting it on the classpath.


On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main 
class I'm trying to use as a client towards the Flink cluster - 
session mode).

it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return 
getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // 
conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
2);

    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); 
//NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 
32768 * 2);// NOSONAR

conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// 
NOSONAR

    final List kryoSerializers = new ArrayList<>();
kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
JodaDateTimeSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
TBaseSerializer.class));
kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, 
TBaseSerializer.class));

    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at 
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)

at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at 
it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)

... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger > wrote:


Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering if
the error happens on the client or server side (among other
questions I have).

On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier
mailto:pomperma...@okkam.it>> wrote:

Hi to all,
I was trying to use 

Re: Flink 1.11 throws Unrecognized field "error_code"

2020-07-17 Thread Chesnay Schepler
Please double-check that the client and server are using the same Flink 
version.


On 17/07/2020 02:42, Lian Jiang wrote:

Hi,

I am using java 1.8 and Flink 1.11 by following 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html 
on my MAC Mojave 10.14.6. But "

|./bin/flink run examples/streaming/WordCount.jar|
" throw below error. These are the java versions that I tried:

 1.8.0_222, x86_64: "AdoptOpenJDK 8" 
/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
    1.8.0_221, x86_64: "Java SE 8" 
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
    1.8.0_181, x86_64: "Java SE 8" 
/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home


They all behave the same. Any idea is highly appreciated!


Error:

Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: 
Unrecognized field "error_code" (class 
org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody), not 
marked as ignorable (one known property: "jobUrl"])
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: 
org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody["error_code"])
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1192)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:504)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4173)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2536)
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:382)





Re: Any change in behavior related to the "web.upload.dir" behavior between Flink 1.9 and 1.11

2020-08-03 Thread Chesnay Schepler

From what I can tell we have not changed anything.

Are you making any modifications to the image? This exception should 
only be thrown if there is already a file with the same path, and I 
don't think Flink would do that.


On 03/08/2020 21:43, Avijit Saha wrote:

Hello,

Has there been any change in behavior related to the "web.upload.dir" 
behavior between Flink 1.9 and 1.11?


I have a failure case where when build an image using 
"flink:1.11.0-scala_2.12" in Dockerfile, the job manager job 
submissions fail with the following Exception but the same flow works 
fine (for the same underlying Code image) when using 
"flink:1.9.1-scala_2.12"..


This is the Exception stack trace for 1.11 and not seen using 1.9:
--
Caused by: java.nio.file.FileAlreadyExistsException: 
/opt/flink/flink-web-upload
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) 
~[?:1.8.0_262]
at java.nio.file.Files.createDirectory(Files.java:674) 
~[?:1.8.0_262]
at 
java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
~[?:1.8.0_262]
at java.nio.file.Files.createDirectories(Files.java:727) 
~[?:1.8.0_262]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.(RestServerEndpoint.java:114) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.(WebMonitorEndpoint.java:200) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.(DispatcherRestEndpoint.java:68) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]

... 2 more





Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Please make sure you have loaded the correct source jar, and aren't by 
chance still using the 1.11.0 source jar.


On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the 
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class 
in【flink-runtime_2.11-1.11.1.jar】does not match the source code. Is it 
a problem we need to fix(if it is, what should we do)? or just let it go?







Re: Flink Promethues Metricsreporter Question

2020-08-06 Thread Chesnay Schepler

The PrometheusReporter acts as a scraping target for a single process.
If you already have setup something in the Flink cluster that allows 
Prometheus/ServiceMonitor to scrape (Flink) metrics, then it shouldn't 
be necessary.
It doesn't coordinate with other services in any way; it just has access 
to Flink metrics and waits for someone to scrape them.


On 05/08/2020 23:28, Avijit Saha wrote:

Hi,

Have a general question about Flink support for Prometheus metrics. We 
already have a Prometheus setup in our cluster with ServiceMonitor-s 
monitoring ports like 8080 etc. for scraping metrics.


In a setup like this, if we deploy Flink Job managers/Task managers in 
the cluster, is there any need to have the PrometheusReporter 
configured as well? How does that coordinate with existing 
Prometheus ServiceMonitors if present?


Is the PrometheusReporter based on "pull" model so that it can pull 
metrics from Flink and send to some Prometheus host system?


Thanks
Avijit





Re: Question about ParameterTool

2020-08-11 Thread Chesnay Schepler
The benefit of the ParameterTool is that you do not increase your 
dependency footprint by using it.


When using another CLI library you will generally package it within your 
user-jar, which may or may not increase the risk of dependency conflicts.

Whether, and how large this risk is, depends naturally on the library.
This also results in a larger jar file, which may or may not be relevant 
for you.


On 11/08/2020 23:35, Marco Villalobos wrote:

Thank you for the clarification.

But does it offer any additional benefits that are not clearly documented?



On Tue, Aug 11, 2020 at 12:22 PM Robert Metzger > wrote:


Hi,
there are absolutely no dangers not using ParameterTool.
It is used by the Flink examples, and as a showcase for global job
parameters:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#register-the-parameters-globally

On Tue, Aug 11, 2020 at 7:13 PM Marco Villalobos
mailto:mvillalo...@kineteque.com>> wrote:

What are the dangers of not using the ParameterTool for
parsing command line parameters?

I have been using Picocli (https://picocli.info/). Will this
be a mistake? Are there any side-effects that I should be
aware of?





Re: The bytecode of the class does not match the source code

2020-08-05 Thread Chesnay Schepler
Well of course these differ; on the left you have the decompiled 
bytecode, on the right the original source.


If these were the same you wouldn't need source jars.

On 05/08/2020 12:20, 魏子涵 wrote:
I'm sure the two versions match up. Following is the pic comparing 
codes in IDEA

https://img-blog.csdnimg.cn/20200805180232929.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0NTRE5yaG1t,size_16,color_FF,t_70






At 2020-08-05 16:46:11, "Chesnay Schepler"  wrote:

Please make sure you have loaded the correct source jar, and
aren't by chance still using the 1.11.0 source jar.

On 05/08/2020 09:57, 魏子涵 wrote:

Hi, everyone:
      I found  the
【org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl】 class
in【flink-runtime_2.11-1.11.1.jar】does not match the source code.
Is it a problem we need to fix(if it is, what should we do)? or
just let it go?










Re: Metrics for the overall pipeline

2020-08-06 Thread Chesnay Schepler
You could create an abstract class that extends AbstractRichFunction, 
and all your remaining functions extend that class and implement the 
respective (Map/etc.)Function interface.


On 06/08/2020 13:20, Manish G wrote:
Adding metrics to individual RichMapFunction implementation classes 
would give metrics information about that particular class.


As a pipeline consists of multiple such classes, how can we have 
metrics for the overall data pipeline?Are there any best practices for it?


With regards





Re: Hadoop_Compatability

2020-08-06 Thread Chesnay Schepler
We still offer a flink-shaded-hadoop-2 artifact that you can find on the 
download page: https://flink.apache.org/downloads.html#additional-components

In 1.9 we changed the artifact name.

Note that we will not release newer versions of this dependency.

As for providing Hadoop class, there is some guidance in the 
documentation 
.
If this does not work fr your, and you want to put the required 
dependencies into the lib/ directory, then you will have to look through 
your dependency tree to determine which Hadoop dependencies are required.
The Flink hadoop-compatibility dependency requires hadoop-common and 
hadoop-mapreduce-client-core, and whatever transitive dependencies these 
have.


On 06/08/2020 13:08, C DINESH wrote:

Hi All,

From 1.9 version there is no *flink-shaded-hadoop2dependency. To use 
Hadoop APIS like *IntWritable , LongWritable. What are the 
dependencies we need to add to use these APIs.


I tried searching in google. Not able to understand the solution. 
Please guide me.


Thanks in Advance.
Dinesh.





Re: Dependency vulnerabilities with Apache Flink 1.10.1 version

2020-08-06 Thread Chesnay Schepler
log4j - If you don't use a Socket appender, you're good. Otherwise, you 
can replace the log4j jars in lib/ with a newer version. You could also 
upgrade to 1.11.1 which uses log4j2.


guava - We do not use Guava for serialization AFAIK. We also do not use 
Java serialization for records.


commons-io - We do not use it for serialization.

protobuf - Only used by specific components for exchanging records. If 
an attacker is able to introduce arbitrary data in places, then they in 
any case wreak havoc in all sorts of ways, be it by introducing false 
data invalidating your pipeline or sending obscenely large messages 
eating up all memory.


all things contained in flink-shaded-hadoop - Please evaluate yourself 
whether this is problematic and for, and use a newer Hadoop version if 
required. Flink 1.11.0 also came with support for Hadoop 3. 
Flink-shaded-hadoop is no longer supported, and was just a collection of 
Hadoop dependencies that we package for convenience.


TL;DR: Please use your own judgement for issues regarding Hadoop, and 
reach out to the Hadoop project for guidance. As for the remaining 
issues, it is unlikely that these vulnerabilities make your setup any 
less secure.


On 06/08/2020 14:06, V N, Suchithra (Nokia - IN/Bangalore) wrote:

Hello,

We are using Apache Flink 1.10.1 version. During our security scans following 
issues are reported by our scan tool.
Please let us know your comments on these dependency vulnerabilities.

Thanks,
Suchithra

-Original Message-
From: m...@gsuite.cloud.apache.org  On Behalf Of 
Apache Security Team
Sent: Thursday, August 6, 2020 1:08 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: Jash, Shaswata (Nokia - IN/Bangalore) ; Prabhala, Anuradha 
(Nokia - IN/Bangalore) ; Badagandi, Srinivas B. (Nokia - 
IN/Bangalore) 
Subject: Re: Security vulnerabilities with Apache Flink 1.10.1 version

Hi,

Outdated dependencies are not always security issues.  A project would only be 
affected if a dependency was used in such a way that the affected underlying 
code is used and the vulnerabilities were exposed.
We typically get reports sent to us from scanning tools that looks at 
dependencies out of context on how they are actually used in the projects.  As 
such we reject these reports and suggest you either a) show how the product is 
affected by the dependency vulnerabilities, or
b) simply mention this as a normal bug report to that project.  Since 
dependency vulnerabilities are quite public, there is no need to use this 
private reporting mechanism for them.

Regards, Mark

On Thu, Aug 6, 2020 at 6:04 AM V N, Suchithra (Nokia - IN/Bangalore) 
 wrote:

Hello,



We are using Apache Flink 1.10.1 version. During our security scans following 
issues are reported by our scan tool.



1.Package : log4j-1.2.17

Severity: CRITICAL

Fix version: 2.8.2



Description:

Apache Log4j contains a flaw that is triggered as the SocketServer class 
accepts log data from untrusted network traffic, which it then insecurely 
deserializes. This may allow a remote attacker to potentially execute arbitrary 
code.



Path:

/opt/flink/lib/log4j-1.2.17.jar

/opt/flink/bin/bash-java-utils.jar:log4j



References:

https://cve.mitre.org/cgi-bin/cvename.cgi?name=2019-17571

https://seclists.org/oss-sec/2019/q4/167

https://logging.apache.org/log4j/1.2/



2.Package: guava-14.0.1

Severity: HIGH

Fix version: 25.0, 24.1.1



Description:

Google Guava contains a flaw in the 
CompoundOrdering_CustomFieldSerializer::deserialize() function in 
com/google/common/collect/CompoundOrdering_CustomFieldSerializer.java that is 
triggered when deserializing Java objects. With specially crafted serialized 
data, a context-dependent can exhaust available memory, resulting in a denial 
of service.



References:

https://github.com/google/guava/wiki/CVE-2018-10237

https://cve.mitre.org/cgi-bin/cvename.cgi?name=2018-10237



3.Package: guava-14.0.1,18.0

Severity: HIGH

Fix version: 25.0, 24.1.1



Description:

Google Guava contains a flaw in the AtomicDoubleArray::readObject() function in 
com/google/common/util/concurrent/AtomicDoubleArray.java that is triggered when 
deserializing Java objects. With specially crafted serialized data, a 
context-dependent can cause a process linked against the library to exhaust 
available memory.



References:

https://github.com/google/guava/wiki/CVE-2018-10237

https://cve.mitre.org/cgi-bin/cvename.cgi?name=2018-10237



4. Package: guava-19.0

Severity: HIGH

Fix version: 25.0, 24.1.1, 23.6.1



Description:

Google Guava contains a flaw in the 
CompoundOrdering_CustomFieldSerializer::deserialize() function in 
com/google/common/collect/CompoundOrdering_CustomFieldSerializer.java that is 
triggered when deserializing Java objects. With specially crafted serialized 
data, a context-dependent can exhaust available memory, resulting in a denial 
of service.



References:

https://cve.mitre.org/cgi-bin/cvename.cgi?name=2018-10237


Re: Missing metrics when using metric reporter on high parallelism

2020-08-11 Thread Chesnay Schepler

IIRC this can be caused by the Carbon MAX_CREATES_PER_MINUTE setting.

I would deem it unlikely that the reporter thread is busy for 30 seconds.

On 11/08/2020 16:57, Nikola Hrusov wrote:

Hello,

I am doing some tests with flink 1.11.1 and I have noticed something 
strange/wrong going on with the exported metrics.


I have a configuration like such:
/
metrics.reporter.graphite.class: 
org.apache.flink.metrics.graphite.GraphiteReporterFactory

metrics.reporter.graphite.host: graphite
metrics.reporter.graphite.port: 8080
metrics.reporter.graphite.protocol: tcp
metrics.reporter.graphite.interval: 10 SECONDS/

which should produce metrics to graphite every 10 seconds.

And that works with low parallelism (e.g. <= 20). Then we get all 
metrics, all the time, every 10th second.
However, when I scale my job to 200 parallelism or more, the metrics 
are not sent every 10 seconds. Sometimes they are missing for up to 3 
reporting cycles.
I have had a brief look in the code here: 
https://github.com/apache/flink/blob/release-1.11.1/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L107-L144 and 
it looks like there is a separate thread. That was my first guess, if 
it is doing too much work on the same thread.


I have tried lowering the reporting interval from 10 SECONDS to 6-7 
SECONDS, but even in that case there will be missing metrics. Even for 
simpler jobs such as "source -> map -> sink" with higher parallelism 
that would happen.


What can I do to further debug/make this work? Has anyone come across 
this before?


Regards
,
Nikola Hrusov





Re: Metrics for number of events in a timeframe

2020-08-04 Thread Chesnay Schepler
meter * timeframe (in seconds) is the simplest option, although it will 
not be that accurate due to the flattening of spikes.


You'd get the best results by using a time-series database, and 
calculating the difference between the current count and one 5 minutes ago.
An example for Prometheus: 
https://prometheus.io/docs/prometheus/latest/querying/functions/#delta .


On 04/08/2020 15:04, Manish G wrote:

Hi,

Meter gives throughput while counter gives number of events since 
system started. What if I have to find number of events processed in 
say, last 5 minutes.


Is there an inbuilt metrics for it OR do I need to do meter * timeframe?

With regards






Re: Metrics for number of events in a timeframe

2020-08-04 Thread Chesnay Schepler

No, because Flink counters are mapped to Prometheus gauges.

On 04/08/2020 15:52, Manish G wrote:

That documentation states:

|delta| should only be used with gauges.

Would that cause an issue as we are using counter.

With regards

On Tue, Aug 4, 2020 at 7:12 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


meter * timeframe (in seconds) is the simplest option, although it
will
not be that accurate due to the flattening of spikes.

You'd get the best results by using a time-series database, and
calculating the difference between the current count and one 5
minutes ago.
An example for Prometheus:
https://prometheus.io/docs/prometheus/latest/querying/functions/#delta
.

On 04/08/2020 15:04, Manish G wrote:
> Hi,
>
> Meter gives throughput while counter gives number of events since
> system started. What if I have to find number of events
processed in
> say, last 5 minutes.
>
> Is there an inbuilt metrics for it OR do I need to do meter *
timeframe?
>
> With regards
>
>





Re: getting in an infinite loop while creating the dependency-reduced pom

2020-08-03 Thread Chesnay Schepler

https://issues.apache.org/jira/browse/MSHADE-148

On 03/08/2020 10:16, Dongwon Kim wrote:

Hi,

I create a new maven project (I'm using Maven 3.6.3) w/ the below command

|curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s
1.11.1|

and add the following dependencies to dependencies


org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}



org.apache.flink
flink-test-utils_${scala.binary.version}
${flink.version}
test


org.apache.flink
flink-runtime_${scala.binary.version}
${flink.version}
test
tests


org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
test
tests



When executing "mvn clean package", I've got stuck in

[INFO] Scanning for projects...
[INFO]
[INFO] --< org.myorg.quickstart:quickstart 
>---
[INFO] Building Flink Quickstart Job 0.1
[INFO] [ jar 
]-
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ quickstart ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
quickstart ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ quickstart 
---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to 
/Users/east.12/tmp/quickstart/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) 
@ quickstart ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/Users/east.12/tmp/quickstart/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
quickstart ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ quickstart ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ quickstart ---
[INFO] Building jar: /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar
[INFO]
[INFO] --- maven-shade-plugin:3.1.1:shade (default) @ quickstart ---
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.11.1 from the shaded 
jar.
[INFO] Excluding org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1 from 
the shaded jar.
[INFO] Excluding org.apache.logging.log4j:log4j-api:jar:2.12.1 from the 
shaded jar.
[INFO] Excluding org.apache.logging.log4j:log4j-core:jar:2.12.1 from the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.11.1 in 
the shaded jar.
[INFO] Including 
org.apache.flink:flink-connector-kafka-base_2.11:jar:1.11.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.6.0 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar 
with /Users/east.12/tmp/quickstart/target/quickstart-0.1-shaded.jar
[INFO] Dependency-reduced POM written at: 
/Users/east.12/tmp/quickstart/dependency-reduced-pom.xml

Any help?

Thanks,

Dongwon





Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Chesnay Schepler
WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1, 
ChronoUnit.MINUTES)) returns a WatermarkStrategy, but the exact type 
is entirely dependent on the variable declaration (i.e., it is not 
dependent on any argument).


So, when you assign the strategy to a variable then the compiler can 
infer the generic type. Without a variable it is treated as a 
WatermarkStrategy, because there is nothing to infer the type from.


On 08/07/2020 08:54, Niels Basjes wrote:

Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I 
find strange.


This works

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy watermarkStrategy = WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))  
.withTimestampAssigner((SerializableTimestampAssigner) (element, 
recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for 
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier)
    method 
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner) 
is not applicable
      (argument mismatch; 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner 
cannot be converted to 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)


Why is that?

--
Best regards / Met vriendelijke groeten,

Niels Basjes





Re: A query on Flink metrics in kubernetes

2020-07-09 Thread Chesnay Schepler
From Flink's perspective no metrics are aggregated, nor are metric 
requests forwarded to some other process.


Each TaskExecutor has its own reporter, that each must be scraped to get 
the full set of metrics.


On 09/07/2020 11:39, Manish G wrote:

Hi,

I have a query regarding prometheus scraping Flink metrics data with 
application running in kubernetes cluster.


If taskmanager is running on multiple nodes, and prometheus requests 
for the metrics data, then is that request directed to one of the 
nodes(based on some strategy, like round-robin) or is data aggregated 
from all the nodes?


With regards





Re: Limiting metrics logs to custom metric

2020-07-08 Thread Chesnay Schepler
There's no built-in functionality for this. You could customize the 
reporter though.


On 08/07/2020 17:19, Manish G wrote:

Hi,

I have added a Meter in my code and pushing it to app logs using slf4j 
reporter.


I observe that apart from my custometrics, lots of other metrics like 
gauge, histogram etc is also published. It makes it difficult to 
filter out data for generating splunk graphs.


Is there a way to limit published metrics to just the custom one?

With regards





Re: Limiting metrics logs to custom metric

2020-07-08 Thread Chesnay Schepler
Not really; but essentially you have to override 
SLF4JReporter#notifyOfAddedMetric and filter the metrics you're 
interested in. Then build the flink-metrics-slf4j module, and replace 
the corresponding jar in your distribution.


On 08/07/2020 18:20, Manish G wrote:

Ok.Any resource on same?


On Wed, Jul 8, 2020, 9:38 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


There's no built-in functionality for this. You could customize the
reporter though.

On 08/07/2020 17:19, Manish G wrote:
> Hi,
>
> I have added a Meter in my code and pushing it to app logs using
slf4j
> reporter.
>
> I observe that apart from my custometrics, lots of other metrics
like
> gauge, histogram etc is also published. It makes it difficult to
> filter out data for generating splunk graphs.
>
> Is there a way to limit published metrics to just the custom one?
>
> With regards






Re: Integrating prometheus

2020-07-03 Thread Chesnay Schepler

Yes, you do need to extend RichFunction; there's no way around that.

As for the missing metric, the usual cause is that the job/task finishes 
so quickly that the metric is never reported. If this is not the cause I 
would recommend enabling DEBUG logging and searching for warnings from 
the metric groups/registry/reporter.


On 03/07/2020 19:27, Manish G wrote:
Also, it seems custom metrics can only be recorded if we extend 
RichFunction, as it allows us to override open wherein we can get hold 
of context and metrics constructs.


Please let me know if there are other ways too.

On Fri, Jul 3, 2020 at 10:05 PM Manish G <mailto:manish.c.ghildi...@gmail.com>> wrote:


Hi,

I am basically looking for : throughput, success rate, error rate.

For experimental purposes I could complete all configurations as
explained in the official documentation. But somehow my custom
metrics(a simple Counter) is still not shown on the prometheus
board, though default metrics I can see.

Anything I am missing?

On Fri, Jul 3, 2020 at 8:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

What metrics specifically are you interested in?

On 03/07/2020 17:22, Robert Metzger wrote:

Hi Manish,

Currently, Flink's metric system does not support metrics via
annotations. You need to go with the documented approach.
But of course, you can try to build your own metrics
abstraction based on Flink's metric system.

On Fri, Jul 3, 2020 at 9:35 AM Manish G
mailto:manish.c.ghildi...@gmail.com>> wrote:

Hi,

I am following this

<https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html>
 link
on how to integrate prometheus with flink.
Going by the code sample,  I would need to insert
related metrics code in the main logic.

Is it avoidable, like by using some annotations on methods?

Manish







Re: SSL for QueryableStateClient

2020-07-07 Thread Chesnay Schepler

Queryable state does not support SSL.

On 06/07/2020 22:42, mail2so...@yahoo.co.in wrote:

Hello,

I am running flink on Kubernetes, and from outside the Ingress to a 
proxy on Kubernetes is via SSL 443 PORT only.


Can you please provide guidance on how to setup the SSL for 
/*QueryableStateClient*/, the client to inquire the state.



Please let me know if any other details is needed.

Thanks & Regards
Souma Suvra Ghosh





Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Have you looked at the SLF4J reporter?

https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:

Hi,

Is it possible to log Flink metrics in application logs apart from 
publishing it to Prometheus?


With regards





Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the foll log 
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and registered
the Meter with metrics group and invoked markEvent() method in
the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards








Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in 
flink-conf.yml, copying the jar in lib directory), and registered the 
Meter with metrics group and invoked markEvent() method in the target 
code. But I don't see any related logs.

I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs apart from
> publishing it to Prometheus?
>
> With regards






Re: Decompressing Tar Files for Batch Processing

2020-07-07 Thread Chesnay Schepler

I would probably go with a separate process.

Downloading the file could work with Flink if it is already present in 
some supported filesystem. Decompressing the file is supported for 
selected formats (deflate, gzip, bz2, xz), but this seems to be an 
undocumented feature, so I'm not sure how usable it is in reality.


On 07/07/2020 01:30, Austin Cawley-Edwards wrote:

Hey all,

I need to ingest a tar file containing ~1GB of data in around 10 CSVs. 
The data is fairly connected and needs some cleaning, which I'd like 
to do with the Batch Table API + SQL (but have never used before). 
I've got a small prototype loading the uncompressed CSVs and applying 
the necessary SQL, which works well.


I'm wondering about the task of downloading the tar file and unzipping 
it into the CSVs. Does this sound like something I can/ should do in 
Flink, or should I set up another process to download, unzip, and 
store in a filesystem to then read with the Flink Batch job? My 
research is leading me towards doing it separately but I'd like to do 
it all in the same job if there's a creative way.


Thanks!
Austin





Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
WSL is a bit buggy when it comes to allocating ports; it happily lets 2 
processes create sockets on the same port, except that the latter one 
doesn't do anything.

Super annying, and I haven't found a solution to that myself yet.

You'll have to configure the ports explicitly for the JM/TM, which will 
likely entail manually starting the processes and updating the 
configuration in-between, e.g.:


./bin/jobmanager.sh start

./bin/taskmanager.sh start

On 06/07/2020 19:16, Manish G wrote:

Yes.

On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


Are you running Flink is WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
*metrics.reporter.prom.port: 9250-9260*

This is based on information provided here

<https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter>
/*|port|- (optional) the port the Prometheus exporter listens on,
defaults to9249
<https://github.com/prometheus/prometheus/wiki/Default-port-allocations>.
In order to be able to run several instances of the reporter on
one host (e.g. when one TaskManager is colocated with the
JobManager) it is advisable to use a port range like|9250-9260|.*/
/*
*/
As I am running flink locally, so both jobmanager and taskmanager
are colocated.

In prometheus.yml:
*- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /*
*
*
This is the whole configuration I have done based on several
tutorials and blogs available online.
**


/**/


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

These are all JobManager metrics; have you configured
prometheus to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 
1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge

flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus
reporter? This option is only for the
PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.p

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler

Are you running Flink is WSL by chance?

On 06/07/2020 19:06, Manish G wrote:

In flink-conf.yaml:
*metrics.reporter.prom.port: 9250-9260*

This is based on information provided here 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter>
/*|port|- (optional) the port the Prometheus exporter listens on, 
defaults to9249 
<https://github.com/prometheus/prometheus/wiki/Default-port-allocations>. 
In order to be able to run several instances of the reporter on one 
host (e.g. when one TaskManager is colocated with the JobManager) it 
is advisable to use a port range like|9250-9260|.*/

/*
*/
As I am running flink locally, so both jobmanager and taskmanager are 
colocated.


In prometheus.yml:
*- job_name: 'flinkprometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9250', 'localhost:9251']
    metrics_path: /*
*
*
This is the whole configuration I have done based on several tutorials 
and blogs available online.

**


/**/


On Mon, Jul 6, 2020 at 10:20 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


These are all JobManager metrics; have you configured prometheus
to also scrape the task manager processes?

On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge

flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints 
numberOfFailedCheckpoints (scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge

flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count 
Count (scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge

flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity 
TotalCapacity (scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: 
jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge

flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You've said elsewhere that you do see some metrics in
prometheus, which are those?

Why are you configuring the host for the prometheus reporter?
This option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but
prometheus dashboard logs doesn't show custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list,
resulting in the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration -
Loading configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
 

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You have explicitly configured a reporter list, resulting in the slf4j 
reporter being ignored:


2020-07-06 13:48:22,191 INFO 
org.apache.flink.configuration.GlobalConfiguration    - Loading 
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO 
org.apache.flink.runtime.metrics.ReporterSetup    - 
Excluding reporter slf4j, not configured in reporter list (prom).


Note that nowadays metrics.reporters is no longer required; the set of 
reporters is automatically determined based on configured properties; 
the only use-case is disabling a reporter without having to remove the 
entire configuration.

I'd suggest to just remove the option, try again, and report back.

On 06/07/2020 16:35, Chesnay Schepler wrote:
Please enable debug logging and search for warnings from the metric 
groups/registry/reporter.


If you cannot find anything suspicious, you can also send the foll log 
to me directly.


On 06/07/2020 16:29, Manish G wrote:
Job is an infinite streaming one, so it keeps going. Flink 
configuration is as:


metrics.reporter.slf4j.class: 
org.apache.flink.metrics.slf4j.Slf4jReporter

metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


How long did the job run for, and what is the configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked markEvent()
method in the target code. But I don't see any related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application logs
apart from
> publishing it to Prometheus?
>
> With regards










Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
These are all JobManager metrics; have you configured prometheus to also 
scrape the task manager processes?


On 06/07/2020 18:35, Manish G wrote:

The metrics I see on prometheus is like:
# HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp 
lastCheckpointRestoreTimestamp (scope: jobmanager_job)
# TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge
flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 -1.0
# HELP flink_jobmanager_job_numberOfFailedCheckpoints numberOfFailedCheckpoints 
(scope: jobmanager_job)
# TYPE flink_jobmanager_job_numberOfFailedCheckpoints gauge
flink_jobmanager_job_numberOfFailedCheckpoints{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0
# HELP flink_jobmanager_Status_JVM_Memory_Heap_Max Max (scope: 
jobmanager_Status_JVM_Memory_Heap)
# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Max gauge
flink_jobmanager_Status_JVM_Memory_Heap_Max{host="localhost",} 1.029177344E9
# HELP flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count Count 
(scope: jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count gauge
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count{host="localhost",}
 2.0
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",} 8.42E9
# HELP flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity TotalCapacity 
(scope: jobmanager_Status_JVM_Memory_Direct)
# TYPE flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity gauge
flink_jobmanager_Status_JVM_Memory_Direct_TotalCapacity{host="localhost",} 
604064.0
# HELP flink_jobmanager_job_fullRestarts fullRestarts (scope: jobmanager_job)
# TYPE flink_jobmanager_job_fullRestarts gauge
flink_jobmanager_job_fullRestarts{job_id="58483036154d7f72ad1bbf10eb86bc2e",host="localhost",job_name="frauddetection",}
 0.0



On Mon, Jul 6, 2020 at 9:51 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


You've said elsewhere that you do see some metrics in prometheus,
which are those?

Why are you configuring the host for the prometheus reporter? This
option is only for the PrometheusPushGatewayReporter.

On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but
prometheus dashboard logs doesn't show custom metrics.

With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

You have explicitly configured a reporter list, resulting in
the slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required;
the set of reporters is automatically determined based on
configured properties; the only use-case is disabling a
reporter without having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report
back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send
the foll log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
    org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How long did the job run for, and what is the
configured interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the
   

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You've said elsewhere that you do see some metrics in prometheus, which 
are those?


Why are you configuring the host for the prometheus reporter? This 
option is only for the PrometheusPushGatewayReporter.


On 06/07/2020 18:01, Manish G wrote:

Hi,

So I have following in flink-conf.yml :
//
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS
//

And while I can see custom metrics in Taskmanager logs, but prometheus 
dashboard logs doesn't show custom metrics.


With regards

On Mon, Jul 6, 2020 at 8:55 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


You have explicitly configured a reporter list, resulting in the
slf4j reporter being ignored:

2020-07-06 13:48:22,191 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: metrics.reporters, prom
2020-07-06 13:48:23,203 INFO
org.apache.flink.runtime.metrics.ReporterSetup - Excluding
reporter slf4j, not configured in reporter list (prom).

Note that nowadays metrics.reporters is no longer required; the
set of reporters is automatically determined based on configured
properties; the only use-case is disabling a reporter without
having to remove the entire configuration.
I'd suggest to just remove the option, try again, and report back.

On 06/07/2020 16:35, Chesnay Schepler wrote:

Please enable debug logging and search for warnings from the
metric groups/registry/reporter.

If you cannot find anything suspicious, you can also send the
foll log to me directly.

On 06/07/2020 16:29, Manish G wrote:

Job is an infinite streaming one, so it keeps going. Flink
configuration is as:

metrics.reporter.slf4j.class:
org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 30 SECONDS



On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

How long did the job run for, and what is the configured
interval?


On 06/07/2020 15:51, Manish G wrote:

Hi,

Thanks for this.

I did the configuration as mentioned at the link(changes in
flink-conf.yml, copying the jar in lib directory), and
registered the Meter with metrics group and invoked
markEvent() method in the target code. But I don't see any
related logs.
I am doing this all on my local computer.

Anything else I need to do?

With regards
Manish

On Mon, Jul 6, 2020 at 5:24 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Have you looked at the SLF4J reporter?


https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 06/07/2020 13:49, Manish G wrote:
> Hi,
>
> Is it possible to log Flink metrics in application
logs apart from
> publishing it to Prometheus?
>
> With regards












Re: RequiredParameters in Flink 1.11

2020-07-13 Thread Chesnay Schepler
/** * ... * * @deprecated These classes will be dropped in the next 
version. Use {@link ParameterTool} or a third-party * command line 
parsing library instead. */



On 13/07/2020 17:24, Flavio Pompermaier wrote:

In Flink 1.11 RequiredParameters and Option have been deprecated.
Is there any recommendation in this sense (about how to get rid of 
deprecation warnings)?


Best,
Flavio





Re: Missing jars

2020-07-14 Thread Chesnay Schepler

flink-formats is a pom artifact, meaning that there are no jars for it.

You should add a dependency to the specific format(s) you are interested 
in, like flink-formats-csv.


On 14/07/2020 17:41, Daves Open wrote:
I added flink-formats to my pom.xml file, but the jar files are not 
found.  I checked in mvnrepository and the pom entry exists, but there 
are no files.  Can you please confirm that the jars for flink-format 
are available in the repos?  The following is my pom entry:



    org.apache.flink
    flink-formats
    1.11.0


Thanks





Re: RestartStrategy failure count when losing a Task Manager

2020-07-15 Thread Chesnay Schepler
1) A restart in one region only increments the count by 1, independent 
of how many tasks from that region fail at the same time.
If tasks from different regions fail at the same time, then the bound is 
incremented by the number of affected regions.


2)

I would consider what failure rate is acceptable if there were no 
regions, and then multiple that but the number of slots to offset task 
executor failures.



Failures in the application (e.g., a source failing for some reason) 
will generally behave, failure-rate wise, as if regions would not exist. 
They are sporadic, and the chance of them appearing in different regions 
at the same time seems rather small.



On 15/07/2020 00:16, Jiahui Jiang wrote:
Hello Flink, I have some questions regarding to the guideline on 
configuring restart strategy.


I was testing a job with the following setup:

 1. There are many tasks, but currently I'm running with only 2
parallelism, but plenty of task slots (4 TM and 4 task slot in
each TM).
 2. It's ran in k8s with HA enabled.
 3. The current restart strategy is 'failure-rate' with 30mins failure
interval, 1 min delay interval and 3 failure rate.

When a TM got removed by k8s, it looked like that caused multiple 
failure to happen all at once. In the job manager log, I'm seeing 
different task failed with the same stacktrace 'Heartbeat of 
taskManager with id {SOME_ID} timed out' around the same time.


I understand that all the tasks that were running on this taskManager 
would fail. But still have these following questions:


Questions:

 1. What count as one failure for a restartStrategy? It doesn't look
like every failed task counts towards one failure according to my
other jobs. Is it because we have failover strategy defaults to be
region, and each failure only trigger part of the job graph to
restart, and the rest of the 'not retriggered' job graph can still
cause more failure that will be counted towards failure rate?
 2. If that's the case, what will be the recommended way to set
restart strategy? If we don't want to hard code a number for every
single pipeline we are running, is that a good way to infer how to
set the failure rate?

Thank you so much!
Jiahui





Re: Communicating with my operators

2020-07-15 Thread Chesnay Schepler

Using an S3 bucket containing the configuration is the way to go.

1) web sockets, or more generally all approaches where you connect to 
the source


The JobGraph won't help you; it doesn't contain the information on where 
tasks are deployed to at runtime. It is just an abstract representation 
of your job.


You could theoretically retrieve the actual location through the REST 
API, and maybe expose the port as a metric.


But then you still have to deal with resolving IPs, internal/external 
IPs and all that jazz.


2) CoProcessFunction

We still have to get the data in somehow; so you'd need to have some 
source in any case :)


3) ParameterTool

This is really just a parsing tool, so it won't help for this use-case.

4) State Processing API

A bit too complicated. If restarting jobs is an option, you could just 
encode the commands into the source, emit them as an event of sort, and 
the process function updates it's state on reception of these events.


On 15/07/2020 10:00, Tom Wells wrote:

Hi Everyone

I'm looking for some advice on designing my operators (which 
unsurprisingly tend to take the form of SourceFunctions, 
ProcessFunctions or SinkFunctions) to allow them to be "dynamically 
configured" while running.


By way of example, I have a SourceFunction which collects the names of 
various S3 buckets, and then a ProcessFunction which reads and 
collects their contents. The gotcha is that the list of S3 buckets is 
not fixed, and can be changed during the lifetime of the job. This 
add/remove action would be done by some human administrator, and lets 
say using a simple command line tool.


For example - here is an idea of what I want to build to "communicate" 
with my flink job:


```
# Add a bucket to the flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1 
--access-key ...


# Get a list of the s3 buckets we're currently processing, and when 
last they were last accessed

$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago

# Remove buckets
$ ./admin-tool remove-bucket --name my-s3-bucket
```

Hope that gives you an idea - of course this could apply to any number 
of different source types, and could even extend to configuration of 
sinks etc too.


So - how should my command line tool communicate with my operators?

4 alternative approaches I've thought about:

- Have a SourceFunction open a websocket and listen for bucket 
add/remove commands (written to by the command line tool). I think 
this would work, but the difficulty is in figuring out where exactly 
the SourceFunction might be deployed in the flink cluster to find the 
websocket listening port. I took a look at the ClusterClient API and 
it's possibly available by inspecting the JobGraph... I'm just not 
sure if this is an anti-pattern?


- Use a CoProcessFunction instead, and have it be joined with a 
DataStream that I can somehow write to directly from the command line 
tool (maybe using flink-client api - can i write to a DataStream 
directly??). I don't think this is possible but would feel like a good 
clean approach?


- Somehow using the ParameterTool. I don't think it supports a dynamic 
use-case though?


- Writing directly to the saved state of a ProcessFunction to add the 
remove bucket names. I'm pretty unfamiliar with this approach - but 
looks possible according to the docs on the State Processor API - 
however it seems like I would have to read the savepoint, write the 
updates, then restore from savepoint which may mean suspending and 
resuming the job while that happens. Not really an issue for me, but 
does feel like possibly the wrong approach for my simple requirement.


- Solve it just using datasources - e.g. create a centrally read s3 
bucket which holds the latest configuration and is sourced and joined 
by every operator (probably using Broadcast State). My command line 
tool would then just have to write to that S3 bucket - no need to 
communicate directly with the operators.


The last option is fairly obvious and probably my default approach - 
I'm just wondering if whether any of the alternatives above are worth 
investigating. (Especially considering my endless quest to learn 
everything about Flink - i don't mind exploring the less obvious 
pathways).


I would love some guidance or advice on what you feel is the best 
approach / idiomatic approach for this.


All the best,
Tom





Re: Is there a way to access the number of 'keys' that have been used for partioning?

2020-07-15 Thread Chesnay Schepler
This information is not readily available; in fact Flink itself doesn't 
know how many keys there are at any point.

You'd have to calculate it yourself.

On 15/07/2020 17:11, orionemail wrote:

Hi,

I need to query the number of keys that a stream has been split by, is 
there a way to do this?


Thanks,

O







Re: Integrating prometheus

2020-07-03 Thread Chesnay Schepler

What metrics specifically are you interested in?

On 03/07/2020 17:22, Robert Metzger wrote:

Hi Manish,

Currently, Flink's metric system does not support metrics via 
annotations. You need to go with the documented approach.
But of course, you can try to build your own metrics abstraction based 
on Flink's metric system.


On Fri, Jul 3, 2020 at 9:35 AM Manish G > wrote:


Hi,

I am following this


 link
on how to integrate prometheus with flink.
Going by the code sample,  I would need to insert related metrics
code in the main logic.

Is it avoidable, like by using some annotations on methods?

Manish





Re: Customised RESTful trigger

2020-07-12 Thread Chesnay Schepler
You can specify arguments to your job via query parameters or a json 
body (recommended) as documented here 
.


On 10/07/2020 18:48, Jacek Grzebyta wrote:

Hello,

I am a newbie in the Apache Flink environment. I found it is possible 
to trigger a job using the MONITORING REST API. Is it possible to 
customise a request to start a job with some parameters? From the 
bigger perspective I would like to provide a large file URL into a 
Flink application to do a TFL job.


For example after request:

/job?inputFile=s3://my-bucket/input/input-600m.json

Flink will start the FTL on an instance. Independently if the service 
will receive another query:


/job?inputFile=s3://my-bucket/input/input-other2G.json

Flink would start the other processing on the other job instance.

I thought I could deploy the jar file with parameters but that would 
be quite weird.
I have no idea how can I solve that without converting a REST request 
into a stream event first which would be the simplest.


Regards,
Jacek





Re: History Server Not Showing Any Jobs - File Not Found?

2020-07-12 Thread Chesnay Schepler
Ah, I remembered wrong, my apologies. Unfortunately there is no option 
to prevent the cleanup; it is something I wanted to have for a long time 
though...


On 11/07/2020 17:57, Hailu, Andreas wrote:


Thanks for the clarity. To this point you made:

/(Note that by configuring "historyserver.web.tmpdir" to some 
permanent directory subsequent (re)starts of the HistorySserver can 
re-use this directory; so you only have to download things once)/


The HistoryServer process in fact deletes this local cache during its 
shutdown hook. Is there a setting we can use so that it doesn’t do this?


2020-07-11 11:43:29,527 [HistoryServer shutdown hook] INFO 
HistoryServer - *Removing web dashboard root cache directory 
/local/scratch/flink_historyserver_tmpdir*


2020-07-11 11:43:29,536 [HistoryServer shutdown hook] INFO 
HistoryServer - Stopped history server.


We’re attempting to work around the UI becoming un-responsive/crashing 
the browser at a large number archives (in my testing, that’s around 
20,000 archives with Chrome)  by persisting the job IDs of our 
submitted apps and then navigating to the job overview page directly, 
e.g. http://(host):(port)/#/job/(jobId)/overview 
. It would have been 
really great if the server stored archives by the application ID 
rather than the job ID – particularly for apps that potentially submit 
hundreds of jobs. Tracking one application ID (ala Spark) would ease 
the burden on the dev + ops side. Perhaps a feature for the future J


*// *ah**

*From:*Chesnay Schepler 
*Sent:* Tuesday, June 2, 2020 3:55 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

1) It downloads all archives and stores them on disk; the only thing 
stored in memory is the job ID or the archive. There is no hard upper 
limit; it is mostly constrained by disk space / memory. I say mostly, 
because I'm not sure how well the WebUI handles 100k jobs being loaded 
into the overview.


2) No, there is no retention policy. It is currently expected that an 
external process cleans up archives. If an archive was deleted (from 
the archive directory) the HistoryServer does notice that and also 
delete the local copy.


On 01/06/2020 23:05, Hailu, Andreas wrote:

So I created a new HDFS directory with just 1 archive and pointed
the server to monitor that directory, et voila – I’m able to see
the applications in the UI. So it must have been really churning
trying to fetch all of those initial archives J

I have a couple of follow up questions if you please:

1.What is the upper limit of the number of archives the history
server can support? Does it attempt to download every archive and
load them all into memory?

2.Retention: we have on the order of 100K applications per day in
our production environment. Is there any native retention of
policy? E.g. only keep the latest X archives in the dir - or is
this something we need to manage ourselves?

Thanks.

*// *ah

*From:*Hailu, Andreas [Engineering]
*Sent:* Friday, May 29, 2020 8:46 AM
    *To:* 'Chesnay Schepler' 
<mailto:ches...@apache.org>; user@flink.apache.org
<mailto:user@flink.apache.org>
*Subject:* RE: History Server Not Showing Any Jobs - File Not Found?

Yes, these are all in the same directory, and we’re at 67G right
now. I’ll try with incrementally smaller directories and let you
know what I find.

    *// *ah

*From:*Chesnay Schepler mailto:ches...@apache.org>>
*Sent:* Friday, May 29, 2020 3:11 AM
*To:* Hailu, Andreas [Engineering] mailto:andreas.ha...@ny.email.gs.com>>; user@flink.apache.org
<mailto:user@flink.apache.org>
*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

oh I'm not using the HistoryServer; I just wrote it ;)

Are these archives all in the same location? So we're roughly
looking at 5 GB of archives then?

That could indeed "just" be a resource problem. The HistoryServer
eagerly downloads all archives, and not on-demand.

The next step would be to move some of the archives into a
separate HDFS directory and try again.

(Note that by configuring "historyserver.web.tmpdir" to some
permanent directory subsequent (re)starts of the HistorySserver
can re-use this directory; so you only have to download things once)

On 29/05/2020 00:43, Hailu, Andreas wrote:

May I also ask what version of flink-hadoop you’re using and
the number of jobs you’re storing the history for? As of
writing we have roughly 101,000 application history files. I’m
curious to know if we’re encountering some kind of resource
problem.

*// *ah

*From:*Hailu, Andreas [Engineering]
    *Sent:* Thursday, May 28, 2020 12:18 PM
*To:* 'Chesnay Schepler' 
<mailto:ches...@apache.or

Re: Request: Documentation for External Communication with Flink Cluster

2020-06-15 Thread Chesnay Schepler

https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/rest_api.html#api

On 15/06/2020 17:47, Morgan Geldenhuys wrote:

Hi Community,

I am interested in creating an external client for submitting and 
managing Flink jobs via a HTTP/REST endpoint. Taking a look at the 
documentation, external communication is possible with the Dispatcher 
and JobManager 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html#external--rest-connectivity). 
That is great, however, where is the documentation for the REST api?


Thanks in advance!

Regards,
M.





Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-17 Thread Chesnay Schepler
Are you by any chance creating a local environment via 
(Stream)ExecutionEnvironment#createLocalEnvironment?


On 17/06/2020 17:05, Sourabh Mehta wrote:

Hi Team,

I'm  exploring flink for one of my use case, I'm facing some issues 
while running a flink job in cluster mode. Below are the steps I 
followed to setup and run job in cluster mode :
1. Setup flink on google cloud dataproc using 
https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink


2. After setting up the cluster I could see the flink session started 
and could see the UI for the same.


3 Submitted job from dataproc master node using below command

sudo HADOOP_CONF_DIR=/etc/hadoop/conf /usr/lib/flink/bin/flink run -m 
yarn-cluster -yid application_1592311654771_0001 -class 
com.sm.flink.FlinkDriver 
/usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar 
hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/


After running the job I see the job started successfully but created a 
mini local cluster and ran in local mode. I don't see any jobs 
submitted to JobManger and I also see 0 task managers on UI.


Can someone please help me understand here?, do let me know what input 
is required to investigate the same.








Re: Faild to load dependency after migration to Flink 1.10

2020-06-24 Thread Chesnay Schepler
Did you make any modifications to the Flink scripts in order to get 
log4j2 to work with Flink 1.10?
IIRC we had to modify the scripts when we migrated to log4j2 in 1.11; if 
this is done incorrectly it could break thing.


Do all Flink processes have this issue, or only TaskExecutors?

Can you provide us with the full exception?

On 23/06/2020 15:43, Thms Hmm wrote:

Hey all,
we are currently migrating our Flink jobs from v1.9.1 to v1.10.1. The 
code has been migrated as well as our Docker images (deploying on K8s 
using standalone mode). Now an issue occurs if we use log4j2 and the 
Kafka Appender which was working before. There are a lot of errors 
regarding "Failed to load a file system via service" 
(NoClassDefFoundError) which lead me searching in this direction. But 
as soon as I removed the Kafka Appender config from log4j it is 
working again and all other exceptions disappeared.
In the logs there is also an exception regarding Kafkas 
"ByteArraySerializer Class could not be found". The Kafka Clients is 
still stored within the lib/ directory but I think it is not loaded.


Any help would be appreciated. Thanks in advance.

Regards Thomas





Re: Renaming the metrics

2020-06-22 Thread Chesnay Schepler

There's currently no way to change this.

A related enhancement was proposed on FLINK-17495 that would at least 
allow you to attach a custom label, but the initial implementation 
wasn't general enough.


On 22/06/2020 15:08, Arvid Heise wrote:

Hi Ori,

I see that the PrometheusPushGatewayReporter [1] has an option for a 
job name, maybe you can use that.


I'm also including Chesnay who probably has more ideas.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter


On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski > wrote:


I have two Flink clusters sending metrics via Prometheus and they
share all the metric names (i.e.
flink_taskmanager_job_task_operator_currentOutputWatermark).

I want to change the flink_ prefix to something else to
distinguish between the clusters (maybe the job-name).

How can I do it?

Thanks.



--

Arvid Heise| Senior Java Developer




Follow us @VervericaData

--

Join Flink Forward - The Apache 
FlinkConference


Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 
BManaging Directors: Timothy Alexander Steinert, Yip Park Tung Jason, 
Ji (Toni) Cheng





Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-22 Thread Chesnay Schepler
 
configuration property: jobmanager.rpc.port, 6123
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.heap.size, 1024m
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.heap.size, 1024m
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: parallelism.default, 1
2020-06-17 11:28:20,561 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-06-17 11:28:20,562 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.rpc.address, cluster-flink-poc-m
2020-06-17 11:28:20,562 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: jobmanager.heap.mb, 12288
2020-06-17 11:28:20,562 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.heap.mb, 12288
2020-06-17 11:28:20,562 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.numberOfTaskSlots, 4
2020-06-17 11:28:20,562 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: parallelism.default, 28
2020-06-17 11:28:20,563 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: taskmanager.network.numberOfBuffers, 2048
2020-06-17 11:28:20,563 INFO 
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading 
configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf
2020-06-17 11:28:20,563 INFO 
 org.apache.shaded.flink.runtime.minicluster.MiniCluster - Starting 
Metrics Registry
2020-06-17 11:28:20,610 INFO 
 org.apache.shaded.flink.runtime.metrics.MetricRegistryImpl  - No 
metrics reporter configured, no metrics will be exposed/reported.
2020-06-17 11:28:20,610 INFO 
 org.apache.shaded.flink.runtime.minicluster.MiniCluster - Starting 
RPC Service(s)
2020-06-17 11:28:20,976 INFO  akka.event.slf4j.Slf4jLogger             
                 - Slf4jLogger started
2020-06-17 11:28:21,070 INFO 
 org.apache.shaded.flink.runtime.rpc.akka.AkkaRpcServiceUtils  - 
Trying to start actor system at :0
2020-06-17 11:28:21,115 INFO  akka.event.slf4j.Slf4jLogger             
                 - Slf4jLogger started
2020-06-17 11:28:21,131 INFO  akka.remote.Remoting                     
         - Starting remoting
2020-06-17 11:28:21,279 INFO  akka.remote.Remoting                     
         - Remoting started; listening on addresses 
:[akka.tcp://flink-metrics@<>]
2020-06-17 11:28:21,283 INFO 
 org.apache.shaded.flink.runtime.rpc.akka.AkkaRpcServiceUtils  - Actor 
system started at akka.tcp://flink-metrics@<>




Note : I have removed a few IP addresses from the log.

On Thu, Jun 18, 2020 at 12:08 AM Till Rohrmann <mailto:trohrm...@apache.org>> wrote:


Hi Sourabh,

do you have access to the cluster logs? They could be helpful for
debugging the problem. Which version of Flink are you using?

Cheers,
Till

On Wed, Jun 17, 2020 at 7:39 PM Sourabh Mehta
mailto:sourabhmehta2...@gmail.com>>
wrote:

No, I am not.

On Wed, 17 Jun 2020 at 10:48 PM, Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Are you by any chance creating a local environment via
(Stream)ExecutionEnvironment#createLocalEnvironment?

On 17/06/2020 17:05, Sourabh Mehta wrote:

Hi Team,

I'm  exploring flink for one of my use case, I'm facing
some issues while running a flink job in cluster mode.
Below are the steps I followed to setup and run job in
cluster mode :
1. Setup flink on google cloud dataproc using

https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/flink

2. After setting up the cluster I could see the flink
session started and could see the UI for the same.

3 Submitted job from dataproc master node using below command

sudo HADOOP_CONF_DIR=/etc/hadoop/conf
/usr/lib/flink/bin/flink run -m yarn-cluster -yid
application_1592311654771_0001 -class
com.sm.flink.FlinkDriver
/usr/lib/flink/lib/flink-1.0.10-sm-SNAPSHOT.jar
hdfs://cluster-flink-poc-m:8020/user/flink/rocksdb/

After running the job I see the job started successfully
but created a mini local cluster and ran in local mode. I
don't see any jobs submitted to JobManger and I also see

Re: Completed Job List in Flink UI

2020-06-18 Thread Chesnay Schepler

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#jobstore-expiration-time

On 18/06/2020 19:57, Ivan Yang wrote:

Hello,

In Flink web UI Overview tab, "Completed Job List” displays recent completed or 
cancelled job only for short period of time. After a while, they are gone. The Job 
Manager is up and never restarted. Is there a config key to keep job history in the 
Completed Job List for longer time? I am using Flink 1.9

Thank you in advance.

Ivan





Re: Flink savepoints history

2020-06-07 Thread Chesnay Schepler
I can't quite find the answer right now, but the Web UI relies entirely 
on the REST API. So, whenever you see something in the UI, and wonder 
where that data comes from, open up the developer tools in your browser, 
go to the network tab, reload the page and see what requests are being made.


On 07/06/2020 17:43, M Singh wrote:

Hi:

I wanted to find out if we can access the savepoints created for a job 
or all jobs using Flink CLI or REST API.


I took a look at the CLI (Apache Flink 1.10 Documentation: 
Command-Line Interface 
) 
and REST API 
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/rest_api.html), 
but did not find this information.


I do see the Flink jobmanager UI show the savepoints - but I am not 
sure how it access that information.


Please let me know how I can access the savepoints information.

Thanks.






Re: Timer metric in Flink

2020-06-10 Thread Chesnay Schepler
You cannot add custom metric types, just implementations of the existing 
ones. Your timer(wrapper) will have to implement Gauge or Histogram.


On 10/06/2020 14:17, Vinay Patil wrote:

Hi,

As timer metric is not provided out of the box, can I create a new 
MetricGroup by implementing this interface and add timer capability, 
this will be similar to Histogram wrapper Flink has provided. If yes, 
I can create a wrapper like


`public TimerWrapper implements Timer` , in this case will also have 
to create Timer interface and add it to the metric group.


Is this possible?

I want to have a timer to check Hbase lookup time.

Regards,
Vinay Patil





Re: Simple stateful polling source

2020-06-08 Thread Chesnay Schepler
Small correction to what I said: Sources have to implement 
ParallelSourceFunction in order to be run with a higher parallelism.


The javadocs for the RichSourceFunction are /somewhat /incorrect, but in 
a sense also correct.
This is because you can have a RichSourceFunction that also implements 
ParallelSourceFunction, which would then be functionally equivalent to 
RichParallelSourceFunction.
Ultimately there's little difference between a RichSourceFunction and a 
RichParallelSourceFunction; it's just that the latter also implements 
ParallelSourceFunction.


ParallelSourceFunction also is really just an interface for tagging; 
there's nothing functional in there.
So whenever you look at the javadocs for a method you end up in the 
RichSourceFunction interface; so there's some value in ignoring this 
slight difference for practical purposes.


But to wrap up, generally speaking, yes, you'd always want to extend 
RichParallelSourceFunction for a parallel data source; not out of 
necessity, but simplicity.


On 07/06/2020 17:43, Ken Krugler wrote:

Hi Chesnay,

On Jun 19, 2019, at 6:05 AM, Chesnay Schepler <mailto:ches...@apache.org>> wrote:


A (Rich)SourceFunction that does not implement 
RichParallelSourceFunction is always run with a parallelism of 1.


RichSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html> says "Base 
class for implementing a *parallel* data source…” and also talks about 
(in a similar, but not identical way as RichParallelSourceFunction 
<https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/source/RichSourceFunction.html>) 
use of getRuntimeContext() to determine the sub-task index.


But you’d always want to extend RichParallelSourceFunction to create a 
parallel data source, yes?


Seems confusing.

Thanks,

— Ken



On 19/06/2019 14:36, Flavio Pompermaier wrote:
My sourcefunction is intrinsically single-thread. Is there a way to 
force this aspect?
I can't find a real difference between a RichParallelSourceFunction 
and a RichSourceFunction.

Is this last (RichSourceFunction) implicitly using parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


It returns a list of states so that state can be re-distributed
if the parallelism changes.

If you hard-code the interface to return a single value then
you're implicitly locking the parallelism.
When you reduce the parallelism you'd no longer be able to
restore all state, since you have less instances than stored state.

On 19/06/2019 14:19, Flavio Pompermaier wrote:

It's not clear to me why the source checkpoint returns a list
of object...when it could be useful to use a list instead of a
single value?
The documentation says The returned list should contain one
entry for redistributable unit of state" but this is not very
clear to me..

Best,
Flavio

On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:
> Hi to all,
> in my use case I have to ingest data from a rest service,
where I
> periodically poll the data (of course a queue would be a
better choice
> but this doesn't depend on me).
>
> So I wrote a RichSourceFunction that starts a thread that
poll for new
> data.
> However, I'd like to restart from the last "from" value
(in the case
> the job is stopped).
>
> My initial thought was to write somewhere the last used
date and, on
> job restart, read that date (from a file for example).
However, Flink
> stateful source should be a better choice here...am I
wrong? So I
> made  my source function implementing
ListCheckpointed:
>
> @Override
> public List snapshotState(long checkpointId, long
timestamp)
> throws Exception {
>    return
Collections.singletonList(pollingThread.getDateFromAsString());
> }
> @Override
> public void restoreState(List state) throws
Exception {
>     for (String dateFrom : state) {
>          startDateStr = dateFrom;
>      }
> }
>
> @Override
> public void run(SourceContext ctx) throws
Exception {
>        final Object lock = ctx.getCheckpointLock();
>        Client httpClient = getHttpClient();
>        try {
>               pollingThread = n

Re: Run command after Batch is finished

2020-06-08 Thread Chesnay Schepler
This goes in the right direction; have a look at 
org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat 
can implement to run something on the Master after all subtasks have 
been closed.


On 08/06/2020 17:25, Andrey Zagrebin wrote:

Hi Mark,

I do not know how you output the results in your pipeline.
If you use DataSet#output(OutputFormat outputFormat), you could try 
to extend the format with a custom close method which should be called 
once the task of the sink batch operator is done in the task manager.

I also cc Aljoscha, maybe, he has more ideas.

Best,
Andrey

On Sun, Jun 7, 2020 at 1:35 PM Mark Davis > wrote:


Hi Jeff,

Unfortunately this is not good enough for me.
My clients are very volatile, they start a batch and can go away
any moment without waiting for it to finish. Think of an elastic
web application or an AWS Lambda.

I hopped to find something what could be deployed to the cluster
together with the batch code. Maybe a hook to a job manager or
similar. I do not plan to run anything heavy there, just some
formal cleanups.
Is there something like this?

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Saturday, June 6, 2020 4:29 PM, Jeff Zhang mailto:zjf...@gmail.com>> wrote:


It would run in the client side where ExecutionEnvironment is
created.

Mark Davis mailto:moda...@protonmail.com>> 于2020年6月6日周六 下午8:14写道:

Hi Jeff,

Thank you very much! That is exactly what I need.

Where the listener code will run in the cluster
deployment(YARN, k8s)?
Will it be sent over the network?

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Friday, June 5, 2020 6:13 PM, Jeff Zhang mailto:zjf...@gmail.com>> wrote:


You can try JobListener which you can register to
ExecutionEnvironment.


https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java


Mark Davis mailto:moda...@protonmail.com>> 于2020年6月6日周六
上午12:00写道:

Hi there,

I am running a Batch job with several outputs.
Is there a way to run some code(e.g. release a
distributed lock) after all outputs are finished?

Currently I do this in a try-finally block around
ExecutionEnvironment.execute() call, but I have to
switch to the detached execution mode - in this mode the
finally block is never run.

Thank you!

  Mark



-- 
Best Regards


Jeff Zhang




-- 
Best Regards


Jeff Zhang






Re: Window Function use case;

2020-06-04 Thread Chesnay Schepler
If you input data already contains both the SensorID and FactoryID, why 
would the following not be sufficient?


DataStream sensorEvents = ...; sensorEvents 
.filter(sensorEvent -> sensorEvent.Status.equals("alerte")) 
.map(sensorEvent -> sensorEvent.FactoryID) .addSink()


If the problem is that you only want one factory alert to be raised if 
say, all sensors of a factory go haywire at once, then you're looking at 
a time window; e.g., to only fire at most one alert every hour:


DataStream sensorEvents = ...; sensorEvents 
.filter(sensorEvent -> sensorEvent.Status.equals("alerte")) 
.keyBy(sensorEvent -> sensorEvent.FactoryID) .timeWindow(Time.hours(1)) 
.apply((WindowFunction) 
(factoryId, window, input, out) -> out.collect(new Alert(factoryId)));

.addSink();

Ultimately it would be good to understand what exactly you are 
struggling with, and what you have tried so far.


On 04/06/2020 15:45, Aissa Elaffani wrote:

Hello guys,
I have a use case, where I am receiving data from sensors about their 
status (Normal or Alerte), {SensorID:"1", FactoryID:"1", 
Status:"Normal" ..}, a factory can contain a lot of sensors, so what I 
want to do is, if the status of one sensor in a factory, is Alerte I 
want to raise an alerte for all the factory (the factory status must 
be alerte) ... I did a
stream.keyBy("FactoryID").window(). can you please suggest me a 
window function that can fulfill my use case (if one sensor of a 
factory raises "alerte" I want the factory status to be "alerte") ... 
I hope someone can understand my use case !! Sorry for disturbing you, 
and thak you for your time !

Best,
Aissa





Re: Timer metric in Flink

2020-06-11 Thread Chesnay Schepler
There are no immediate plans, mostly because timers are fairly expensive 
and represent an easy trap to trashing performance of invalidating 
benchmark results.


On 11/06/2020 13:11, Vinay Patil wrote:
Ohh Okay, basically implement the Gauge and add timer functionality to 
it for now.


Is there a plan or JIRA ticket to add Timer metric in future release, 
I think it is good to have


Regards,
Vinay Patil


On Wed, Jun 10, 2020 at 5:55 PM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


You cannot add custom metric types, just implementations of the
existing ones. Your timer(wrapper) will have to implement Gauge or
Histogram.

On 10/06/2020 14:17, Vinay Patil wrote:

Hi,

As timer metric is not provided out of the box, can I create a
new MetricGroup by implementing this interface and add timer
capability, this will be similar to Histogram wrapper Flink has
provided. If yes, I can create a wrapper like

`public TimerWrapper implements Timer` , in this case will also
have to create Timer interface and add it to the metric group.

Is this possible?

I want to have a timer to check Hbase lookup time.

Regards,
Vinay Patil







Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Chesnay Schepler

How reproducible is this problem / how often does it occur?
How is the cluster deployed?
Is anything else happening to the cluster around that that time (like a 
JobMaster failure)?


On 24/07/2020 13:28, Tomasz Dudziak wrote:


Hi,

I have come across an issue related to GET /job/:jobId endpoint from 
monitoring REST API in Flink 1.9.0. A few seconds after successfully 
starting a job and confirming its status as RUNNING, that endpoint 
would return 404 (Not Found). Interestingly, querying immediately 
again (literally a millisecond later) would return a valid result. I 
later noticed a similar behaviour in regard to a finished job as well. 
At certain points in time that endpoint would arbitrarily return 404, 
but similarly querying again would succeed. I saw this strange 
behaviour only recently and it used to work fine before.


Do you know what could be the root cause of this? At the moment, as a 
workaround I just query a job a couple of times in a row to ensure 
whether it definitely does not exist or it is just being misreported 
as non-existent, but this feels a bit like cottage industry…


Kind regards,

Tomasz

*Tomasz Dudziak *| Marshall Wace LLP, George House, 131 Sloane Street, 
London, SW1X 9AT |**E-mail: t.dudz...@mwam.com 
| Tel: +44 207 024 7061


This e-mail and any attachments are confidential to the addressee(s) 
and may contain information that is legally privileged and/or 
confidential. Please refer to http://www.mwam.com/email-disclaimer-uk 
for important disclosures regarding this email. If we collect and use 
your personal data we will use it in accordance with our privacy 
policy, which can be reviewed at https://www.mwam.com/privacy-policy.


Marshall Wace LLP is authorised and regulated by the Financial Conduct 
Authority. Marshall Wace LLP is a limited liability partnership 
registered in England and Wales with registered number OC302228 and 
registered office at George House, 131 Sloane Street, London, SW1X 
9AT. If you are receiving this e-mail as a client, or an investor in 
an investment vehicle, managed or advised by Marshall Wace North 
America L.P., the sender of this e-mail is communicating with you in 
the sender's capacity as an associated or related person of Marshall 
Wace North America L.P., which is registered with the US Securities 
and Exchange Commission as an investment adviser.






Re: How to start flink standalone session on windows ?

2020-07-24 Thread Chesnay Schepler
Flink no longer runs natively on Windows; you will have to use some 
unix-like environment like WSL or cygwin.



On 24/07/2020 04:27, wangl...@geekplus.com.cn wrote:


There's no  start-cluster.bat and flink.bat in bin directory.

So how can i start flink on windowns OS?

Thanks,
Lei

wangl...@geekplus.com.cn 





Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler

Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
S3 Extended Request ID: ), S3 Extended Request ID: xx
:InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at

Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler
I only quickly skimmed the Hadoop docs and found this (although it is 
not documented very well I might add). If this does not do the trick, 
I'd suggest to reach out to the Hadoop project, since we're using their 
S3 filesystem.


On 16/07/2020 19:32, nikita Balakrishnan wrote:

Hey Chesnay,

Thank you for getting back with that! I tried setting that too, it 
still gives me the same exception. Is there something else that I'm 
missing?
I also have 
fs.s3a.bucket..server-side-encryption-algorithm=SSE-KMS 
and fs.s3a.bucket..server-side-encryption.key set.


Is there no need to set the md5 hash value manually while sinking? The 
fs.s3a.etag.checksum.enabled: true will do it for me? And Do I need to 
specify anywhere that we have to use md5 hashing?



On Thu, Jul 16, 2020 at 12:04 AM Chesnay Schepler <mailto:ches...@apache.org>> wrote:


Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at

org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at

org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at

org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at

org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; E

Re: backup configuration in Flink doc

2020-07-16 Thread Chesnay Schepler
They should be public yes; I do not know what the "Backup" category is 
supposed to mean, and I suspect this was a WIP title.


On 16/07/2020 18:01, Steven Wu wrote:


The configuration page has this "backup" section. Can I assume that 
they are public interfaces? The name "backup" is a little confusing to 
me. There are some important pipeline and execution checkpointing 
configs here.

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#backup

Thanks,
Steven





Re: Question on Pattern Matching

2020-07-16 Thread Chesnay Schepler
Have you read this part of the documentation 
?


From what I understand, it provides you hooks for processing 
matched/timed out patterns.



On 16/07/2020 20:23, Basanth Gowda wrote:

Hello,

We have a use case where we want to know when a certain pattern 
doesn't complete within a given time frame.


For Example A -> B -> C -> D (needs to complete in 10 minutes)

Now with Flink if event D doesn't happen in 10 minutes, the pattern is 
discarded and we can get notified. We also want to track how many of 
them completed (even if they meet SLA). How do we achieve this with 
FLINK CEP or other mechanisms?


thank you,
Basanth





Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-29 Thread Chesnay Schepler
I'd recommend to do the aggregation over 5 seconds in 
graphite/prometheus etc., and expose a counter in Flink for each 
attribute/event_name.


User variables are a good choice for encoding the attribute/event_name 
values.


As for your remaining questions:

Flink does not support aggregating operator-level metrics across task 
executors. This job is left to proper time-series databases.


A counter can be reset like this: counter.dec(counter.getCount())
You can also create a custom implementation with whatever behavior you 
desire.


The default meter implementation (MeterView) calculate the rate of 
events per second based on counts that are periodically gathered over 
some time-period (usually 1 minute). If you want to calculate the 
rate-per-second over the last 5 seconds, then new Meterview(5) should do 
the trick.
If you want to have a rate-per-5-seconds, then you will need to 
implement a custom meter. Note that I would generally discourage this as 
it will not work properly with some metric systems which assume rates to 
be per-second.


On 27/07/2020 19:59, Vijay Balakrishnan wrote:

Hi Al,
I am looking at the Custom User Metrics to count incoming records by 
an incomng attribute, event_name and aggregate it over 5 secs.
I looked at 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter.

I am trying to figure out which one to use Counter or Meter.
If using Counter, how do I reset it after 5 secs.
If using Meter which measures avg throughput, How do i specify a 
duration like 5 secs ? markEvent(long n) ???


I am also trying to collect total count of events across all TaskManagers.
Do I collect at 
flink_taskmanager_job_task__numrecordsIn or
flink_taskmanager_job_task_operator__numrecordsIn ?? 
(so at task or operator level


Or should I use User variables like below:
|counter = getRuntimeContext() .getMetricGroup() 
.addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for 
each custom event_name here- I might not know all custom event_names 
in advance .counter("myCounter");|


Pardon my confusion here.
TIA,

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
mailto:bvija...@gmail.com>> wrote:


Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure
out how to dig into the application data and count grouped by an
attribute called event_name in the incoming application data and
report to Grafana via Prometheus.

I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped

Trying to dig in deeper than this numRecordsIn to get groped by
event_name attribute coming in the Input record every 5 secs.
TIA,

On Sat, Jul 25, 2020 at 10:55 AM David Anderson
mailto:da...@alpinegizmo.com>> wrote:

Setting up a Flink metrics dashboard in Grafana requires
setting up and configuring one of Flink's metrics reporters
[1] that is supported by Grafana as a data source. That means
your options for a metrics reporter are Graphite, InfluxDB,
Prometheus, or the Prometheus push reporter.

If you want reporting every 5 seconds, with the push based
reporters that's something you would configure in
flink-conf.yaml, whereas with Prometheus you'll need to
configure the scrape interval in the prometheus config file.
For more on using Flink with Prometheus, see the blog post by
Maximilian Bode [2].

Best,
David

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
[2]
https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html

On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan
mailto:bvija...@gmail.com>> wrote:

Hi,
I am trying to figure out how many records came into the
Flink App from KDS and how many records got moved to the
next step or was dropped by the watermarks.

I see on the Ui Table for *Source. Records Sent* with a
total and the next step *Filter->FlatMap operator with a
Records Received *total. How can I get these metric values
for me to display In Grafana for eg. as I want to know a
count for each 5 secs, how many records came in and how
many were filtered out by the watermark or my Custom
Filter operator etc  ?

I looked at the breakdown of the Source__Custom_Source in
Metrics as show in the attached pic. It has values like
0.NumRecordsIn and 0.NumRecordsOut and so on from 0 to 9
for the parallelism 10 I specified. It also has various
breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
0.Timestamps/Watermarks.numRecordsOut

Attached are some screenshots of the Flink DashBoard UI.

TIA,



Re: Making sense of Meter metrics graph on Grafana

2020-07-29 Thread Chesnay Schepler
Yes; a rate of 1 means that 1 event occurred per second, which in your 
case means one call to markEvent() per second.


Note that the default Meter implementation calculates the rate per 
second over the last minute (basically, rate(T) = (count(T) - 
count(T-60)) / 60; so short spikes tend to be flattened quite a bit.


On 29/07/2020 15:13, Manish G wrote:
I have added Meter metrics to my flink job code,and in grafana I can 
see the graph for the same.


What I observe is that the graph initially rises, and then plateaus at 
1, with occasional variations.


As Meter calculates throughput, so does it mean that the map function, 
wherein I invoke markEvent() method on Meter instance, is getting 
completed once per second?


With regards





<    3   4   5   6   7   8   9   10   11   12   >