Re: Password usage in ssl configuration

2020-11-12 Thread Nico Kruber
Hi Suchithra,
I'm not sure Flink lets you pass these passwords in any other way. I'm also not 
sure further protection is needed if the key-/truststores are job-/cluster-specific: 
an attacker would first need access to that job/cluster in order to get the 
credentials, and with that level of access, it would be possible to extract the 
passwords from the Java process anyway.

Our Ververica Platform, for example, also creates these key/truststores per 
deployment [1] and uses Kubernetes secrets to store the certificates.
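
As a rough illustration only (not how the platform does it internally), one way to keep 
the passwords out of flink-conf.yaml is to mount them from a Kubernetes secret and set 
them programmatically. A minimal Java sketch, assuming a hypothetical mount path 
/etc/flink-secrets/keystore-password:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

public class SslPasswordFromSecret {
    public static void main(String[] args) throws Exception {
        // read the password from a file mounted out of a Kubernetes secret (path is an example)
        String password = new String(
                Files.readAllBytes(Paths.get("/etc/flink-secrets/keystore-password")),
                StandardCharsets.UTF_8).trim();

        Configuration config = new Configuration();
        config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, password);
        config.setString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, password);
        config.setString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, password);

        // pass 'config' on wherever the cluster/client configuration is assembled
        System.out.println("loaded SSL passwords from the mounted secret");
    }
}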


Nico

[1] https://docs.ververica.com/user_guide/application_operations/deployments/configure_flink.html?highlight=ssl#implementation-details
On Friday, 16 October 2020 10:56:35 CET V N, Suchithra (Nokia - IN/Bangalore) 
wrote:
> Hello,
> 
> I have a query regarding the ssl configuration in flink. In flink with ssl
> enabled, flink-conf.yaml configuration file will contain the cleartext
> passwords for keystore and truststore files. Suppose if any attacker gains
> access to this configuration file, using these passwords keystore and
> truststore files can be read. What is the community approach to protect
> these passwords ?
> 
> Regards,
> Suchithra


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner



Re: Example flink run with security options? Running on k8s in my case

2020-08-26 Thread Nico Kruber
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml in 
its conf/ directory. If that is the same as on the cluster, you wouldn't have to 
pass most of your parameters manually. However, if you prefer not having a 
flink-conf.yaml in place, you could remove the security.ssl.internal.* 
parameters from the call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually see this 
in the JM logs as well, which you could check.

To check whether your whole setup works, I would suggest trying without 
security enabled first and then enabling it (just to rule out any other issues).

From the commands you mentioned, it looks like you're just missing 
security.ssl.rest.enabled=true, and because of that, the client would not use 
SSL for the connection.
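
For reference, a minimal Java sketch mapping those -D options to their configuration 
keys - equivalent to adding "-D" / "security.ssl.rest.enabled=true" to the args of the 
flink run call (the paths and the password below are just the placeholders from your 
job spec, not verified values):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

public class RestSslClientConfig {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // the missing switch: without it the client talks plain HTTP to the REST endpoint
        config.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
        config.setString(SecurityOptions.SSL_REST_KEYSTORE, "/etc/flink-secrets/flink-tls-keystore.key");
        config.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "thepass");
        config.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "thepass");
        config.setString(SecurityOptions.SSL_REST_TRUSTSTORE, "/etc/flink-secrets/flink-tls-ca-truststore.jks");
        config.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "thepass");
        System.out.println(config);
    }
}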

For more information and setup, I recommend reading through [1], which also 
contains an example at the bottom of the page showing how to use curl to test 
or use the REST endpoint.


Nico


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great). 
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service. 
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph. 
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1). 
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options? 
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter. 
> Running as the flink user in the container, I do
>  
> 
>   securityContext:
>     runAsUser: 
>     runAsGroup: 
>   containers:
>   - name: wordcount
>     image: adamroberts/mycoolflink:latest
>     args:
>     - /opt/flink/bin/flink
>     - run
>     - -D
>     - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>     - -D
>     - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>     - -D
>     - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
>     - -D
>     - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
>     - -D
>     - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>     - -D
>     - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>     - -D
>     - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password
>     - -D
>     - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password
>     - -m
>     - tls-flink-cluster-1-11-jobmanager:8081
>     - /opt/flink/examples/batch/WordCount.jar
>     - --input
>     - /opt/flink/NOTICE
> 
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system. 
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker. 
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU





Re: Example flink run with security options? Running on k8s in my case

2020-08-27 Thread Nico Kruber
> Flink cluster JobManager/TaskManager settings) and the communication does
> happen successfully, suggesting all is well otherwise. 
> With regards to testing with just a regular curl, I switched security back
> on and did the curl, using this: 
> 
> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
> 
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
> 
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
> 
>  
> from the Job CR pod, which is what runs flink run against the JobManager
> I'd like to connect to.
> That gives 
>  
> 
> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
> 
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
> 
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1
> encoding routines:ASN1_get_object:header too
> long:../crypto/asn1/asn1_lib.c:101:
> 
> so I wonder if my security setup itself is flawed... I'll be happy to share
> the scripting I have for that if folks feel it'll be of use, thanks again
> 
> - Original message -
> From: Nico Kruber 
> To: user@flink.apache.org
> Cc: Adam Roberts 
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
> k8s in my case Date: Wed, Aug 26, 2020 11:40 AM
>  

Re: Flink 1.8.3 GC issues

2020-09-10 Thread Nico Kruber
What looks a bit strange to me is that with a running job, the 
SystemProcessingTimeService should actually not be collected (since it is 
still in use)!

My guess is that something is indeed happening during that time frame (maybe 
job restarts?) and I would propose checking your logs for anything suspicious.


When I did experiments with Beam pipelines on our platform [1], I also 
noticed that the standard fat jars Beam creates include Flink runtime 
classes that they shouldn't (at least if you are submitting to a separate Flink 
cluster). This can cause all sorts of problems, and I would recommend removing 
those classes from the fat jar as documented in [1].




Nico



[1] https://ververica.zendesk.com/hc/en-us/articles/360014323099

On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
> Hi Josson,
> 
> Thanks again for the detailed answer, and sorry that I can not help you
> with some immediate answer. I presume that jvm args for 1.8 are the same?
> 
> Can you maybe post what exactly has crashed in your cases a) and b)?
> Re c), in the previously attached word document, it looks like Flink was
> running without problems for a couple of hours/minutes, everything was
> stable, no signs of growing memory consumption, impending problem, until
> around 23:15, when the problem started, right? Has something else happened
> at that time, something that could explain the spike? A checkpoint? Job
> crash/restart? Load spike?
> 
> A couple of other random guesses:
> - have you been monitoring other memory pools for Flink 1.4 and 1.8? Like
> meta space? Growing meta space size can sometimes cause problems. It
> shouldn't be the case here, as you configured XX:MaxMetaspaceSize, but it
> might be still worth checking...
> - another random idea, have you tried upgrading JDK? Maybe that would solve
> the problem?
> 
> Best regards,
> Piotrek
> 
> On Wed, 9 Sep 2020 at 19:53, Josson Paul wrote:
> > Hi Piotr,
> > 
> > JVM start up for Flink 1.4
> > 
> > ---
> > 
> > java -server -XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-taskmgr-assurance-1-77d44cf64-z8gd4.heapdump
> > -Xmx6554m -Xms6554m -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError -XX:+UseG1GC -XX:CICompilerCount=4
> > -XX:MaxGCPauseMillis=1000 -XX:+DisableExplicitGC -XX:ParallelGCThreads=4 -Dsun.net.inetaddr.ttl=60
> > -XX:OnOutOfMemoryError="kill -9 %p" -Dio.netty.eventLoopThreads=3
> > -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/log4j2.xml
> > -Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3 -Dnetworkaddress.cache.ttl=120 -Dnum.cores=3
> > -XX:+UseStringDeduplication -Djava.util.concurrent.ForkJoinPool.common.parallelism=3 -XX:ConcGCThreads=4
> > -Djava.library.path=/usr/local/lib -Djava.net.preferIPv4Stack=true -Dapp.dir=/opt/maglev/sw/apps/pipelineruntime
> > -Dserver.name=pipelineruntime -Dlog.dir=/opt/maglev/var/log/pipelineruntime
> > -cp /opt/maglev/sw/apps/javacontainer/resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pipelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*
> > com.cisco.maglev.MaglevServer start maglev
> > 1. taskmanager.memory.fraction = 0.7f (This was coming to 4.5 GB. I didn't know
> >    at that time that we could set the memory fraction to zero because ours is a
> >    streaming job. It was picking up the default.)
> > 2. Network buffer pool memory was 646 MB on the heap (I think this was the
> >    default based on some calculations in Flink 1.4)
> > 3. G1GC region size was 4 MB (default)
> > 
> > I tested this setup by reducing the JVM heap by 1 GB. It still worked
> > perfectly with some lags here and there.
> > 
> > JVM start up for Flink 1.8
> > a) I started with the same configuration as above. The Kubernetes POD went out
> > of memory. At this point I realized that in Flink 1.8 network buffer pools
> > are moved to native memory. Based on calculations it was coming to 200MB in
> > native memory. I increased the overall POD memory to accommodate the
> > buffer pool change, keeping the heap the same.
> > 
> > b) Even after I modified the overall POD memory,  the POD still crashed.
> > At this point I generated Flame graphs to identify the CPU/Malloc calls
> > (Attached as part of the initial email). Realized that cpu usage of G1GC
> > is
> > significantly different from Flink 1.4. Now I made 2 changes
> > 
> > 1. taskmanager.memory.fraction = 0.01f (This will give more heap for user code)
> > 2. Increased cpu from 3 to 4 cores.
> >
> > The above changes helped to hold the cluster a little longer, but it
> > still crashed after some time.
> > 
> > c)  Now I made the below changes.
> > 
> > 1. I came across this -> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002622.html
> >    Now I changed the G1GC region space to 8MB

Re: Flink job hangs using rocksDb as backend

2018-07-11 Thread Nico Kruber
n.onTimer(RtpeProcessFunction.java:94)
> 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
> 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
> 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> java.lang.Thread.run(Thread.java:748)
> .
> 2018-07-11 09:10:39,575 INFO 
> org.apache.flink.runtime.taskmanager.Task                   
> - Notifying TaskManager about fatal error. Task 'process (1/6)' did not
> react to cancelling signal in the last 30 seconds, but is stuck in
> method:
> 
> 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> java.lang.Class.newInstance(Class.java:442)
> 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
> 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
> 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
> 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
> 
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
> 
> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
> 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
> 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
> 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
> 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
> 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> java.lang.Thread.run(Thread.java:748)
> /
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen





Re: Multi-tenancy environment with mutual auth

2018-07-16 Thread Nico Kruber
Hi Ashish,
this was just merged today for Flink 1.6.
Please have a look at https://github.com/apache/flink/pull/6326 to check
whether this fulfils your needs.


Nico

On 14/07/18 14:02, ashish pok wrote:
> All,
> 
> We are running into a blocking production deployment issue. It looks
> like Flink inter-communication doesn't support SSL mutual auth. Any
> plans/ways to support it? We are going to have to create a DMZ for each
> tenant without that, not preferable of course.
> 
> 
> - Ashish

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen





Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

2018-09-13 Thread Nico Kruber
.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>   at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>   at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>   at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> 
> 
> Any ideas why the behaviour is not deterministic regarding recognising file 
> system schemes?
> 
> 
> Thanks,
> 
> Encho
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen





Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

2018-09-13 Thread Nico Kruber
Sorry, I had a second look and your stacktrace does not even point to
the spilling channel - it reads from the memory segment directly.
-> setting the temp dirs will thus not make a difference

I'm wondering why your deserializer eventually reads from a file on
gs:// directly, instead of, for example, a follow-up map operation.

Nico

On 13/09/18 14:52, Encho Mishinev wrote:
> Hi Nico,
> 
> Unfortunately I can't share any of data, but it is not even data being
> processed at the point of failure - it is still in the
> matching-files-from-GCS phase.
> 
> I am using Apache Beam's FileIO to match files and during one of those
> match-files steps I get the failure above.
> 
> Currently I run the job and when a taskmanager shows this error I reset
> it and restart the job. That works fine since the failure occurs at the
> beginning of the job only. It seems to be a problem within some
> taskmanagers, which is very odd considering that I have them all
> generated by a Kubernetes deployment, i.e. they should be completely
> identical. Sometimes I have to restart 3-4 of them until I have a
> running cluster.
> 
> I will try setting the temporary directory to something other than the
> default, can't hurt.
> 
> Thanks for the help,
> Encho
> 
> On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <n...@data-artisans.com> wrote:
> 
> Hi Encho,
> the SpillingAdaptiveSpanningRecordDeserializer that you see in your
> stack trace is executed while reading input records from another task.
> If the (serialized) records are too large (> 5MiB), it will write and
> assemble them in a spilling channel, i.e. on disk, instead of using
> memory. This will use the temporary directories specified via
> "io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
> System.getProperty("java.io <http://java.io>.tmpdir").
> -> These paths must actually be on an ordinary file system, not in gs://
> or so.
> 
> The reason you only see this sporadically may be because not all your
> records are that big. It should, however, be deterministic in that it
> should always occur for the same record. Maybe something is wrong here
> and the record length is messed up, e.g. due to a bug in the
> de/serializer or the network stack.
> 
> Do you actually have a minimal working example that you can share
> (either privately with me, or here) and shows this error?
> 
> 
> Nico
> 
> On 29/08/18 14:19, Encho Mishinev wrote:
> > Hello,
> >
> > I am using Flink 1.5.3 and executing jobs through Apache Beam
> 2.6.0. One
> > of my jobs involves reading from Google Cloud Storage which uses the
> > file scheme "gs://". Everything was fine but once in a while I
> would get
> > an exception that the scheme is not recognised. Now I've started
> seeing
> > them more often. It seems to be arbitrary - the exact same job
> with the
> > exact same parameters may finish successfully or throw this exception
> > and fail immediately. I can't figure out why it's not deterministic.
> > Here is the full exception logged upon the job failing:
> >
> > java.lang.Exception: The data preparation for task 'GroupReduce
> (GroupReduce at Match files from GCS/Via
> MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an
> error: Error obtaining the sorted input: Thread 'SortMerger Reading
> Thread' terminated due to an exception: No filesystem found for
> scheme gs
> >       at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
> >       at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >       at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted
> input: Thread 'SortMerger Reading Thread' terminated due to an
> exception: No filesystem found for scheme gs
> >       at
> 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> >       at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> >       at
> 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> >       at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
> >       ... 3 more
> > Caused by:

Re: Failed to fetch BLOB - IO Exception

2018-10-23 Thread Nico Kruber
Hi Manjusha,
If you are, for example, using one of Amazon's Linux AMIs on EMR, you
may fall into a trap that Lasse described during his Flink Forward talk
[1]: these images include a default cron job that cleans up files in
/tmp which have not been recently accessed. The default BLOB server
directory (blob.storage.directory) stores files under /tmp, and on
the JobManager they are only accessed during deployments, so they fall
under this cleanup.
A solution is to change the BLOB storage directory.
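
A minimal sketch of that change, in case you assemble the configuration programmatically 
(normally you would simply set blob.storage.directory in flink-conf.yaml; the path below 
is only an example outside of /tmp):

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;

public class BlobStorageDirConfig {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // keep BLOBs out of /tmp so a tmp-cleanup cron job cannot delete them between deployments
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, "/var/lib/flink/blob-storage");
        System.out.println(config);
    }
}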


Nico

[1]
https://data-artisans.com/flink-forward-berlin/resources/our-successful-journey-with-flink

On 23/10/2018 10:27, Manjusha Vuyyuru wrote:
> Hello,
> 
> Checkpointing to hdfs.
> state.backend.fs.checkpointdir: hdfs://flink-hdfs:9000/flink-checkpoints
> state.checkpoints.num-retained: 2
> Thanks,
> Manjusha
> 
> 
> On Tue, Oct 23, 2018 at 1:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> 
> Hi Manjusha,
> 
> I am not sure what is wrong, but Nico or Till (cc'ed) might be able
> to help you.
> 
> Best,
> 
> Dawid
> 
> On 23/10/2018 06:58, Manjusha Vuyyuru wrote:
>> Hello All,
>>
>> I have a job which fails, let's say, after every 14 days with an IO
>> Exception: failed to fetch BLOB.
>> I submitted the job on the command line using a Java jar. Below is the
>> exception I'm getting:
>>
>> java.io.IOException: Failed to fetch BLOB 
>> d23d168655dd51efe4764f9b22b85a18/p-446f7e0137fd66af062de7a56c55528171d380db-baf0b6bce698d586a3b0d30c6e487d16
>> from flink-job-mamager/10.20.1.85:38147 and store 
>> it under 
>> /tmp/blobStore-e3e34fec-22d9-4b3c-b542-0c1e5cdcf896/incoming/temp-0022
>>  at 
>> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:191)
>>  at 
>> org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:177)
>>  at 
>> org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:205)
>>  at 
>> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:119)
>>  at 
>> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:878)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: GET operation failed: Server side error: 
>> /tmp/blobStore-5535a94c-5bdd-41f3-878d-8320e53ba7c5/incoming/temp-00182356
>>  at 
>> org.apache.flink.runtime.blob.BlobClient.getInternal(BlobClient.java:253)
>>  at 
>> org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:166)
>>  ... 6 more
>> Caused by: java.io.IOException: Server side error: 
>> /tmp/blobStore-5535a94c-5bdd-41f3-878d-8320e53ba7c5/incoming/temp-00182356
>>  at 
>> org.apache.flink.runtime.blob.BlobClient.receiveAndCheckGetResponse(BlobClient.java:306)
>>  at 
>> org.apache.flink.runtime.blob.BlobClient.getInternal(BlobClient.java:247)
>>  ... 7 more
>> Caused by: java.nio.file.NoSuchFileException: 
>> /tmp/blobStore-5535a94c-5bdd-41f3-878d-8320e53ba7c5/incoming/temp-00182356
>>  at 
>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>>  at 
>> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>>  at java.nio.file.Files.move(Files.java:1395)
>>  at 
>> org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:452)
>>  at 
>> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:521)
>>  at 
>> org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>>  at 
>> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
>> All the BLOB configurations are default, I didn't change anything.
>> Can someone help me to fix this issue.
>> Thanks,
>> Manjusha
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-20 Thread Nico Kruber
Hi Emily,
I'm not familiar with the details of the REST API either, but if this is a 
problem with the proxy, maybe it is already interpreting the encoded URL and 
passing it on un-encoded - have you tried encoding the path again? That is, 
encoding the percent signs:

http://{ip}:20888/proxy/application_1504649135200_0001/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%253A%252F%252F%252Fremit-flink
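
A small Java sketch to illustrate the double encoding (the first pass is what was already 
tried; the second additionally encodes the percent signs so the proxy's own decoding still 
leaves an encoded path for Flink):

import java.net.URLEncoder;

public class DoubleEncodeSavepointPath {
    public static void main(String[] args) throws Exception {
        String targetDirectory = "s3:///remit-flink";
        String encodedOnce = URLEncoder.encode(targetDirectory, "UTF-8");
        // -> s3%3A%2F%2F%2Fremit-flink (worked against the JobManager directly)
        String encodedTwice = URLEncoder.encode(encodedOnce, "UTF-8");
        // -> s3%253A%252F%252F%252Fremit-flink (for the URL behind the YARN proxy)
        System.out.println(encodedOnce);
        System.out.println(encodedTwice);
    }
}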


Nico

On Wednesday, 20 September 2017 00:52:05 CEST Emily McMahon wrote:
> Thanks Eron & Fabian.
> 
> The issue was hitting a yarn proxy url vs the node itself. For example this
> worked
> http://{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> 
> But this did not
> http://{ip}:20888/proxy/application_1504649135200_0001/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> 
> It's a bit confusing because the cancel api works with either and the proxy
> url sometimes works, as this was successful: http://{ip}:20888/proxy/application_1504649135200_0001/jobs/cca2dd609c716a7b0a195700777e5b1f/cancel-with-savepoint/target-directory/tmp/
> 
> Cheers,
> Emily
> 
> On Tue, Sep 19, 2017 at 2:37 PM, Eron Wright  wrote:
> > Good news, it can be done if you carefully encode the target directory
> > with percent-encoding, as per:
> > https://tools.ietf.org/html/rfc3986#section-2.1
> > 
> > For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
> > which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
> > able to submit the following URL:
> > http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job
> > 
> > And see the following in the log:
> > 2017-09-19 14:27:45,939 INFO 
> > org.apache.flink.runtime.jobmanager.JobManager> 
> >- Trying to cancel job 5c360ded6e4b7d8db103e71d68b7c83d
> > 
> > with savepoint to s3:///savepoint-bucket/my-awesome-job
> > 
> > -Eron
> > 
> > On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske  wrote:
> >> Hi Emily,
> >> 
> >> thanks for reaching out.
> >> I'm not familiar with the details of the Rest API but Ufuk (in CC) might
> >> be able to help you.
> >> 
> >> Best, Fabian
> >> 
> >> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
> >>> I've tried every combination I can think of to pass an s3 path as the
> >>> target directory (url encode, include trailing slash, etc)
> >>> 
> >>> I can successfully pass a local path as the target directory (ie
> >>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
> >>> think there's a problem with the jobId or rest of the url. I also
> >>> verified
> >>> I can create the savepoint on s3 from the command line so it's not a
> >>> permission issue.
> >>> 
> >>> Here's the same question on stack overflow
> >>> (with the exception that they are getting a 502 whereas I'm getting a 404)
> >>> 
> >>> using Flink 1.3.1
> >>> 
> >>> Anyone have a working example?
> >>> 
> >>> Thanks,
> >>> Emily





Re: Question about concurrent checkpoints

2017-09-21 Thread Nico Kruber
Hi Narendra,
according to [1], even with asynchronous state snapshots (see [2]), a 
checkpoint is only complete after all sinks have received the barriers and all 
(asynchronous) snapshots have been processed. Since no new checkpoint barriers 
are emitted until the previous checkpoint is complete if no concurrent 
checkpoints are allowed (the default: at most one checkpoint at a time, see [1] 
and [3]), you will not get into the situation where two asynchronous snapshots 
are being taken concurrently.

If you enable concurrent checkpoints and asynchronous snapshots, they will 
process concurrently but on different snapshots of the state, i.e. although 
they are running in parallel, each stores the expected state.
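
For reference, a minimal sketch of the setting involved (the interval and source are placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConcurrencyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // trigger a checkpoint every 10 seconds

        // default: at most one checkpoint at a time, i.e. no new barriers are injected
        // while a previous (possibly asynchronous) checkpoint is still in progress
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // allowing e.g. 2 lets snapshots overlap, each working on its own consistent state
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-concurrency-example");
    }
}
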
If you want to know more about the details of how this is done, I can 
recommend Stefan's (cc'd) talk at Flink Forward last week [4]. He may also be 
able to answer in more detail in case I missed something.



Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html#asynchronous-state-snapshots
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/state_backends.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
[4] https://www.youtube.com/watch?v=dWQ24wERItM&index=36&list=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X

On Thursday, 21 September 2017 13:26:17 CEST Narendra Joshi wrote:
> Hi,
> 
> How are concurrent snapshots taken for an operator?
> Let's say an operator receives barriers for a checkpoint from all of
> its inputs. It triggers the checkpoint. Now, the checkpoint starts
> getting saved asynchronously. Before the checkpoint is acknowledged,
> the operator receives all barriers for all inputs for the next
> checkpoint. What will happen in this case if no concurrent checkpoints
> are allowed (i.e. the default value is used)? What will happen if
> concurrent checkpoints are allowed?
> 
> Thanks,
> Narendra Joshi





Re: on Wikipedia Edit Stream example

2017-09-21 Thread Nico Kruber
Hi Haibin,
if you execute the program as in the Wiki edit example [1] from mvn as given 
or from the IDE, a local Flink environment will be set up which is not 
accessible from the outside by default. This is done by the call to
StreamExecutionEnvironment.getExecutionEnvironment();
which also works in remote environments, i.e. if the jar is uploaded to a 
running Flink cluster and executed there.

You can also do that if you like, i.e. "mvn package" the wiki example and 
let Flink run the code as in [2].
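
If you do want the job to show up in the web UI at http://localhost:8081, a minimal sketch 
is to submit it to a running local cluster instead (the jar name is just a placeholder for 
your packaged example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubmitToLocalCluster {
    public static void main(String[] args) throws Exception {
        // assumes a standalone cluster started via ./bin/start-local.sh on this machine,
        // with the JobManager RPC port at its default of 6123
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "localhost", 6123, "target/wiki-edits-0.1.jar");
        env.fromElements("this job runs on the cluster and is visible at http://localhost:8081").print();
        env.execute("remote-submission-example");
    }
}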


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/run_example_quickstart.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

On Thursday, 21 September 2017 12:02:34 CEST Liu Haibin wrote:
> Hi,
> 
> I'm wondering why we don't need to run ./bin/start-local.sh for "Wikipedia
> Edit Stream" example but we need to do it for "Quickstart" example.
> 
> I found that I can access http://localhost:8081 for "Quickstart" example
> but not for "Wikipedia Edit Stream".
> 
> Where is "Wikipedia Edit Stream" running? Can I have http://localhost:8081
> for "Wikipedia Edit Stream"?
> 
> 
> Regards,
> Haibin





Re: Savepoints and migrating value state data types

2017-09-21 Thread Nico Kruber
Hi Marc,
I assume you have set a UID for your CoProcessFunction as described in [1]?
Also, can you provide the Flink version you are working with and the 
serializer you are using?

If you have the UID set, your strategy seems to be the same as proposed by 
[2]: "Although it is not possible to change the data type of operator state, a 
workaround to overcome this limitation can be to define a second state with a 
different data type and to implement logic to migrate the state from the 
original state into the new state."
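
A minimal Java sketch of that "second state" workaround (names and POJOs are only 
illustrative and mirror the Scala case classes quoted below; the function must run on a 
keyed stream, and the old class B1 has to stay on the classpath so its state can still be 
restored):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class StateMigrationSketch {

    // hypothetical POJOs mirroring case class B1(id, price) and B2(id, price, date)
    public static class B1 { public String id; public double price; }
    public static class B2 { public String id; public double price; public String date; }

    public static class MigratingFunction extends RichFlatMapFunction<B2, B2> {
        private transient ValueState<B1> oldBuffer; // state written by the old job version
        private transient ValueState<B2> newBuffer; // state used from now on

        @Override
        public void open(Configuration parameters) {
            oldBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>("1Buffer", B1.class));
            newBuffer = getRuntimeContext().getState(new ValueStateDescriptor<>("2Buffer", B2.class));
        }

        @Override
        public void flatMap(B2 value, Collector<B2> out) throws Exception {
            B1 old = oldBuffer.value();
            if (old != null && newBuffer.value() == null) {
                // migrate the restored value into the new state and drop the old one
                B2 migrated = new B2();
                migrated.id = old.id;
                migrated.price = old.price;
                migrated.date = null; // no sensible default for the new field
                newBuffer.update(migrated);
                oldBuffer.clear();
            }
            newBuffer.update(value);
            out.collect(value);
        }
    }
}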

I'm no expert on this but it looks like it should work (although I'm curious 
where the "aBuffer" in the error message comes from). I'm forwarding this 
to Gordon in CC because he probably knows better, as he was involved in state 
migration before (afaik).



Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#application-state-compatibility
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#stateful-operators-and-user-functions

On Wednesday, 20 September 2017 14:16:27 CEST mrooding wrote:
> Hi
> 
> We've got a situation where we're merging several Kafka streams and for
> certain streams, we want to retain up to 6 days of history. We're trying to
> figure out how we can migrate savepoint data between application updates
> when the data type for a certain state buffer updates.
> 
> Let's assume that we have 2 streams with the following data types:
> 
> case class A(id: String, name: String)
> case class B1(id: String, price: Double)
> 
> We have a CoProcessFunction which combines the 2 streams and maintains 2
> different buffer states:
> 
> MapState[String, A] and ValueState[B1]
> 
> In our scenario, we're trying to anticipate the data type of B1 changing in
> the future. Let's assume that in the foreseeable future, B1 will change to:
> 
> case class B2(id: String, price: Double, date: String)
> 
> When we create a snapshot using B1 and then upgrading the application to B2
> the obvious attempt would be to try and retrieve the stored ValueState and
> the new ValueState:
> 
> val oldState = getRuntimeContext.getState(new
> ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
> val newState = getRuntimeContext.getState(new
> ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))
> 
> However, as soon as you do the following error occurs:
> 
> Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
> previous serializer of the keyed state must be present; the serializer could
> have been removed from the classpath, or its implementation have changed
> and could not be loaded. This is a temporary restriction that will be fixed
> in future versions.
> 
> Our assumption is that the process operator has a specified ID which Flink
> uses to save and restore savepoints. The CoProcessFunction types
> changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A] and
> therefore the savepoint data does not apply to the operator anymore. Is this
> assumption correct?
> 
> We've been going through the documentation and source code of Flink and it
> seems like there's no way to achieve this kind of migration. If this is the case,
> we'd be interested in contributing to Flink to get this added a.s.a.p. and
> would love to get some feedback on how to approach this.
> 
> Thanks in advance
> 
> Marc
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Question about concurrent checkpoints

2017-09-21 Thread Nico Kruber
On Thursday, 21 September 2017 20:08:01 CEST Narendra Joshi wrote:
> Nico Kruber  writes:
> > according to [1], even with asynchronous state snapshots (see [2]), a
> > checkpoint is only complete after all sinks have received the barriers and
> > all (asynchronous) snapshots have been processed. Since, if the number of
> > concurrent checkpoints is 0, no checkpoint barriers will be emitted until
> > the previous checkpoint is complete (see [1]), you will not get into the
> > situation where two asynchronous snapshots are being taken concurrently.
> 
> Does this mean that operators would stop processing streams (because
> they received all barriers for a new checkpoint) and wait for
> the ongoing asynchronous checkpoint to complete or it means that no
> barriers would be injected into sources before checkpoint finishes?

The latter (as mentioned): no new barriers are injected into the sources.

The only thing that is waiting for asynchronous state snapshots to complete is 
the checkpoint coordinator (in any case!) since a checkpoint is only complete 
once all operators have stored their state. Operation continues as expected.


Nico



Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Nico Kruber
Hi Elias,
indeed, that looks strange, but it was introduced with FLINK-3172 [1], with an 
argument about using the same configuration key (as opposed to having two 
different keys, as mentioned) starting at
https://issues.apache.org/jira/browse/FLINK-3172?focusedCommentId=15091940#comment-15091940


Nico

[1] https://issues.apache.org/jira/browse/FLINK-3172

On Sunday, 24 September 2017 03:04:51 CEST Elias Levy wrote:
> I am wondering why HA mode there is a need for a separate config parameter
> to set the JM RPC port (high-availability.jobmanager.port) and why this
> parameter accepts a range, unlike jobmanager.rpc.port.



Re: History Server

2017-09-25 Thread Nico Kruber
Hi Elias,
in theory, it could be integrated into a single web interface, but this has 
not been done so far.
I guess the main reason for keeping it separate was to have a better 
separation of concerns: the history server is actually independent of the 
currently running JobManager and merely displays previous job results, which 
may also come from different or previously existing JobManager instances that 
stored history data in the configured storage directory.

Chesnay (cc'd) may elaborate a bit more in case you'd like to change that and 
integrate the history server (interface) into the JobManager.


Nico

On Sunday, 24 September 2017 02:48:40 CEST Elias Levy wrote:
> I am curious, why is the History Server a separate process and Web UI
> instead of being part of the Web Dashboard within the Job Manager?




Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper

2017-09-25 Thread Nico Kruber
Hi Federico,
I also did not find any implementation of a Hive sink, nor many details on this 
topic in general. Let me forward this to Timo and Fabian (cc'd), who may know 
more.

Nico

On Friday, 22 September 2017 12:14:32 CEST Federico D'Ambrosio wrote:
> Hello everyone,
> 
> I'd like to use the HiveBolt from storm-hive inside a flink job using the
> Flink-Storm compatibility layer but I'm not sure how to integrate it. Let
> me explain, I would have the following:
> 
> val mapper = ...
> 
> val hiveOptions = ...
> 
> streamByID
>   .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new
> HiveBolt(hiveOptions)))
> 
> where streamByID is a DataStream[Event].
> 
> What would be the IN and OUT types? HiveBolt executes on a storm Tuple, so,
> I'd think that In should be an Event "tuple-d" ( event => (field1, field2,
> field3 ...) ), while OUT, since I don't want the stream to keep flowing
> would be null or None?
> 
> Alternatively, do you know any implementation of an hive sink in Flink?
> Other than the adaptation of the said HiveBolt in a RichSinkFunction?
> 
> Thanks for your attention,
>  Federico




Re: Building scala examples

2017-09-25 Thread Nico Kruber
Hi Michael,
from what I see, Java and Scala examples reside in different packages, e.g.
* org.apache.flink.streaming.scala.examples.async.AsyncIOExample vs.
* org.apache.flink.streaming.examples.async.AsyncIOExample

A quick run on the Flink 1.3 branch revealed flink-examples-streaming_2.10-1.3-SNAPSHOT.jar 
containing both (which you can verify with your favorite archiver tool for zip files).

Afaik, there is no simple switch to turn off Java or Scala examples. You may 
either adapt the pom.xml or create your own Project with the examples and 
programming languages you need.


Nico


On Saturday, 23 September 2017 12:45:04 CEST Michael Fong wrote:
> Hi,
> 
> I am studying how to build a scala program from flink-examples/.
> 
> I can see there are two source folders java/ and scala/ from IntelliJ, and
> for most examples, there is a copy of examples for Java and Scala.
> Executing 'mvn clean package -Pbuild-jar' results in a jar file under
> target/. I am wondering if that is a Java or Scala example that I just
> compiled? In addition, is there a way to selectively choose the Java or Scala
> example to build with the current maven settings?
> 
> Thanks in advance,




Re: Can i use Avro serializer as default instead of kryo serializer in Flink for scala API ?

2017-09-25 Thread Nico Kruber
Hi Shashank,
enabling Avro as the default de/serializer for Flink should be as simple as 
the following, according to [1]

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableForceAvro()

I am, however, no expert on this or on the implications of using Avro from 
inside Scala, so I included Gordon (cc'd) who may know more.



Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html

On Saturday, 23 September 2017 10:11:28 CEST shashank agarwal wrote:
> Hello Team,
> 
> Our schema evolves due to business logic, so we want to use an evolvable
> schema like Avro as the default serializer and deserializer for our Flink program
> and states.
> 
> My doubt is: we are using the Scala API in our Flink program, but Avro by default
> supports Java POJOs. So how can we use this with our Scala API - should we
> use a serializer like Avro4s? Or, if we can use default Avro in our Scala
> Flink app, what would be the steps?
> 
> Please guide.




Re: Building scala examples

2017-09-27 Thread Nico Kruber
Hi Michael,
yes, it seems that the self-contained jars only contain the Java examples.

You may also follow the quickstart [1] to get started writing Flink streaming 
programs in Scala.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/scala_api_quickstart.html

On Wednesday, 27 September 2017 05:04:31 CEST Michael Fong wrote:
> Thanks, Nico.
> 
> 
> I looked again at flink-examples-streaming_2.10-1.4-SNAPSHOT.jar, and it
> indeed contains both.
> 
> Originally I was looking at the individual self-contained jars, as I used them as
> examples to create and run my own streaming program. They only contain compiled
> Java classes, if I am not mistaken.
> 
> Let me try to create a scala example with similar build procedure.
> 
> Thanks!
> 
> 
> On Mon, Sep 25, 2017 at 10:41 PM, Nico Kruber 
> 
> wrote:
> > Hi Michael,
> > from what I see, Java and Scala examples reside in different packages,
> > e.g.
> > * org.apache.flink.streaming.scala.examples.async.AsyncIOExample vs.
> > * org.apache.flink.streaming.examples.async.AsyncIOExample
> > 
> > A quick run on the Flink 1.3 branch revealed flink-examples-streaming_2.10-1.3-SNAPSHOT.jar
> > containing both (which you can verify with
> > your favorite archiver tool for zip files).
> > 
> > Afaik, there is no simple switch to turn off Java or Scala examples. You
> > may
> > either adapt the pom.xml or create your own Project with the examples and
> > programming languages you need.
> > 
> > 
> > Nico
> > 
> > On Saturday, 23 September 2017 12:45:04 CEST Michael Fong wrote:
> > > Hi,
> > > 
> > > I am studying how to build a scala program from flink-examples/.
> > > 
> > > I can see there are two source folders java/ and scala/ from IntelliJ,
> > 
> > and
> > 
> > > for most examples, there is a copy of examples for Java and Scala.
> > > Executing 'mvn clean package -Pbuild-jar' results in a jar file
> > > under
> > > target/. I am wondering if that is a Java or Scala example that I just
> > > compiled? In addition, is there a way to selectively choose the Java or Scala
> > > example to build with the current maven settings?
> > > 
> > > Thanks in advance,




Re: Negative values using latency marker

2017-11-03 Thread Nico Kruber
Hi Tovi,
if I see this correctly, the LatencyMarker gets its initial timestamp during 
creation at the source, and the latency is reported as a metric at a sink by 
comparing the initial timestamp with the current time.
If the clocks of the two machines involved diverge, e.g. the sink's clock 
falling behind, the difference may be negative.
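
A tiny sketch of the arithmetic behind those -5 values in the log below (the timestamps are made up):

public class LatencyClockSkewExample {
    public static void main(String[] args) {
        long markedAtSource = 1_509_641_696_842L; // System.currentTimeMillis() on the source's machine
        long nowAtSink      = 1_509_641_696_837L; // the sink's clock runs 5 ms behind
        long reportedLatency = nowAtSink - markedAtSource;
        System.out.println(reportedLatency + " ms"); // prints "-5 ms"
    }
}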


Nico

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi  wrote:
> Hi group,
> 
> Can someone maybe elaborate how the latency gauge shown by the latency marker can be
> negative?
> 
> 2017-11-02 18:54:56,842 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0,
> p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0},
> LatencySourceDescriptor{vertexID=1, subtaskIndex=2}={p99=-6.0, p50=-6.0,
> min=-6.0, max=-6.0, p95=-6.0, mean=-6.0},
> LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0,
> min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}} 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> 12943.1167 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond:
> 12946.9166 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond:
> 12926.3168 2017-11-02 18:54:56,844 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753
> max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0
> p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995
> p999:3817927.9259998496
> 
> Regards,
> Tovi





Re: Making external calls from a FlinkKafkaPartitioner

2017-11-03 Thread Nico Kruber
Hi Ron,
imho your code should be fine (except for a potential visibility problem on the 
changes of the non-volatile partitionMap member, depending on your needs).
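
A minimal sketch of the visibility fix hinted at above (only the field declaration changes; 
the map is still swapped in as a whole by the updater thread):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VolatileMapHolder {
    // a volatile reference makes the newly swapped-in map immediately visible
    // to the thread calling partition()
    private volatile Map<Long, List<Integer>> partitionMap = new HashMap<>();

    public void replace(Map<Long, List<Integer>> newMap) {
        partitionMap = newMap; // a single reference write - no further synchronization needed
    }

    public Map<Long, List<Integer>> current() {
        return partitionMap;
    }
}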

The #open() method should be called (once) for each sink initialization 
(according to the javadoc) and then you should be fine with the asynchronous 
updater thread.
I'm including Gordon (cc'd) just to be sure as he may know more.


Nico

On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
> We have a system where the Kafka partition a message should go into is a
> function of a value in the message. Often, it’s value % # partitions, but
> for some values it’s not - it’s a specified list of partitions that changes
> over time. Our “simple Java library” that produces messages for this system
> also has a background thread that periodically polls a HTTP endpoint (at a
> rate of 1/minute as its default) to refresh that list of special cases.
> 
> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
> what I’m not so sure about is how to get this polling operation into the
> partitioner. I’m about to try it the obvious way (create a background
> thread that polls the URL and updates the partition map), but I wonder if
> that’s actually going to cause a bunch of problems for the Flink runtime.
> 
> Here’s the code that I have right now:
> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>     private final String partitionerURL;
>     private final long updateIntervalInMillis;
>     private Map<Long, List<Integer>> partitionMap;
>     private ScheduledExecutorService executor;
> 
>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>         this.partitionerURL = partitionerURL;
>         this.updateIntervalInMillis = updateIntervalInMillis;
>         this.partitionMap = new HashMap<>();
>     }
> 
>     @Override
>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>         executor = Executors.newScheduledThreadPool(1);
>         executor.scheduleAtFixedRate(
>                 () -> updatePartitionMapRunnable(),
>                 updateIntervalInMillis,
>                 updateIntervalInMillis,
>                 TimeUnit.MILLISECONDS);
>     }
> 
>     private void updatePartitionMapRunnable() {
>         // Make synchronous request to partitionerURL
>         // This is a simple JSON that matches our data
>         String response = "{1:[1,2,3],2:[2]}";
>         // Replace current partitionMap with new HashMap from the response
>         this.partitionMap = convertResponseToMap(response);
>         // Replacing the current value of partitionMap with the updated version
>         // doesn't require synchronization
>     }
> 
>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>         // Convert response to JSON structure and just use that?
>         // or Iterate and add to local hashMap
>         return hashMap;
>     }
> 
>     @Override
>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>                          byte[] serializedValue, int numPartitions) {
>         long myKey = next.f0;
> 
>         if (partitionMap.containsKey(myKey)) {
>             List<Integer> partitions = partitionMap.get(myKey);
>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>         }
> 
>         return (int) (myKey % numPartitions);
>     }
> }
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835



Re: Incremental checkpointing documentation

2017-11-03 Thread Nico Kruber
Hi Elias,
let me answer the questions to the best of my knowledge, but in general I 
think this is as expected.
(Let me give a link to the docs explaining the activation [1] for other 
readers first.)
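
For readers who prefer code over prose, activation roughly looks like this (a
sketch only: the checkpoint URI and interval are placeholders, and it assumes a
main() that declares "throws Exception"):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // RocksDB state backend with incremental checkpoints enabled (2nd constructor argument)
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

    env.enableCheckpointing(60_000);
    // externalized checkpoints (which may be incremental, see [2]) retained on cancellation
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);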

On Friday, 3 November 2017 01:11:52 CET Elias Levy wrote:
> What is the interaction of incremental checkpointing and external
> checkpoints?

Externalized checkpoints may be incremental [2] (I'll fix the formatting error 
that is not rendering the arguments as a list, making them less visible)

> Any interaction with the state.checkpoints.num-retained config?

Yes, this remains the number of available checkpoints. There may, however, be 
more folders containing RocksDB state that was originally put into checkpoint 
X but is also still required in checkpoint X+10 or so. These files will be 
cleaned up once they are not needed anymore.

> Does incremental checkpointing require any maintenance?

No, state is cleaned up once it is not used/referenced anymore.

> Any interaction with savepoints?

No, a savepoint uses Flink's own data format and is not incremental [3].

> Does it perform better against certain "file systems"?  E.g. it S3 not
> recommended for it?  How about EFS?

I can't think of a reason this should be any different to non-incremental 
checkpoints. Maybe Stefan (cc'd) has some more info on this.

For more details on the whole topic, I can recommend Stefan's talk at the last 
Flink Forward [4] though.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/
large_state_tuning.html#tuning-rocksdb
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/
checkpoints.html#difference-to-savepoints
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/
savepoints.html
[4] https://www.youtube.com/watch?
v=dWQ24wERItM&index=36&list=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X



Re: Negative values using latency marker

2017-11-06 Thread Nico Kruber
Ok, digging into it a bit further:

The LatencyMarker is scheduled at a certain period with some initialDelay. Its 
initial time is `System.currentTimeMillis() + initialDelay` (when it should 
first be run). Depending on your system's load, this run may actually be 
delayed (but then the marker's time will fall behind, not explaining a 
negative value) but from the Executor's documentation, I don't think, it 
should execute it too early. For future markers, their time will simply be 
increased by the period (which may fall behind for the same reason).

Before emitting the metric, the difference to `System.currentTimeMillis()` is
computed; that call is based on wall-clock time and may decrease if the clock is
adjusted, e.g. via NTP. Also, this is probably called from a different thread,
and `System.currentTimeMillis()` apparently may jump backwards there as well
[1].
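
To illustrate with a stripped-down sketch (not the actual Flink code, just the
shape of the computation):

    // taken at the source when the LatencyMarker is created
    long markedTime = System.currentTimeMillis();

    // ... the marker travels through the job ...

    // taken at the sink when the latency metric is reported
    long latency = System.currentTimeMillis() - markedTime;
    // if the sink's clock is behind the source's clock, or the wall clock is
    // stepped backwards (e.g. by NTP) in between, this difference becomes negative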


Nico

[1] 
https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls


On Sunday, 5 November 2017 09:22:05 CET Sofer, Tovi  wrote:
> Hi Nico,
> 
> Actually the run below is on my local machine, and both Kafka and flink run
> on it.
> 
> Thanks and regards,
> Tovi
> -----Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: Friday, 3 November 2017 15:22
> To: user@flink.apache.org
> Cc: Sofer, Tovi [ICG-IT] 
> Subject: Re: Negative values using latency marker
> 
> Hi Tovi,
> if I see this correctly, the LatencyMarker gets its initial timstamp during
> creation at the source and the latency is reported as a metric at a sink by
> comparing the initial timestamp with the current time. If the clocks
> between the two machines involved diverge, e.g. the sinks clock falling
> behind, the difference may be negative.
> 
> 
> Nico
> 
> On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi  wrote:
> > Hi group,
> > 
> > Can someone maybe elaborate how can latency gauge shown by latency
> > marker be negative?
> > 
> > 2017-11-02 18:54:56,842 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> > mean=-5.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=1}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> > mean=-5.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=2}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0,
> > mean=-6.0}, LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=3}={p99=-6.0, p50=-6.0, min=-6.0, max=-6.0, p95=-6.0,
> > mean=-6.0}} 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> > 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675
> > 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> > 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> > 12943.1167 2017-11-02 18:54:56,843 INFO
> > com.citi.artemis.flink.reporters.ArtemisReporter -
> > [Flink-MetricRegistry-1]
> > 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> > Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4
> > 2017-11-02
> > 18:54:56,843 INFO  com.citi.artemis.flink.repor

Re: ResultPartitionMetrics

2017-11-08 Thread Nico Kruber
Hi Aitozi,
the difference is the scope: the normal metrics (without
taskmanager.net.detailed-metrics) reflect _all_ buffers of a task, while the
detailed statistics are more fine-grained and give you statistics per input (or
output) gate. The "total" there reflects the fact that each gate has multiple
channels, and the metrics offered are the sum over all of them.
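
For reference, the detailed statistics only show up if you enable them in
flink-conf.yaml (they are off by default):

    taskmanager.net.detailed-metrics: true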


Nico

On Monday, 23 October 2017 09:31:04 CET Timo Walther wrote:
> Hi Aitozi,
> 
> I will loop in people that are more familar with the network stack and
> metrics. Maybe this is a bug?
> 
> Regards,
> Timo
> 
> Am 10/22/17 um 4:36 PM schrieb aitozi:
> > Hi,
> > 
> > i see in version 1.3, it add the ResultPartitionMetrics with
> > issue:https://issues.apache.org/jira/browse/FLINK-5090
> > 
> > but i am doubt what is the difference between totalQueueLen and
> > inputQueueLength in
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/met
> > rics.html#network
> > 
> > i read the code where register the both metric in Task.java:
> > 
> > this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
> > 
> > // register detailed network metrics, if configured
> > if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
> >     // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
> >     MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
> >     MetricGroup outputGroup = networkGroup.addGroup("Output");
> >     MetricGroup inputGroup = networkGroup.addGroup("Input");
> > 
> >     // output metrics
> >     for (int i = 0; i < producedPartitions.length; i++) {
> >         ResultPartitionMetrics.registerQueueLengthMetrics(
> >             outputGroup.addGroup(i), producedPartitions[i]);
> >     }
> > 
> >     for (int i = 0; i < inputGates.length; i++) {
> >         InputGateMetrics.registerQueueLengthMetrics(
> >             inputGroup.addGroup(i), inputGates[i]);
> >     }
> > }
> > 
> > i think the first :initializeBufferMetrics#InputBuffersGauge will get all
> > the buffers in AllInputGate of the Task and i think the method in
> > InputGateMetric
> > "group.gauge("totalQueueLen",metrics.getTotalQueueLenGauge());" does the
> > same thing , if i understand wrong , please tell me,.
> > 
> > thanks,
> > Aitozi
> > 
> > 
> > 
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-13 Thread Nico Kruber
Hi Shankara,
can you give us some more details, e.g.
- how do you run the job?
- how do you add/include the jar with the missing class?
- is that jar file part of your program's jar or separate?
- is the missing class, i.e. "com.huawei.ccn.intelliom.ims.MeasurementTable
$measurementTable" (an inner class starting in lower-case?), really in the jar 
file? It might be a wrongly generated protobuf class ...


Nico

On Tuesday, 7 November 2017 15:34:35 CET Shankara wrote:
> Hi,
> 
> I am using flink 2.1.0 version and protobuf-java 2.6.1 version.
> I am getting below exception for protobuf generated class. I have included
> jar which is having that class.
> 
> Can you please help me to check it.
> 
> org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException
> : Could not forward element to next operator
>   at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invo
> keProcessElement(Unknown Source)
>   at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoF
> nRunner.java:177) at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunne
> r.java:141) at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processEle
> ment(DoFnRunnerWithMetricsUpdate.java:65) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.pr
> ocessElement(DoFnOperator.java:368) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:528) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:891) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:869) at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(Timest
> ampedCollector.java:51) at
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap
> .flatMap(FlinkStreamingTransformTranslators.java:213) at
> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$StripIdsMap
> .flatMap(FlinkStreamingTransformTranslators.java:207) at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(Stream
> FlatMap.java:50) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:528) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:891) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> ut.collect(AbstractStreamOperator.java:869) at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermar
> kContext.processAndCollectWithTimestamp(StreamSourceContexts.java:309) at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkConte
> xt.collectWithTimestamp(StreamSourceContexts.java:408) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSou
> rceWrapper.emitElement(UnboundedSourceWrapper.java:329) at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSou
> rceWrapper.run(UnboundedSourceWrapper.java:267) at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
> 87) at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:
> 55) at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTa
> sk.java:95) at
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(Stopp
> ableSourceStreamTask.java:39) at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:2
> 63) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException
> : Could not forward element to next operator
>   at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .pushToOperator(OperatorChain.java:530) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:503) at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput
> .collect(OperatorChain.java:483) at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp
> 

Re: AvroParquetWriter may cause task managers to get lost

2017-11-13 Thread Nico Kruber
Hi Ivan,
sure, the more work you do per record, the slower the sink will be. However, 
this should not influence (much) the liveness checks inside Flink.
Do you get some meaningful entries in the TaskManagers' logs indicating the 
problem?

I'm no expert on Avro and don't know how much actual work it is to create such 
a writer, but from the code you gave:
- wouldn't your getPos() circumvent the BucketingSink's rolling file property? 
- similarly for flush() which may be dangerous during recovery (judging from 
its documentation - "returns the offset that the file must be truncated to at 
recovery")?


Nico

On Tuesday, 7 November 2017 19:51:35 CET Ivan Budincevic wrote:
> Hi all,
> 
> We recently implemented a feature in our streaming flink job in which we
> have a AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called. We
> had to do this because the schema of each record is potentially different
> and we have to get the schema for the AvroParquetWriter out of the record
> itself first. Previously this builder was built only one time in the “open”
> method and from then only the write method was called per record.
 
> Since implementing this our job crashes with “Connection unexpectedly closed
> by remote task manager ‘internal company url’. This might indicate that the
> remote task manager was lost.”
 
> We did not run into any issues on our test environments, so we are
> suspecting this problem occurs only on higher loads as we have on our
> production environment. Unfortunately we still don’t have a proper means of
> reproducing this much load on our test environment to debug.
 
> Would having the AvroParquetWriter being built on every write be causing the
> problem and if so why would that be the case?
 
> Any help in getting to the bottom of the issue would be really appreciated.
> Bellow there is a code snippet of the class which uses the
> AvroParquetWriter.
 
> Best regards,
> Ivan Budincevic
> Software engineer, bol.com
> Netherlands
> 
> package com.bol.measure.timeblocks.files;
> 
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
> 
> import java.io.IOException;
> 
> public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
> 
>   public SlottedMeasurementsWriter(boolean overwrite) {
>     this.overwrite = overwrite;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     this.path = path;
>   }
> 
>   @Override
>   public long flush() throws IOException {
>     return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public long getPos() throws IOException {
>     return parquetWriter.getDataSize();
>   }
> 
>   @Override
>   public void close() throws IOException {
>     parquetWriter.close();
>   }
> 
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
> 
>     final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>       AvroParquetWriter.<GenericRecord>builder(path)
>         .withSchema(slot.getMeasurements().get(0).getSchema())
>         .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
>         .withDictionaryEncoding(true)
>         .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
>     if (overwrite) {
>       writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
>     }
> 
>     parquetWriter = writerBuilder.build();
> 
>     for (GenericRecord measurement : slot.getMeasurements()) {
>       parquetWriter.write(measurement);
>     }
>   }
> 
>   @Override
>   public Writer<SlottedMeasurements> duplicate() {
>     return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
> 
> 





Re: Flink HA Zookeeper Connection Timeout

2017-11-13 Thread Nico Kruber
Hi Sathya,
have you checked this yet?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
jobmanager_high_availability.html

I'm no expert on the HA setup, have you also tried Flink 1.3 just in case?


Nico

On Wednesday, 8 November 2017 04:02:47 CET Sathya Hariesh Prakash (sathypra) 
wrote:
> Hi – We’re currently testing Flink HA and running into a zookeeper timeout
> issue. Error log below.
 
> Is there a production checklist or any information on parameters that are
> related to flink HA that I need to pay attention to?
 
> Any pointers would really help. Please let me know if any additional
> information is needed. Thanks!
 
> NOTE: I see multiple connection timeout messages. With different elapsed
> times.
 
> {
>"timeMillis":1510095254557,
>"thread":"Curator-Framework-0",
>"level":"ERROR",
>   
> "loggerName":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "message":"Connection timed out for connection string
> (zookeeper.system.svc.cluster.local:2181) and timeout (15000) / elapsed
> (15004)", "thrown":{
>   "commonElementCount":0,
>   "localizedMessage":"KeeperErrorCode = ConnectionLoss",
>   "message":"KeeperErrorCode = ConnectionLoss",
>  
> "name":"org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossExc
> eption",
 "extendedStackTrace":[
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "method":"checkTimeouts",
> "file":"ConnectionState.java",
> "line":197,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.ConnectionState",
> "method":"getZooKeeper",
> "file":"ConnectionState.java",
> "line":87,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient"
> ,
 "method":"getZooKeeper",
> "file":"CuratorZookeeperClient.java",
> "line":115,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"performBackgroundOperation",
> "file":"CuratorFrameworkImpl.java",
> "line":806,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"backgroundOperationsLoop",
> "file":"CuratorFrameworkImpl.java",
> "line":792,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl",
 "method":"access$300",
> "file":"CuratorFrameworkImpl.java",
> "line":62,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
>
> "class":"org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorF
> rameworkImpl$4",
 "method":"call",
> "file":"CuratorFrameworkImpl.java",
> "line":257,
> "exact":true,
> "location":"flink-runtime_2.10-1.2.jar",
> "version":"1.2"
>  },
>  {
> "class":"java.util.concurrent.FutureTask",
> "method":"run",
> "file":"FutureTask.java",
> "line":266,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.util.concurrent.ThreadPoolExecutor",
> "method":"runWorker",
> "file":"ThreadPoolExecutor.java",
> "line":1142,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.util.concurrent.ThreadPoolExecutor$Worker",
> "method":"run",
> "file":"ThreadPoolExecutor.java",
> "line":617,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  },
>  {
> "class":"java.lang.Thread",
> "method":"run",
> "file":"Thread.java",
> "line":745,
> "exact":true,
> "location":"?",
> "version":"1.8.0_66"
>  }
>   ]
>},
>"endOfBatch":false,
>"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger",
>"threadId":258,
>"threa

Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-13 Thread Nico Kruber
Hi Gwenhaël,
several functions in Flink require keyed streams because they manage their 
internal state by key. These keys, however, should be independent of the 
current execution and its parallelism so that checkpoints may be restored to 
different levels of parallelism (for re-scaling, see [1]).
Also, different operators, e.g. the source vs. the map, may have a different 
number of parallel tasks in which case you'd need to shuffle the data in order 
to adapt. The same goes for possible differences in the parallelism of the 
Kafka partitions vs. the parallelism you use in Flink.

If, however, all your operators have the same parallelism, doing multiple 
keyBy(0) calls in your program will not re-shuffle the data, because of the 
deterministic assignment of keys to operators.
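
As a rough sketch of that last point (illustrative only: types, window sizes and
the aggregation are made up, fromElements stands in for your Kafka source, and
imports plus the surrounding main()/env.execute() are omitted):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // same parallelism for every operator

    DataStream<Tuple2<String, Long>> events = env.fromElements(
            Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L));

    DataStream<Tuple2<String, Long>> preAggregated = events
            .keyBy(0)                     // first "partitioning" keyBy
            .timeWindow(Time.minutes(15)) // reduce the volume early
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

    preAggregated
            .keyBy(0)                     // same key, same parallelism: the records
            .timeWindow(Time.hours(1))    // already sit on the matching subtask
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();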


Nico

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
On Thursday, 9 November 2017 18:00:13 CET Gwenhael Pasquiers wrote:
> Hello,
> 
> (Flink 1.2.1)
> 
> For performances reasons I'm trying to reduce the volume of data of my
> stream as soon as possible by windowing/folding it for 15 minutes before
> continuing to the rest of the chain that contains keyBys and windows that
> will transfer data everywhere.
> 
> Because of the huge volume of data, I want to avoid "moving" the data
> between partitions as much as possible (not like a naïve KeyBy does). I
> wanted to create a custom ProcessFunction (using timer and state to fold
> data for X minutes) in order to fold my data over itself before keying the
> stream but even ProcessFunction needs a keyed stream...
> 
> Is there a specific "key" value that would ensure me that my data won't be
> moved to another taskmanager (that it's hashcode will match the partition
> it is already in) ? I thought about the subtask id but I doubt I'd be that
> lucky :-)
> 
> Suggestions
> 
> · Wouldn't it be useful to be able to do a "partitionnedKeyBy" that
> would not move data between nodes, for windowing operations that can be
> parallelized.
> 
> o   Something like kafka => partitionnedKeyBy(0) => first folding =>
> keyBy(0) => second folding => 
> 
> · Finally, aren't all streams keyed ? Even if they're keyed by a
> totally arbitrary partition id until the user chooses its own key,
> shouldn't we be able to do a window (not windowAll) or process over any
> normal Stream's partition ?
> 
> B.R.
> 
> Gwenhaël PASQUIERS



signature.asc
Description: This is a digitally signed message part.


Re: JobManager shows TaskManager was lost/killed while TaskManger Process is still running and the network is OK.

2017-11-13 Thread Nico Kruber
From what I read in [1], simply add JVM options to env.java.opts as you would 
when you start a Java program yourself, so setting "-XX:+UseG1GC" should 
enable G1.
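
Concretely, that is a single line in flink-conf.yaml (the cluster needs to be
restarted for the change to take effect):

    env.java.opts: -XX:+UseG1GC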

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
config.html#common-options

On Friday, 15 September 2017 19:36:02 CET AndreaKinn wrote:
> Hi, sorry for re-vive this old conversation.
> I have exactly the same problem, can you provide more details about your
> solution?
> Have you used another garbage collector as G1? How can I set it?
> 
> I've seen on configuration guideline I have to set the option: env.java.opts
> but I don't know which is the value to insert to set G1.
> 
> 
> Renkai wrote
> 
> > The zookeeper related logs are loged by user codes,I finally find the
> > reason why the taskmanger was lost,that was I gave the taskmanager a big
> > amount of memory, the jobmanager identify the taskmanager is down during
> > the taskmanager in Full GC.Thanks for your help.
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink takes too much memory in record serializer.

2017-11-14 Thread Nico Kruber
We're actually also trying to make the serializer stateless in the future and may 
be able to remove the intermediate serialization buffer which is currently 
growing on heap before we copy the data into the actual target buffer. This 
intermediate buffer grows and is pruned after serialization if it is bigger 
than 5MB (see DataOutputSerializer), and re-used for anything below that 
threshold. So you may actually have up to 5MB per output channel which sits 
waiting for data.
Please refer to https://issues.apache.org/jira/browse/FLINK-4893 for updates 
on this.
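
As a back-of-envelope upper bound for the numbers in the quoted message below
(worst case only, i.e. if every channel's intermediate buffer grew up to the
pruning threshold):

    long senders = 500;                            // parallelism of the producing operator
    long channelsPerSender = 1000;                 // parallelism of the consuming operator
    long perChannelBufferBytes = 5L * 1024 * 1024; // pruning threshold per channel (see above)

    long worstCaseBytes = senders * channelsPerSender * perChannelBufferBytes;
    // 500 * 1000 * 5 MiB = 2,500,000 MiB, i.e. roughly 2.4 TiB across the cluster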

These improvements will certainly reduce some of our memory footprint and help 
you. Throughput will then, of course, be limited by your network's speed and 
the number of network buffers to hold this amount of data and to saturate your 
network connections. The availability of these buffers will then limit your 
throughput accordingly.


Nico

On Tuesday, 14 November 2017 11:29:33 CET Chesnay Schepler wrote:
> I don't there's anything you can do except reducing the parallelism or
> the size of your messages.
> 
> A separate serializer is used for each channel as the serializers are
> stateful; they are capable of writing records partially
> to a given MemorySegment to better utilize the allocated memory.
> 
> How many messages is each operator instance processing per second? I
> would imagine that at this scale
> your memory consumption goes through the roof anyway due to the message
> size.
> Even if every operator instance is only processing 10 records/s you're
> already looking at 10TB memory usage
> for in-flight data.
> 
> On 14.11.2017 11:11, yunfan123 wrote:
> > In the class org.apache.flink.runtime.io.network.api.writer.RecordWriter,
> > it has same number of serializers with the numChannels.
> > If I first operator has 500 parallels and the next operator has 1000
> > parallels.
> > And every message in flink is 2MB.
> > The job takes 500 * 1000 * 2MB as 1TB memory in totally!!!
> > Can I do anything to reduce the memory usage.
> > 
> > 
> > 
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Correlation between data streams/operators and threads

2017-11-17 Thread Nico Kruber
regarding 3.
a) The taskmanager logs are missing, are there any?
b) Also, the JobManager logs say you have 4 slots available in total - is this 
enough for your 5 devices scenario?
c) The JobManager log, however, does not really reveal what it is currently 
doing, can you set the log level to DEBUG to see more?
d) Also, do you still observe CPU load during the 15min as an indication that 
it is actually doing something?
e) During this 15min period where apparently nothing happens, can you provide 
the output of "jstack " (with the PID of your JobManager)?
f) You may further be able to debug into what is happening by running this in 
your IDE in debug mode and pause the execution when you suspect it to hang.


Nico

On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:
> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
> 
> Piotrek
> 
> > On 14 Nov 2017, at 11:35, Shailesh Jain 
> > wrote:
> > 
> > 3. Have attached the logs and exception raised (15min - configured akka
> > timeout) after submitting the job.
> > 
> > Thanks,
> > Shailesh
> > 
> > 
> > On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski  > > wrote: Hi,
> > 
> > 3. Can you show the logs from job manager and task manager?
> > 
> >> On 14 Nov 2017, at 07:26, Shailesh Jain  >> > wrote:
> >> 
> >> Hi Piotrek,
> >> 
> >> I tried out option 'a' mentioned above, but instead of separate jobs, I'm
> >> creating separate streams per device. Following is the test deployment
> >> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
> >> 
> >> akka.client.timeout 15 min
> >> jobmanager.heap.mb 1024
> >> jobmanager.rpc.address localhost
> >> jobmanager.rpc.port 6123
> >> jobmanager.web.port 8081
> >> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> >> metrics.reporter.jmx.port 8789
> >> metrics.reporters jmx
> >> parallelism.default 1
> >> taskmanager.heap.mb 1024
> >> taskmanager.memory.preallocate false
> >> taskmanager.numberOfTaskSlots 4
> >> 
> >> The number of Operators per device stream is 4 (one sink function, 3 CEP
> >> operators).
> >> 
> >> Observations (and questions):
> >> 
> >> 3. Job deployment hangs (never switches to RUNNING) when the number of
> >> devices is greater than 5. Even on increasing the akka client timeout,
> >> it does not help. Will separate jobs being deployed per device instead
> >> of separate streams help here?
> >> 
> >> Thanks,
> >> Shailesh



Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-21 Thread Nico Kruber
Hi Shankara,
sorry for the late response, but honestly, I cannot think of a reason that 
some of your program's classes (using only a single jar file) are found while
some others are not, except for the class not being in the jar.

Or there's some class loader issue in the Flink Beam runner (which I find 
unlikely though) - maybe Aljoscha (cc'd) can elaborate a bit more on the Beam 
side and has some other idea.


Nico


On Tuesday, 14 November 2017 14:54:28 CET Shankara wrote:
> Hi Nico,
> 
> 
> - how do you run the job?
> 
>>> If we run same program in flink local then it works fine. For
> 
> flink local we used command line
>   mvn package exec:java
> -Dexec.mainClass=com.huawei.ccn.intelliom.ims.tmon.TMon
> -Dexec.args="--threshold=Measurment:0:4001:1:90:85:CPU
> --broker=192.168.56.1:9092" -Pflink-runner
> 
>When we use flink cluster and submit jar using web UI then we are
> getting exception. like below
>  169/image953.png>
> 
>  Exception :
> 
>  169/image_%281%29.png>
> 
> - how do you add/include the jar with the missing class?
> 
>>> We are generating the linked jar using the maven-jar-plugin. And
> 
> in the bundled jar all the protobuf generated class exist. There is no
> missing class.
> 
> - is that jar file part of your program's jar or separate?
> 
>>> since we are using the jar-plugin, the protobuf jar is also part
> 
> of the generated jar.
> 
> - is the missing class, i.e. "com.huawei.ccn.intelliom.ims.MeasurementTable
> $measurementTable" (an inner class starting in lower-case?), really in the
> jar
> file? It might be a wrongly generated protobuf class ...
> 
>>> sub Class is exit in Protobuf generated class. Please find the
> 
> attached class.
> 
>  169/Selection_028.png>
> 
> Thanks,
> Shankara
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-22 Thread Nico Kruber
But wouldn't a failed dependency show another ClassNotFoundException?

On Tuesday, 21 November 2017 20:31:58 CET Gordon Weakliem wrote:
> Isn't one cause for ClassNotFoundException that the class can't load due to
> failed dependencies or a failure in a static constructor?
> 
> If jar -tf target/program.jar | grep MeasurementTable shows the class is
> present, are there other dependencies missing? You may need to add runtime
> dependencies into your pom or gradle.build file.
> 
> On Tue, Nov 21, 2017 at 2:28 AM, Nico Kruber  wrote:
> > Hi Shankara,
> > sorry for the late response, but honestly, I cannot think of a reason that
> > some of your program's classes (using only a single jar file) are found
> > some
> > others are not, except for the class not being in the jar.
> > 
> > Or there's some class loader issue in the Flink Beam runner (which I find
> > unlikely though) - maybe Aljoscha (cc'd) can elaborate a bit more on the
> > Beam
> > side and has some other idea.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 14 November 2017 14:54:28 CET Shankara wrote:
> > > Hi Nico,
> > > 
> > > 
> > > - how do you run the job?
> > > 
> > >>> If we run same program in flink local then it works fine. For
> > > 
> > > flink local we used command line
> > > 
> > >   mvn package exec:java
> > > 
> > > -Dexec.mainClass=com.huawei.ccn.intelliom.ims.tmon.TMon
> > > -Dexec.args="--threshold=Measurment:0:4001:1:90:85:CPU
> > > --broker=192.168.56.1:9092" -Pflink-runner
> > > 
> > >When we use flink cluster and submit jar using web UI then we are
> > > 
> > > getting exception. like below
> > > <http://apache-flink-user-mailing-list-archive.2336050.
> > 
> > n4.nabble.com/file/t1
> > 
> > > 169/image953.png>
> > > 
> > >  Exception :
> > > <http://apache-flink-user-mailing-list-archive.2336050.
> > 
> > n4.nabble.com/file/t1
> > 
> > > 169/image_%281%29.png>
> > > 
> > > - how do you add/include the jar with the missing class?
> > > 
> > >>> We are generating the linked jar using the maven-jar-plugin.
> > 
> > And
> > 
> > > in the bundled jar all the protobuf generated class exist. There is no
> > > missing class.
> > > 
> > > - is that jar file part of your program's jar or separate?
> > > 
> > >>> since we are using the jar-plugin, the protobuf jar is also
> > 
> > part
> > 
> > > of the generated jar.
> > > 
> > > - is the missing class, i.e. "com.huawei.ccn.intelliom.ims.
> > 
> > MeasurementTable
> > 
> > > $measurementTable" (an inner class starting in lower-case?), really in
> > 
> > the
> > 
> > > jar
> > > file? It might be a wrongly generated protobuf class ...
> > > 
> > >>> sub Class is exit in Protobuf generated class. Please find the
> > > 
> > > attached class.
> > > 
> > > <http://apache-flink-user-mailing-list-archive.2336050.
> > 
> > n4.nabble.com/file/t1
> > 
> > > 169/Selection_028.png>
> > > 
> > > Thanks,
> > > Shankara
> > > 
> > > 
> > > 
> > > --
> > > Sent from:
> > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Correlation between data streams/operators and threads

2017-11-22 Thread Nico Kruber
Hi Shailesh,
your JobManager log suggests that this same JVM instance actually contains a 
TaskManager as well (sorry for not noticing earlier). Also this time, there is 
nothing regarding the BlobServer/BlobCache, but it looks like the task manager 
may think the jobmanager is down.
Can you try with "start-cluster.sh" instead?

Nico

On Tuesday, 21 November 2017 07:26:09 CET Shailesh Jain wrote:
> a) Nope, there are no taskmanager logs, the job never switches to RUNNING
> state.
> 
> b) I think so, because even when I start the job with 4 devices, only 1
> slot is used, and 3 are free.
> 
> c) Attached
> 
> d) Attached
> 
> e) I'll try the debug mode in Eclipse.
> 
> Thanks,
> Shailesh
> 
> On Fri, Nov 17, 2017 at 1:52 PM, Nico Kruber  wrote:
> > regarding 3.
> > a) The taskmanager logs are missing, are there any?
> > b) Also, the JobManager logs say you have 4 slots available in total - is
> > this
> > enough for your 5 devices scenario?
> > c) The JobManager log, however, does not really reveal what it is
> > currently
> > doing, can you set the log level to DEBUG to see more?
> > d) Also, do you still observe CPU load during the 15min as an indication
> > that
> > it is actually doing something?
> > e) During this 15min period where apparently nothing happens, can you
> > provide
> > the output of "jstack " (with the PID of your JobManager)?
> > f) You may further be able to debug into what is happening by running this
> > in
> > your IDE in debug mode and pause the execution when you suspect it to
> > hang.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:
> > > 3. Nico, can you take a look at this one? Isn’t this a blob server
> > > issue?
> > > 
> > > Piotrek
> > > 
> > > > On 14 Nov 2017, at 11:35, Shailesh Jain 
> > > > wrote:
> > > > 
> > > > 3. Have attached the logs and exception raised (15min - configured
> > > > akka
> > > > timeout) after submitting the job.
> > > > 
> > > > Thanks,
> > > > Shailesh
> > > > 
> > > > 
> > > > On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <
> > 
> > pi...@data-artisans.com
> > 
> > > > <mailto:pi...@data-artisans.com>> wrote: Hi,
> > > > 
> > > > 3. Can you show the logs from job manager and task manager?
> > > > 
> > > >> On 14 Nov 2017, at 07:26, Shailesh Jain  > > >> <mailto:shailesh.j...@stellapps.com>> wrote:
> > > >> 
> > > >> Hi Piotrek,
> > > >> 
> > > >> I tried out option 'a' mentioned above, but instead of separate jobs,
> > 
> > I'm
> > 
> > > >> creating separate streams per device. Following is the test
> > > >> deployment
> > > >> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu
> > 
> > machine):
> > > >> akka.client.timeout 15 min
> > > >> jobmanager.heap.mb 1024
> > > >> jobmanager.rpc.address localhost
> > > >> jobmanager.rpc.port 6123
> > > >> jobmanager.web.port 8081
> > > >> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> > > >> metrics.reporter.jmx.port 8789
> > > >> metrics.reporters jmx
> > > >> parallelism.default 1
> > > >> taskmanager.heap.mb 1024
> > > >> taskmanager.memory.preallocate false
> > > >> taskmanager.numberOfTaskSlots 4
> > > >> 
> > > >> The number of Operators per device stream is 4 (one sink function, 3
> > 
> > CEP
> > 
> > > >> operators).
> > > >> 
> > > >> Observations (and questions):
> > > >> 
> > > >> 3. Job deployment hangs (never switches to RUNNING) when the number
> > > >> of
> > > >> devices is greater than 5. Even on increasing the akka client
> > > >> timeout,
> > > >> it does not help. Will separate jobs being deployed per device
> > > >> instead
> > > >> of separate streams help here?
> > > >> 
> > > >> Thanks,
> > > >> Shailesh





Re: Flink 1.2.0->1.3.2 TaskManager reporting to JobManager

2017-11-28 Thread Nico Kruber
Hi Regina,
can you explain a bit more about what you are trying to do and how this is
set up? I quickly tried to reproduce locally by starting a cluster and
could not see this behaviour.

Also, can you try to increase the loglevel to INFO and see whether you
see anything suspicious in the logs?


Nico

On 28/11/17 00:19, Chan, Regina wrote:
> Hi,
> 
>  
> 
> As I moved from Flink 1.2.0 to 1.3.2 I noticed that the TaskManager may
> have all tasks with FINISHED but then take about 2-3 minutes before the
> Job execution switches to FINISHED. What is it doing that’s taking this
> long? This was a parallelism = 1 case…
> 
>  
> 
> *Regina Chan*
> 
> *Goldman Sachs–*Enterprise Platforms, Data Architecture
> 
> *30 Hudson Street, 37th floor | Jersey City, NY 07302*(  (212) 902-5697**
> 
>  
> 





Re: Checkpoint expired before completing

2017-12-01 Thread Nico Kruber
Hi Steven,
by default, checkpoints time out after 10 minutes if you haven't used
CheckpointConfig#setCheckpointTimeout() to change this timeout.
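
For reference, these are the knobs involved (a sketch with placeholder values,
not a recommendation):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(120_000); // checkpoint interval: 2 minutes

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(20 * 60 * 1000);  // raise the 10-minute default
    checkpointConfig.setMinPauseBetweenCheckpoints(90_000); // let the job catch up in between
    checkpointConfig.setMaxConcurrentCheckpoints(1);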

Depending on your checkpoint interval, and your number of concurrent
checkpoints, there may already be some other checkpoint processes
running while you are waiting for the first to finish. In that case,
succeeding checkpoints may also fail with a timeout. However, they
should definitely get back to normal once your sink has caught up with
all buffered events.

I included Stefan who may shed some more light onto it, but maybe you
can help us identify the problem by providing logs at DEBUG level
(did akka report any connection loss and gated actors? or maybe some
other error in there?) or even a minimal program to reproduce.


Nico

On 01/12/17 07:36, Steven Wu wrote:
> 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> 9353 expired before completing
> 
> I might know why this happened in the first place. Our sink operator
> does synchronous HTTP post, which had a 15-mint latency spike when this
> all started. This could block flink threads and prevent checkpoint from
> completing in time. But I don't understand why checkpoint continued to
> fail after HTTP post latency returned to normal. there seems to be some
> lingering/cascading effect of previous failed checkpoints on future
> checkpoints. Only after I redeploy/restart the job an hour later,
> checkpoint starts to work again.
> 
> Would appreciate any suggestions/insights!
> 
> Thanks,
> Steven





Re: Blob server not working with 1.4.0.RC2

2017-12-04 Thread Nico Kruber
Hi Bernd,
thanks for the report. I tried to reproduce it locally, but both a telnet
connection to the BlobServer and the BLOB download by the
TaskManagers work for me.
causing the problem? You could also try increasing the log level to
DEBUG and see if there is something more in the logs (the exception
thrown in StaticFileServerHandler looks suspicious but is not related to
the BlobServer).
Apparently, the TaskManager resolves flink-jobmanager to 10.104.5.130.
Is that the correct address and can the TaskManager talk to this IP?
(may a firewall block this?)

Did you, by any chance, set up SSL, too? There was a recent thread on
the mailing list [1] where a user had some problems with
"security.ssl.verify-hostname" being set to true, which may be related.


Nico

[1]
https://lists.apache.org/thread.html/879d072bfd6761947b4bd703324489db50e8b14c328992118af875d8@%3Cuser.flink.apache.org%3E

On 04/12/17 10:03, bernd.winterst...@dev.helaba.de wrote:
> Hi
> Since we switched to Release 1.4 the taskmanagers are unable to download
> blobs from the jobmanager.
> The taskmanager registration still works.
> Netstat on jobmanager shows open ports at 6123 and 5. But a telnet
> connection from taskmanager to jobmanager on port 5 times out.
>  
> Any ideas are welcome.
>  
> Regards
>  
> Bernd
>  
> Jobmanager log:
>  
> 2017-12-04 08:48:30,167 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    - Starting
> JobManager actor
> 2017-12-04 08:48:30,197 INFO 
> org.apache.flink.runtime.blob.BlobServer  - Created
> BLOB server storage directory
> /tmp/blobStore-81cd12c7-394e-4777-85a1-98389b72dd08
> 2017-12-04 08:48:30,205 INFO 
> org.apache.flink.runtime.blob.BlobServer  - Started
> BLOB server at 0.0.0.0:5 - max concurrent requests: 50 - max
> backlog: 1000
> 2017-12-04 08:48:30,608 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    - Starting
> JobManager at akka.tcp://flink@flink-jobmanager:6123/user/jobmanager.
> 2017-12-04 08:48:30,628 INFO 
> org.apache.flink.runtime.jobmanager.MemoryArchivist   - Started
> memory archivist akka://flink/user/archive
> 2017-12-04 08:48:30,676 INFO 
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  
> - Trying to associate with JobManager leader
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager
> 2017-12-04 08:48:30,692 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    -
> JobManager akka.tcp://flink@flink-jobmanager:6123/user/jobmanager was
> granted leadership with leader session ID
> Some(----).
> 2017-12-04 08:48:30,700 INFO 
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  
> - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager#886586058] - leader session
> ----
> 2017-12-04 08:53:50,635 INFO 
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  
> - TaskManager 627338086a766c140909ba45f2e717d0 has started.
> 2017-12-04 08:53:50,638 INFO 
> org.apache.flink.runtime.instance.InstanceManager -
> Registered TaskManager at flink-taskmanager-65cf757d9b-hj65d
> (akka.tcp://flink@flink-taskmanager-65cf757d9b-hj65d:45932/user/taskmanager)
> as f9d2843d0223b15d8fce52aea8231cc6. Current number of registered hosts
> is 1. Current number of alive task slots is 8.
> 2017-12-04 08:53:50,658 WARN 
> akka.serialization.Serialization(akka://flink)    - Using
> the default Java serializer for class
> [org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage]
> which is not recommended because of performance implications. Use
> another serializer or disable this warning using the setting
> 'akka.actor.warn-about-java-serializer-usage'
> 2017-12-04 08:53:55,714 INFO 
> org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  
> - TaskManager 08c3e6f7c765e2ab88e2ea645049cb9d has started.
> 2017-12-04 08:53:55,714 INFO 
> org.apache.flink.runtime.instance.InstanceManager -
> Registered TaskManager at flink-taskmanager-65cf757d9b-jtzw5
> (akka.tcp://flink@flink-taskmanager-65cf757d9b-jtzw5:41710/user/taskmanager)
> as da8a8da3650ce53f460784c54938a071. Current number of registered hosts
> is 2. Current number of alive task slots is 16.
> 2017-12-04 09:04:08,850 ERROR
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler 
> - Caught exception
> java.io.IOException: Operation timed out
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>  
>  
> Taskmanager log:
> 2017-12-

Re: Checkpoint expired before completing

2017-12-04 Thread Nico Kruber
Although there may be no checkpoints in flight with this configuration,
there are most certainly records floating around in various buffers
which filled up while your sink was stalling everything. Those records need
to be processed first before the new checkpoint's barrier can
make it through (also see [1] for details on how the checkpointing works).

So yes, your second assumption is true and that was what I meant by "get
back to normal once your sink has caught up with all buffered events" in
my first message, and I assume that is also what Stephan meant by the cascading effect.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html

On 02/12/17 23:30, Steven Wu wrote:
> One more question. Since I have set the "Maximum Concurrent Checkpoints"
> to 1. Will cascading effect still be true? 
> 
> Whenever my sink operator returns to normal (in terms of latency), new
> checkpoint after this point should work, right? there are no other
> in-flight/concurrent checkpoints still in progress.
> 
> Or is the min pause just allowing Flink to catch up in-flight msgs in
> various queues/buffers? is that the cascading impact?
> 
> On Sat, Dec 2, 2017 at 2:10 PM, Steven Wu  <mailto:stevenz...@gmail.com>> wrote:
> 
> Stephan, thanks a lot for the explanation. Now everything makes
> sense to me. Will set the min pause.
> 
> On Sat, Dec 2, 2017 at 8:58 AM, Stephan Ewen  <mailto:se...@apache.org>> wrote:
> 
> Hi Steven!
> 
> You are right, there could be some cascading effect from
> previous checkpoints.
> I think the best way to handle that is to set the "minimum pause
> between checkpoints". In fact, I would actually recommend this
> over the checkpoint interval parameter.
> 
> The pause will allow the job to handle such effects that built
> up during an unhealthy checkpoint. You can for example set the
> checkpoint interval to 2 mins and set the pause to 1.5 mins.
> That way, if a checkpoint takes longer than usual, the next one
> will still wait for 1.5 mins after the previous one completed or
> expired, giving the job time to catch up.
> 
> Best,
> Stephan
> 
> 
> On Fri, Dec 1, 2017 at 10:10 PM, Steven Wu  <mailto:stevenz...@gmail.com>> wrote:
> 
> Here is the checkpoint config. no concurrent checkpoints
> with 2 minute checkpoint interval and timeout.
> 
>     Problem is gone after redeployment. I will try if I can
> reproduce the issue
> 
> Inline image 1
> 
> On Fri, Dec 1, 2017 at 6:17 AM, Nico Kruber
> mailto:n...@data-artisans.com>> wrote:
> 
> Hi Steven,
> by default, checkpoints time out after 10 minutes if you
> haven't used
> CheckpointConfig#setCheckpointTimeout() to change this
> timeout.
> 
> Depending on your checkpoint interval, and your number
> of concurrent
> checkpoints, there may already be some other checkpoint
> processes
> running while you are waiting for the first to finish.
> In that case,
> succeeding checkpoints may also fail with a timeout.
> However, they
> should definitely get back to normal once your sink has
> caught up with
> all buffered events.
> 
> I included Stefan who may shed some more light onto it,
> but maybe you
> can help us identifying the problem by providing logs at
> DEBUG level
> (did akka report any connection loss and gated actors?
> or maybe some
> other error in there?) or even a minimal program to
> reproduce.
> 
> 
> Nico
> 
> On 01/12/17 07:36, Steven Wu wrote:
> >
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Checkpoint
> > 9353 expired before completing
> >
> > I might know why this happened in the first place. Our
> sink operator
> > does synchronous HTTP post, which had a 15-mint
> latency spike when this
> > all started. This could block flink threads and
> prevent checkpoint from
> >

Re: AW: Blob server not working with 1.4.0.RC2

2017-12-06 Thread Nico Kruber
Hi Bernd,
at least from our side I don't see a change in the default BlobServer ports 
between 1.3 and 1.4 - without configuration, the OS chooses the port.
If you want to influence the range it is chosen from (or want to fix a 
specific port), you need to set the blob.server.port configuration parameter 
in Flink's flink-conf.yaml.
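
For example (placeholder values; a single port, a port range, or a list of
ranges is accepted):

    blob.server.port: 6124
    # or, e.g.: blob.server.port: 50100-50200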


Regards
Nico

On Wednesday, 6 December 2017 08:45:19 CET bernd.winterst...@dev.helaba.de 
wrote:
> Hi Nico
> I think there were changes in the default port fort the BLOB server. I
> missed the fact that the Kubernetes configuration was still exposing 6124
> for the JobManager BLOB server. Thanks
> 
> Bernd
> 
> -Ursprüngliche Nachricht-
> Von: Nico Kruber [mailto:n...@data-artisans.com]
> Gesendet: Montag, 4. Dezember 2017 14:17
> An: Winterstein, Bernd; user@flink.apache.org
> Betreff: Re: Blob server not working with 1.4.0.RC2
> 
> Hi Bernd,
> thanks for the report. I tried to reproduce it locally but both a telnet
> connection to the BlobServer as well as the BLOB download by the
> TaskManagers work for me. Can you share your configuration that is causing
> the problem? You could also try increasing the log level to DEBUG and see
> if there is something more in the logs (the exception thrown in
> StaticFileServerHandler looks suspicious but is not related to the
> BlobServer). Apparently, the TaskManager resolves flink-jobmanager to
> 10.104.5.130. Is that the correct address and can the TaskManager talk to
> this IP? (may a firewall block this?)
> 
> Did you, by any chance, set up SSL, too? There was a recent thread on the
> mailing list [1] where a had some problems with
> "security.ssl.verify-hostname" being set to true which may be related.
> 
> 
> Nico
> 
> [1]
> https://lists.apache.org/thread.html/879d072bfd6761947b4bd703324489db50e8b14
> c328992118af875d8@%3Cuser.flink.apache.org%3E
> On 04/12/17 10:03, bernd.winterst...@dev.helaba.de wrote:
> > Hi
> > Since we switched to Release 1.4 the taskmanagers are unable to
> > download blobs from the jobmanager.
> > The taskmanager registration still works.
> > Netstat on jobmanager shows open ports at 6123 and 5. But a telnet
> > connection from taskmanager to jobmanager on port 5 times out.
> > 
> > Any ideas are welcome.
> > 
> > Regards
> > 
> > Bernd
> > 
> > Jobmanager log:
> > 
> > 2017-12-04 08:48:30,167 INFO
> > org.apache.flink.runtime.jobmanager.JobManager-
> > Starting JobManager actor
> > 2017-12-04 08:48:30,197 INFO
> > org.apache.flink.runtime.blob.BlobServer  -
> > Created BLOB server storage directory
> > /tmp/blobStore-81cd12c7-394e-4777-85a1-98389b72dd08
> > 2017-12-04 08:48:30,205 INFO
> > org.apache.flink.runtime.blob.BlobServer  -
> > Started BLOB server at 0.0.0.0:5 - max concurrent requests: 50 -
> > max
> > backlog: 1000
> > 2017-12-04 08:48:30,608 INFO
> > org.apache.flink.runtime.jobmanager.JobManager-
> > Starting JobManager at
> > akka.tcp://flink@flink-jobmanager:6123/user/jobmanager. 2017-12-04
> > 08:48:30,628 INFO
> > org.apache.flink.runtime.jobmanager.MemoryArchivist   -
> > Started memory archivist akka://flink/user/archive
> > 2017-12-04 08:48:30,676 INFO
> > org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> > eManager
> > - Trying to associate with JobManager leader
> > akka.tcp://flink@flink-jobmanager:6123/user/jobmanager
> > 2017-12-04 08:48:30,692 INFO
> > org.apache.flink.runtime.jobmanager.JobManager-
> > JobManager akka.tcp://flink@flink-jobmanager:6123/user/jobmanager was
> > granted leadership with leader session ID
> > Some(----).
> > 2017-12-04 08:48:30,700 INFO
> > org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> > eManager
> > - Resource Manager associating with leading JobManager
> > Actor[akka://flink/user/jobmanager#886586058] - leader session
> > ----
> > 2017-12-04 08:53:50,635 INFO
> > org.apache.flink.runtime.clusterframework.standalone.StandaloneResourc
> > eManager
> > - TaskManager 627338086a766c140909ba45f2e717d0 has started.
> > 2017-12-04 08:53:50,638 INFO
> > org.apache.flink.runtime.instance.InstanceManager -
> > Registered TaskManager at flink-taskmanager-65cf757d9b-hj65d
> > (akka.tcp://flink@flink-taskmanager-65cf757d9b-hj65d:45932/user/taskma
> > nager) as f9d2843d0223b15d8fce52aea8231cc6. Current number of
> > registered hosts is 1. Cur

Re: ProgramInvocationException: Could not upload the jar files to the job manager / No space left on device

2017-12-14 Thread Nico Kruber
Hi Regina,
judging from the exception you posted, this is not about storing the
file in HDFS, but a step before that where the BlobServer first puts the
incoming file into its local file system in the directory given by the
`blob.storage.directory` configuration property. If this property is not
set or empty, it will fall back to `java.io.tmpdir`. The BlobServer
creates a subdirectory `blobStore-` and puts incoming files into
`/blobStore-/incoming` with file names
`temp-12345678` (using an atomic file counter). It seems that there is
no space left in the filesystem of this directory.
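
If `java.io.tmpdir` is too small, you can point the BlobServer at a bigger partition explicitly, e.g. (the path is just a placeholder):

==
blob.storage.directory: /data/flink/blob-storage
==

You can then verify the available space there with something like `df -h /data/flink/blob-storage`.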

If you set the log level to INFO, you should see a message like "Created
BLOB server storage directory ..." with the path. Can you double check
whether there is really no space left there?


Nico

On 12/12/17 08:02, Chan, Regina wrote:
> And if it helps, I’m running on flink 1.2.1. I saw this ticket:
> https://issues.apache.org/jira/browse/FLINK-5828 It only started
> happening when I was running all 50 flows at the same time. However, it
> looks like it’s not an issue with creating the cache directory but with
> running out of space there? But what’s in there is also tiny.
> 
>  
> 
> bash-4.1$ hdfs dfs -du -h
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853
> 
> 1.1 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/5c71e4b6-2567-4d34-98dc-73b29c502736-taskmanager-conf.yaml
> 
> 1.4 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/flink-conf.yaml
> 
> 93.5 M  
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/flink-dist_2.10-1.2.1.jar
> 
> 264.8 M 
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/lib
> 
> 1.9 K   
> hdfs://d191291/user/delp/.flink/application_1510733430616_2098853/log4j.properties
> 
>  
> 
>  
> 
> *From:*Chan, Regina [Tech]
> *Sent:* Tuesday, December 12, 2017 1:56 AM
> *To:* 'user@flink.apache.org'
> *Subject:* ProgramInvocationException: Could not upload the jar files to
> the job manager / No space left on device
> 
>  
> 
> Hi,
> 
>  
> 
> I’m currently submitting 50 separate jobs to a 50TM, 1 slot set up. Each
> job has 1 parallelism. There’s plenty of space left in my cluster and on
> that node. It’s not clear to me what’s happening. Any pointers?
> 
>  
> 
> On the client side, when I try to execute, I see the following:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Could not upload the jar files to the job manager.
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> 
>     at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> 
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
> 
>     at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> 
>     at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.execute(FlowData.java:143)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.flowPartialIngestionHalf(FlowData.java:107)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:72)
> 
>     at
> com.gs.ep.da.lake.refinerlib.flink.FlowData.call(FlowData.java:39)
> 
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
>     at java.lang.Thread.run(Thread.java:745)
> 
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could
> not upload the jar files to the job manager.
> 
>     at
> org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:150)
> 
>     at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
> 
>     at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 
>     at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 
>     at
> scala.concurrent.forkj

Re: Flink 1.4.0 can not override JAVA_HOME for single-job deployment on YARN

2017-12-14 Thread Nico Kruber
Hi,
are you running Flink in a JRE >= 8? We dropped Java 7 support for
Flink 1.4.
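
In case the YARN nodes do have a Java 8 installation, a minimal sketch for pointing both the ApplicationMaster and the TaskManager containers at it would be the following (the path is purely hypothetical, and please double-check the key names against the 1.4 docs):

==
yarn.application-master.env.JAVA_HOME: /usr/lib/jvm/java-8-openjdk
yarn.taskmanager.env.JAVA_HOME: /usr/lib/jvm/java-8-openjdk
==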


Nico

On 14/12/17 12:35, 杨光 wrote:
> Hi,
> I am usring flink single-job mode on YARN. After i upgrade flink
> verson from 1.3.2 to  1.4.0, the parameter
> "yarn.taskmanager.env.JAVA_HOME" doesn’t work  as before.
> I can only found error log on yarn like this:
> 
> Exception in thread "main" java.lang.UnsupportedClassVersionError:
> org/apache/flink/yarn/YarnApplicationMasterRunner : Unsupported
> major.minor version 52.0
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
> 
> Is there something different i should know  to avoid this problem ?
> Thanks!
> 



signature.asc
Description: OpenPGP digital signature


Re: Flink 1.4 with cassandra-connector: Shading error

2017-12-19 Thread Nico Kruber
Hi Dominik,
nice assessment of the issue: in the version of the cassandra-driver we
use there is even a comment about why:


try {
  // prevent this string from being shaded
  Class.forName(String.format("%s.%s.channel.Channel", "io", "netty"));
  shaded = false;
} catch (ClassNotFoundException e) {
  try {
    Class.forName("com.datastax.shaded.netty.channel.Channel");
    shaded = true;
  } catch (ClassNotFoundException e1) {
    throw new AssertionError("Cannot locate Netty classes in the classpath:" + e1);
  }
}


@Chesnay: Should we instead shade into datastax' namespace as shown?
This would also make sure to follow the shaded path in that class which,
for example, deactivates epoll.


Nico


On 18/12/17 15:43, Timo Walther wrote:
> Hi Dominik,
> 
> thanks for reporting your issue. I will loop in Chesnay that might know
> more about your problem.
> 
> There were a lot of dependency changes in 1.4 to make the future more
> dependency friendly. Maybe this has not been tested properly.
> 
> Regards,
> Timo
> 
> 
> On 12/18/17 at 3:07 PM, domi...@dbruhn.de wrote:
>> Hey everyone,
>> I'm trying to migrate one of my jobs to Flink 1.4 and I'm running into
>> a classpath/shading error.
>>
>> What happens is that when Flink calls into Cluster.connect(),
>> somewhere down in the stream, the cassandra library tries to
>> initialize Netty, and I'm getting the following exception:
>>
>> Caused by: java.lang.AssertionError: Cannot locate Netty classes in
>> the classpath:java.lang.ClassNotFoundException:
>> com.datastax.shaded.netty.channel.Channel.
>> Full exception here:
>> https://gist.github.com/anonymous/16a44eabb45ad7f20d551dd29b83d2fb
>>
>> I think this can be explained very easily:
>> 1. The flink-cassandra-connector-1.4 jar contains the Datastax
>> Cassandra driver inside
>> 2. The flink-cassandra-connector-1.4 jar contains a shaded (to
>> org.apache.flink.cassandra.shaded.io.netty) netty
>> 3. The DataStax driver executes the following code when it initializes
>> netty (in NettyUtils starting line 58):
>>
>> 
>>  try {
>>     Class.forName(String.format("%s.%s.channel.Channel", "io",
>> "netty"));
>>     shaded = false;
>>     } catch (ClassNotFoundException var9) {
>>     try {
>> Class.forName("com.datastax.shaded.netty.channel.Channel");
>>     shaded = true;
>>     } catch (ClassNotFoundException var8) {
>>     throw new AssertionError("Cannot locate Netty classes
>> in the classpath:" + var8);
>>     }
>>     }
>> 
>>
>>
>> Neither of the two packages (io.netty.channel and
>> com.datastax.shaded.netty.channel.Channel) are available and the
>> shaded one from Flink is not in the list and is never used here.
>>
>> My question: Did anyone ever use the cassandra connector on 1.4. As it
>> looks to me it is completely broken and can never work. But maybe I'm
>> failing to see something. I don't include netty in my dependencies,
>> that could of course fix it, but I'm suspecting I will run into more
>> dependency problems then.
>>
>> Here are my dependencies:
>> https://gist.github.com/anonymous/565a0ad017976502a62b2919115b31fd
>>
>> What additional information can I provide?
>>
>> Thanks,
>> Dominik
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: S3 Access in eu-central-1

2018-01-02 Thread Nico Kruber
Sorry for the late response,
but I finally got around to adding this workaround to our "common issues"
section with PR https://github.com/apache/flink/pull/5231
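
For reference, the workaround boils down to forcing the AWS SDK into V4 signing, e.g. via (please double-check against the docs added in that PR):

==
# flink-conf.yaml
env.java.opts: -Dcom.amazonaws.services.s3.enableV4
==

plus the fs.s3a.endpoint property pointing to s3.eu-central-1.amazonaws.com in core-site.xml, as already shown further down in this thread.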

Nico

On 29/11/17 09:31, Ufuk Celebi wrote:
> Hey Dominik,
> 
> yes, we should definitely add this to the docs.
> 
> @Nico: You recently updated the Flink S3 setup docs. Would you mind
> adding these hints for eu-central-1 from Steve? I think that would be
> super helpful!
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 28, 2017 at 10:00 PM, Dominik Bruhn  wrote:
>> Hey Stephan, Hey Steve,
>> that was the right hint, adding that option to the Java options fixed the
>> problem. Maybe we should add this somehow to our Flink Wiki?
>>
>> Thanks!
>>
>> Dominik
>>
>> On 28/11/17 11:55, Stephan Ewen wrote:
>>>
>>> Got a pointer from Steve that this is answered on Stack Overflow here:
>>> https://stackoverflow.com/questions/36154484/aws-java-sdk-manually-set-signature-version
>>> 
>>>
>>> Flink 1.4 contains a specially bundled "fs-s3-hadoop" with a smaller
>>> footprint, compatible across Hadoop versions, and based on a later s3a and
>>> AWS sdk. In that connector, it should work out of the box because it uses a
>>> later AWS SDK. You can also use it with earlier Hadoop versions because
>>> dependencies are relocated, so it should not clash/conflict.
>>>
>>>
>>>
>>>
>>> On Mon, Nov 27, 2017 at 8:58 PM, Stephan Ewen >> > wrote:
>>>
>>> Hi!
>>>
>>> The endpoint config entry looks correct.
>>> I was looking at this issue to see if there are pointers to anything
>>> else, but it looks like the explicit endpoint entry is the most
>>> important thing: https://issues.apache.org/jira/browse/HADOOP-13324
>>> 
>>>
>>> I cc-ed Steve Loughran, who is Hadoop's S3 expert (sorry Steve for
>>> pulling you in again - listening and learning still about the subtle
>>> bits and pieces of S3).
>>> @Steve are S3 V4 endpoints supported in Hadoop 2.7.x already, or
>>> only in Hadoop 2.8?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Mon, Nov 27, 2017 at 9:47 AM, Dominik Bruhn >> > wrote:
>>>
>>> Hey,
>>> can anyone give a hint? Does anyone have flink running with an
>>> S3 Bucket in Frankfurt/eu-central-1 and can share his config and
>>> setup?
>>>
>>> Thanks,
>>> Dominik
>>>
>>> On 22. Nov 2017, at 17:52, domi...@dbruhn.de
>>>  wrote:
>>>
 Hey everyone,
 I'm trying since hours to get Flink 1.3.2 (downloaded for
 hadoop 2.7) to snapshot/checkpoint to an S3 bucket which is
 hosted in the eu-central-1 region. Everything works fine for
 other regions. I'm running my job on a JobTracker in local
 mode. I googled the internet and found several hints, most of
 them telling that setting the `fs.s3a.endpoint` should solve
 it. It doesn't. I'm also sure that the core-site.xml (see
 below) is picked up, if I put garbage into the endpoint then I
 receive a hostname not found error.

 The exception I'm getting is:
 com.amazonaws.services.s3.model.AmazonS3Exception: Status
 Code: 400, AWS Service: Amazon S3, AWS Request ID:
 432415098B0994BC, AWS Error Code: null, AWS Error Message: Bad
 Request, S3 Extended Request ID:

 1PSDe4EOh7zvfNPdWrwoBKKOtsS/gf9atn5movRzcpvIH2WsR+ptXvXyFyEHXjDb3F9AniXgsBQ=

 I read the AWS FAQ but I don't think that

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#ioexception-400-bad-request

 
 applies to me as I'm not running the NativeFileSystem.

 I suspect this is related to the v4 signing protocol which is
 required for S3 in Frankfurt. Could it be that the aws-sdk
 version is just too old? I tried to play around with it but
 the hadoop adapter is incompatible with newer versions.

 I have the following core-site.xml:

 <configuration>
   <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
   <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
   <property><name>fs.s3a.access.key</name><value>something</value></property>
   <property><name>fs.s3a.secret.key</name><value>wont-tell</value></property>
   <property><name>fs.s3a.endpoint</name><value>s3.eu-central-1.amazonaws.com</value></property>
 </configuration>

 Here is my lib folder with the versions of the aws-sdk and the
 hadoop-aws integration:
 -rw---1 root root   11.4M Mar 20  2014
 aws-java-sdk-1.7.4.jar
 -rw-r--r--1 1005 1006   70.0M Aug  3 12:10
>>>

Re: BackPressure handling

2018-01-02 Thread Nico Kruber
Hi Vishal,
let me already point you towards the JIRA issue for the credit-based
flow control: https://issues.apache.org/jira/browse/FLINK-7282

I'll have a look at the rest of this email thread tomorrow...


Regards,
Nico

On 02/01/18 17:52, Vishal Santoshi wrote:
> Could you please point me to any documentation on the  "credit-based
> flow control" approach
> 
> On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther  > wrote:
> 
> Hi Vishal,
> 
> your assumptions sound reasonable to me. The community is currently
> working on a more fine-grained back pressuring with credit-based
> flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
> Nico that might tell you more about the details. Until then I guess
> you have to implement a custom source/adapt an existing source to
> let the data flow in more realistic.
> 
> Regards,
> Timo
> 
> [1]
> http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> 
> 
> [2] https://www.youtube.com/watch?v=scStdhz9FHc
> 
> 
> 
> Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
> 
> I did a simulation on session windows ( in 2 modes ) and let it
> rip for about 12 hours
> 
> 1. Replay where a kafka topic with retention of 7 days was the
> source ( earliest )
> 2. Start the pipe with kafka source ( latest )
> 
> I saw results that differed dramatically.
> 
> On replay the pipeline stalled after  good ramp up while in the
> second case the pipeline hummed on without issues. For the same
> time period the data consumed is significantly more in the
> second case with the WM progression stalled in the first case
> with no hint of resolution ( the incoming data on source topic
> far outstrips the WM progression )  I think I know the reasons
> and this is my hypothesis.
> 
> In replay mode the number of windows open do not have an upper
> bound. While buffer exhaustion ( and data in flight with
> watermark )  is the reason for throttle, it does not really
> limit the open windows and in fact creates windows that reflect
> futuristic data ( future is relative to the current WM ) . So if
> partition x has data for watermark time t(x) and partition y for
> watermark time t(y) and t(x) << t(y) where the overall watermark
> is t(x) nothing significantly throttles consumption from the y
> partition ( in fact for x too ) , the bounded buffer based
> approach does not give minute control AFAIK as one would hope
> and that implies there are far more open windows than the system
> can handle and that leads to the pathological case where the
> buffers fill up  ( I believe that happens way late ) and
> throttling occurs but the WM does not proceed and windows that
> could ease the glut the throttling cannot proceed. In the
> replay mode the amount of data implies that the Fetchers keep
> pulling data at the maximum consumption allowed by the open
> ended buffer approach.
> 
> My question thus is, is there any way to have a finer control of
> back pressure, where in the consumption from a source is
> throttled preemptively ( by for example decreasing the buffers
> associated for a pipe or the size allocated ) or sleeps in the
> Fetcher code that can help aligning the performance to have real
> time consumption  characteristics
> 
> Regards,
> 
> Vishal.
> 
> 
> 
> 
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Nico Kruber
Hi Kyle,
except for putting the jar into the lib/ folder and setting up
credentials, nothing else should be required [1].
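
That is, roughly (the credentials below are placeholders; with IAM roles on EC2 you may not need them at all):

==
cp ./opt/flink-s3-fs-presto-1.4.0.jar ./lib/

# flink-conf.yaml
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
==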

The S3ErrorResponseHandler class itself is in the jar, as you can see with
jar tf flink-s3-fs-presto-1.4.0.jar | grep
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler

Therefore, the cause for this exception would be interesting (as Stephan
suggested).


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended

On 03/01/18 10:22, Stephan Ewen wrote:
> Hi Kyle!
> 
> Is there more of the stack trace available, like an original exception
> cause?
> 
> Best,
> Stephan
> 
> 
> On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin  > wrote:
> 
> Hi,
> 
> When testing Flink 1.4 locally the error below keeps getting thrown.
> I've followed the setup by moving the flink-s3-fs-presto.jar from
> the opt/ folder to the lib/ folder. Is there something additional I
> need to do?
> 
> java.lang.NoClassDefFoundError: Could not initialize class
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> at
> 
> org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
> at
> 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at
> 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
> at
> 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
> at
> 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: ElasticSearch Connector for version 6.x and scala 2.11

2018-01-04 Thread Nico Kruber
Actually, Flink's netty dependency (4.0.27) is shaded away into the
"org.apache.flink.shaded.netty4.io.netty" package now (since version
1.4) and should thus not clash anymore.
However, other netty versions may come into play from the job itself or
from the integration of Hadoop's classpath (if available).


Nico

On 01/12/17 14:42, Jens Oberender wrote:
> Hi
> 
> A workmate of mine tried to migrate the existing flink connector to
> ElasticSearch 6 but we had problems with netty dependencies that clashed
> (Flink uses 4.0.27 and ES is on 4.1).
> You can change the flink-connector-elasticsearch5 connector to ES 5.6.4,
> but then you have to do some adaptations to get it working, as they
> changed the API within a major version!
> But with that version you can write to ES 6.
> 
> He put his changes on Github:
> https://github.com/cognitix/flink/tree/connector-elasticsearch6
> It's called flink-connector-elasticsearch6, there.
> 
> We want to write a new and clean version for ElasticSearch 6, but
> currently don't have the time.
> 
> Best regards,
>   Jens Oberender
> 
> 
> Am 01.12.2017 um 09:52 schrieb Fabian Hueske:
>> Hi Rahul,
>>
>> Flink does not provide a connector for ElasticSearch 6 yet.
>> There is this JIRA issue to track the development progress [1].
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8101
>>
>> 2017-12-01 7:22 GMT+01:00 Rahul Raj > >:
>>
>> Hi All,
>>
>> Is there a Flink Elastic search connector for version 6.0 and scala
>> 2.11? I couldn't find it listed
>> here 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/elasticsearch.html
>> 
>> 
>> .
>>
>> Regards,
>> Rahul Raj
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Exception on running an Elasticpipe flink connector

2018-01-04 Thread Nico Kruber
Hi Vipul,
Yes, this looks like a problem with a different netty version being
picked up.

First of all, let me advertise Flink 1.4 for this since there we
properly shade away our netty dependency (on version 4.0.27 atm) so you
(or in this case Elasticsearch) can rely on your required version. Since
you are executing your job inside a Flink cluster, there will be some
more things in the classpath than what your job itself requires, e.g.
core/runtime Flink dependencies and also things from the Hadoop
classpath if present.

Locally in the IDE, only a limited set of libraries are included and the
classpath is set up a bit differently, I suppose. Maybe, you are also
affected by the Maven shading problem for maven >= 3.3 [1][2].
As a workaround, can you try to shade elasticsearch's netty away? See
[3] for details.
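
For illustration, a maven-shade-plugin relocation in your job's pom.xml could look roughly like this (the shadedPattern below is an arbitrary placeholder):

==
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>io.netty</pattern>
            <shadedPattern>my.shaded.elasticsearch.io.netty</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
==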


Regards
Nico

[1] https://issues.apache.org/jira/browse/FLINK-5013
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html
[3]
https://stackoverflow.com/questions/42962892/flink-with-elasticsearch-5-sink-conflicts-of-io-netty-library

On 04/01/18 07:09, vipul singh wrote:
> Hello,
> 
> We are working on a Flink ES connector, sourcing from a kafka stream,
> and sinking data into elasticsearch. The code works fine in intellij,
> but while running the code on emr(version 5.9, which uses flink 1.3.2)
> using flink-yarn-session, we are seeing this exception
> 
> Using the parallelism provided by the remote cluster (1). To use
> another parallelism, set it at the ./bin/flink client.
> 
> Starting execution of program
> 
> 2018-01-02 23:19:16,217 INFO org.apache.flink.yarn.YarnClusterClient
> - Starting program in interactive mode
> 
> 
> 
> The program finished with the following exception:
> 
> java.lang.NoSuchMethodError:
> 
> io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
> 
> at
> 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
> 
> at
> 
> org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
> 
> at
> 
> org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
> 
> at
> 
> org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
> 
> at
> 
> org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
> 
> at
> 
> org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)
> 
> at
> 
> org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)
> 
> On searching online, it seems like this maybe due to netty version
> conflicts.
> However when we ran a dependency tree on our pom, and we dont see netty
> coming from anywhere else but flink:
> https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06
> 
> Could you please suggest how can we resolve this error,
> 
> Thanks,
> Vipul
> 
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread Nico Kruber
IMHO, this looks like a bug and it makes sense that you only see this
with an HA setup:

The JobFound message contains the ExecutionGraph which, however, does
not implement the Serializable interface. Without HA, when browsing the
web interface, this message is (probably) not serialized since it is
only served to you via HTML. For HA, this may come from another
JobManager than the Web interface you are browsing.
I'm including Till (cc'd) as he might know more.


Nico

On 16/01/18 09:22, jelmer wrote:
> HI, 
> 
> We recently upgraded our test environment from flink 1.3.2 to flink
> 1.4.0.
> 
> We are using a high availability setup on the job manager. And now often
> when I go to the job details in the web ui the call will timeout and the
> following error will pop up in the job manager log
> 
> 
> akka.remote.MessageSerializer$SerializationException: Failed to
> serialize remote message [class
> org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
> serializer [class akka.serialization.JavaSerializer].
> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> ~[na:1.8.0_131]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[na:1.8.0_131]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[na:1.8.0_131]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> ... 17 common frames omitted
> 
> 
> 
> I isolated it further, and it seems to be triggered by this call
> 
> https://hostname/jobs/28076fffbcf7eab3f17900a54cc7c41d
> 
> I cannot reproduce it on my local lapop without HA setup.
> Before I dig any deeper, has anyone already come across this ?



signature.asc
Description: OpenPGP digital signature


Re: How to get automatic fail over working in Flink

2018-01-16 Thread Nico Kruber
Hi James,
In this scenario, with the restart strategy set, the job should restart
(without YARN/Mesos) as long as you have enough slots available.
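
For reference, a fixed-delay restart strategy can be configured in flink-conf.yaml along these lines (the values are just examples):

==
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
==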

Can you check with the web interface on http://:8081/ that
enough slots are available after killing one TaskManager?

Can you provide JobManager and TaskManager logs and some more details on
the job you are running?


Nico

On 16/01/18 07:04, Data Engineer wrote:
> This question has been asked on StackOverflow:
> https://stackoverflow.com/questions/48262080/how-to-get-automatic-fail-over-working-in-flink
> 
> I am using Apache Flink 1.4 on a cluster of 3 machines, out of which one
> is the JobManager and the other 2 host TaskManagers.
> 
> I start flink in cluster mode and submit a flink job. I have configured
> 24 task slots in the flink config, and for the job I use 6 task slots.
> 
> When I submit the job, I see 3 tasks are assigned to Worker machine 1
> and 3 are assigned to Worker machine 2. Now, when I kill the TaskManager
> on WorkerMachine 2, I see that the entire job fails.
> 
> Is this the expected behaviour, or does it have automatic failover as in
> Spark.
> 
> Do we need to use YARN/Mesos to achieve automatic failover?
> 
> We tried the Restart Strategy, but when it restarts we get an exception
> saying that no task slots are available and then the job fails. We think
> that 24 slots is enough to take over. What could we be doing wrong here?
> 
> Regards,
> James



signature.asc
Description: OpenPGP digital signature


Re: logging question

2018-01-16 Thread Nico Kruber
Just a guess, but probably our logging initialisation changes the global
log level (see conf/log4j.properties). DataStream.collect() executes the
program along with creating a local Flink "cluster" (if you are testing
locally / in an IDE) and initializing logging, among other things.

Please comment the first line out and uncomment the following one to
read like this:
==
# This affects logging for both user code and Flink
#log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO
==


Nico

On 13/01/18 13:52, j...@vooght.de wrote:
> Hello,
> I am learning Flink and using the docker image along with the AMIDST
> library for this.
> Below is a sample task from AMIDST which provides INFO output up until I
> reach updateModel(). I pasted the short method as well and wonder what
> prevents the Logger from producing any output after that call.
> 
>     //Set-up Flink session
>     env = ExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().disableSysoutLogging();
>     Logger LOG = LoggerFactory.getLogger("> ParallelMLExample");
> 
>     //generate a random dataset
>     DataFlink dataFlink = new
> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
> 
>     //Creates a DAG with the NaiveBayes structure for the random
> dataset
>     DAG dag =
> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
> "DiscreteVar4");
>     LOG.info(dag.toString());
> 
>     //Create the Learner object
>     ParameterLearningAlgorithm learningAlgorithmFlink = new
> ParallelMaximumLikelihood();
> 
>     //Learning parameters
>     learningAlgorithmFlink.setBatchSize(10);
>     learningAlgorithmFlink.setDAG(dag);
> 
>     //Initialize the learning process
>     learningAlgorithmFlink.initLearning();
> 
>     //Learn from the flink data
>     LOG.info("BEFORE UPDATEMODEL");
>     learningAlgorithmFlink.updateModel(dataFlink);
>     LOG.info("AFTER UPDATEMODEL");
> 
>     //Print the learnt Bayes Net
>     BayesianNetwork bn =
> learningAlgorithmFlink.getLearntBayesianNetwork();
>     LOG.info(bn.toString());
> 
> 
> Below is the updateModel method.
> 
>     public double updateModel(DataFlink dataUpdate) {
>     try {
>     Configuration config = new Configuration();
>     config.setString(BN_NAME, this.dag.getName());
>     config.setBytes(EFBN_NAME,
> Serialization.serializeObject(efBayesianNetwork));
> 
>     DataSet dataset = dataUpdate.getDataSet();
>     this.sumSS = dataset.map(new SufficientSatisticsMAP())
>     .withParameters(config)
>     .reduce(new SufficientSatisticsReduce())
>     .collect().get(0);
> 
>     //Add the prior
>     sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
> 
>     JobExecutionResult result =
> dataset.getExecutionEnvironment().getLastJobExecutionResult();
> 
>     numInstances =
> result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
> 
>     numInstances++;//Initial counts
> 
>     }catch(Exception ex){
>     throw new UndeclaredThrowableException(ex);
>     }
> 
>     return this.getLogMarginalProbability();
>     }
> 
> 
> Not sure why LOG.info past that method are not output to the console.
> TIA
> JP



signature.asc
Description: OpenPGP digital signature


Re: Parallel stream consumption

2018-01-16 Thread Nico Kruber
Hi Jason,
I'd suggest to start with [1] and [2] for getting the basics of a Flink
program.
The DataStream API basically wires operators together with streams so
that whatever stream gets out of one operator is the input of the next.
By connecting both functions to the same Kafka stream source, your
program results in this:

Kafka --> Function1
  |
  --> Function2

where both functions receive all elements the previous stream offers
(elements are broadcasted). If you want the two functions to work on
different elements, you could add a filter before each function:

DataStream inputStream = ... got a kafka stream
inputStream.filter(...).process( Function1 );
inputStream.filter(...).process( Function2 );

or split the stream (see [3] for available operators).

I'm no expert on Kafka though, so I can't give you advice on the
performance - I'd suggest to create some small benchmarks for your setup
since this probably depends on the cluster architecture and the
parallelism of the operators and the number of Kafka partitions.
Maybe Gordon (cc'd) can give some more insights.


Regards
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html

On 12/01/18 21:57, Jason Kania wrote:
> Hi,
> 
> I have a question that I have not resolved via the documentation,
> looking in the "Parallel Execution", "Streaming"  and the "Connectors"
> sections. If I retrieve a kafka stream and then call the process
> function against it in parallel, as follows, does it consume in some
> round robin fashion between the two process calls or is each element
> coming out of the kafka connector consumed by both processors in parallel?
> 
> DataStream inputStream = ... got a kafka stream
> inputStream.process( Function1 );
> inputStream.process( Function2 );
> 
> If it possible to consume in parallel by pointing at the single stream,
> is it typically slower or faster than having two kafka streams with
> different group ids?
> 
> If not documented elsewhere, this would be good to cover since it is
> fundamental.
> 
> Thanks,
> 
> Jason



signature.asc
Description: OpenPGP digital signature


Re: Parallel stream consumption

2018-01-16 Thread Nico Kruber
Just found a nice (but old) blog post that explains Flink's integration
with Kafka:
https://data-artisans.com/blog/kafka-flink-a-practical-how-to

I guess, the basics are still valid


Nico

On 16/01/18 11:17, Nico Kruber wrote:
> Hi Jason,
> I'd suggest to start with [1] and [2] for getting the basics of a Flink
> program.
> The DataStream API basically wires operators together with streams so
> that whatever stream gets out of one operator is the input of the next.
> By connecting both functions to the same Kafka stream source, your
> program results in this:
> 
> Kafka --> Function1
>   |
>   --> Function2
> 
> where both functions receive all elements the previous stream offers
> (elements are broadcasted). If you want the two functions to work on
> different elements, you could add a filter before each function:
> 
> DataStream inputStream = ... got a kafka stream
> inputStream.filter(...).process( Function1 );
> inputStream.filter(...).process( Function2 );
> 
> or split the stream (see [3] for available operators).
> 
> I'm no expert on Kafka though, so I can't give you an advise on the
> performance - I'd suggest to create some small benchmarks for your setup
> since this probably depends on the cluster architecture and the
> parallelism of the operators and the number of Kafka partitions.
> Maybe Gordon (cc'd) can give some more insights.
> 
> 
> Regards
> Nico
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html
> 
> On 12/01/18 21:57, Jason Kania wrote:
>> Hi,
>>
>> I have a question that I have not resolved via the documentation,
>> looking in the "Parallel Execution", "Streaming"  and the "Connectors"
>> sections. If I retrieve a kafka stream and then call the process
>> function against it in parallel, as follows, does it consume in some
>> round robin fashion between the two process calls or is each element
>> coming out of the kafka connector consumed by both processors in parallel?
>>
>> DataStream inputStream = ... got a kafka stream
>> inputStream.process( Function1 );
>> inputStream.process( Function2 );
>>
>> If it possible to consume in parallel by pointing at the single stream,
>> is it typically slower or faster than having two kafka streams with
>> different group ids?
>>
>> If not documented elsewhere, this would be good to cover since it is
>> fundamental.
>>
>> Thanks,
>>
>> Jason
> 



signature.asc
Description: OpenPGP digital signature


Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Nico Kruber
Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?
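
Just as a sketch of what I mean (the class name is mine, and the import of ObjectNode may need adjusting to your Flink version), something along these lines should let you skip broken records instead of failing the job:

==
import java.io.IOException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class LenientJsonDeserializationSchema extends JSONKeyValueDeserializationSchema {

    public LenientJsonDeserializationSchema(boolean includeMetadata) {
        super(includeMetadata);
    }

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        try {
            return super.deserialize(messageKey, message, topic, partition, offset);
        } catch (Exception e) {
            // log the broken record here; returning null makes the
            // Kafka consumer skip it instead of failing the job
            return null;
        }
    }
}
==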


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema

On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
> 
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer expectedly fails with JsonParseException. So far so good, but
> this leads to the following loop: the job switches to FAILED
> then attempts to restart and fails again, and so on. That is, the
> parsing error leads to the Kafka message not being committed, hence it
> keeps being received. 
> Since the JsonParseException can't be catched in application code, what
> would be the recommended way to handle the case of possibly
> non-parseable Kafka messages?
>  
> Is there is a way to configure the Flink Kafka consumer to treat the
> case of non-parseable messages by logging the parsing error then
> committing the message such that the processing can continue? Is there
> isn't, would such an enhancement make sense?
> 
> Unless there is a better solution, it looks as a requirement to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
> 
> For reference, here's the stack of the JsonParseException in the log:
> 
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unexpected character (':' (code 58)): Expected space separating
> root-level values
> at [Source: UNKNOWN; line: 1, column: 3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> 
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
> 
> Thanks,
> Adrian
> Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
> Compagnie IBM France
> Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> RCS Nanterre 552 118 465
> Forme Sociale : S.A.S.
> Capital Social : 657.364.587 €
> SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A



signature.asc
Description: OpenPGP digital signature


Re: Low throughput when trying to send data with Sockets

2018-01-16 Thread Nico Kruber
Hi George,
I suspect issuing a read operation for every 68 bytes incurs too much
overhead to perform as you would like it to. Instead, create a bigger
buffer (64k?) and extract single events from sub-regions of this buffer
instead.
Please note, however, that then the first buffer will only be processed
when this method returns (the details depend on the underlying channel
[1]). This is a trade-off between latency and throughput at some point.
If you set non-blocking mode for your channels, you will always get what
the channel has available and continue immediately. You can set this up
via this, for example:

==
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));

while(! socketChannel.finishConnect() ){
//wait, or do something else...
}
==


Nico

[1]
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)

On 15/01/18 13:19, George Theodorakis wrote:
> Hello,
> 
> I am trying to separate the logic of my application by generating and
> processing data in different physical engines. 
> 
> I have created my custom socket source class:
> 
> class SocketSourceFunction extends SourceFunction[Event2]{
>       @volatile private var isRunning:Boolean = true;
>       @transient private var serverSocket: ServerSocketChannel = null; 
> 
>       override def run(ctx: SourceContext[Event2]) = {
>   val hostname = "localhost"
>   val port = 6667
>           println("listening:" + port)                
>   val server = ServerSocketChannel.open();
>   server.bind(new InetSocketAddress (hostname, port));    
>   var buffer = ByteBuffer.allocate (68);
>   val des = new EventDeSerializer2()
>       
>   while (isRunning) {
>             println("waiting...")            
>             var socketChannel = server.accept();
> 
>      if (socketChannel != null){
>                println("accept:" + socketChannel)
>                while (true) {
>     var bytes = 0;
>     bytes = socketChannel.read(buffer)
>     if( bytes > 0) {
>     if (!buffer.hasRemaining()) {
>     buffer.rewind()
>     var event: Event2 = des.deserialize(buffer.array())
>     ctx.collect(event)
>     buffer.clear()
>     }
>     }
>                      }
> }
>           }          
>       }
> 
>       override def cancel() = {
>         isRunning = false;
>         val socket = this.serverSocket; 
>         if (socket != null) { 
>           try { 
>             socket.close(); 
>            }catch { case e: Throwable => {  
>              System.err.println(String.format("error: %s", e.getMessage()));
>         e.printStackTrace();
>         System.exit(1);
>              }
>            }
>          } 
>       }
> }
> 
> I am sending data with either raw sockets using ByteBuffers or with a
> Flink generator (serializing my Events and using writeToSocket()
> method). However, in both cases, I am experiencing less than 10x
> throughput in comparison to in-memory generation, even when using
> a 10gbit connection (the throughput is much lower).
> 
> Is there any obvious defect in my implementation?
> 
> Thank you in advance,
> George



signature.asc
Description: OpenPGP digital signature


Re: Low throughput when trying to send data with Sockets

2018-01-16 Thread Nico Kruber
(back to the ml again)

If you implement the ParallelSourceFunction interface instead, Flink
will run as many source instances as the configured parallelism. Each
instance will run the same code and you'll thus have multiple sockets to
connect to, if that is what you wanted.


One more thing regarding your source: typically you'd want the
checkpoint lock around the collect() call, i.e.

synchronized (ctx.getCheckpointLock()) {
  ctx.collect(...)
}
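
Putting both together, a rough Java sketch of a parallel version of such a source could look like the following (record size, buffer size and the port scheme are assumptions carried over from your code):

==
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ParallelSocketSource extends RichParallelSourceFunction<byte[]> {

    private static final int RECORD_SIZE = 68; // one serialized event
    private final int basePort;
    private volatile boolean isRunning = true;

    public ParallelSocketSource(int basePort) {
        this.basePort = basePort;
    }

    @Override
    public void run(SourceContext<byte[]> ctx) throws Exception {
        // each subtask listens on its own port
        int port = basePort + getRuntimeContext().getIndexOfThisSubtask();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(port));
            SocketChannel channel = server.accept();
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024); // read in big chunks
            while (isRunning && channel.read(buffer) >= 0) {
                buffer.flip();
                while (buffer.remaining() >= RECORD_SIZE) {
                    byte[] record = new byte[RECORD_SIZE];
                    buffer.get(record);
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record); // deserialize to your event type downstream
                    }
                }
                buffer.compact(); // keep any partial record for the next read
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
==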


Nico

On 16/01/18 12:27, George Theodorakis wrote:
> Thank you very much, indeed this was my bottleneck. 
> 
> My problem now is that my source is not parallel, so when I am
> increasing parallelism, system's throughput falls.
> 
> Is opening multiple sockets a quick solution to make the source parallel?
> 
> G.
> 
> 2018-01-16 10:51 GMT+00:00 Nico Kruber  <mailto:n...@data-artisans.com>>:
> 
> Hi George,
> I suspect issuing a read operation for every 68 bytes incurs too much
> overhead to perform as you would like it to. Instead, create a bigger
> buffer (64k?) and extract single events from sub-regions of this buffer
> instead.
> Please note, however, that then the first buffer will only be processed
> when this method returns (the details depend on the underlying channel
> [1]). This is a trade-off between latency and throughput at some point.
> If you set non-blocking mode for your channels, you will always get what
> the channel has available and continue immediately. You can set this up
> via this, for example:
> 
> ==
> socketChannel.configureBlocking(false);
> socketChannel.connect(new InetSocketAddress("http://jenkov.com
> <http://jenkov.com>", 80));
> 
> while(! socketChannel.finishConnect() ){
>     //wait, or do something else...
> }
> ==
> 
> 
> Nico
> 
> [1]
> 
> https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)
> 
> <https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)>
> 
> On 15/01/18 13:19, George Theodorakis wrote:
> > Hello,
> >
> > I am trying to separate the logic of my application by generating and
> > processing data in different physical engines. 
> >
> > I have created my custom socket source class:
> >
> > class SocketSourceFunction extends SourceFunction[Event2]{
> >       @volatile private var isRunning:Boolean = true;
> >       @transient private var serverSocket: ServerSocketChannel =
> null; 
> >
> >       override def run(ctx: SourceContext[Event2]) = {
> >   val hostname = "localhost"
> >   val port = 6667
> >           println("listening:" + port)                
> >   val server = ServerSocketChannel.open();
> >   server.bind(new InetSocketAddress (hostname, port));    
> >   var buffer = ByteBuffer.allocate (68);
> >   val des = new EventDeSerializer2()
> >       
> >   while (isRunning) {
> >             println("waiting...")            
> >             var socketChannel = server.accept();
> >
> >      if (socketChannel != null){
> >                println("accept:" + socketChannel)
> >                while (true) {
> >     var bytes = 0;
> >     bytes = socketChannel.read(buffer)
> >     if( bytes > 0) {
> >     if (!buffer.hasRemaining()) {
> >     buffer.rewind()
> >     var event: Event2 = des.deserialize(buffer.array())
> >     ctx.collect(event)
> >     buffer.clear()
> >     }
> >     }
> >                      }
> > }
> >           }          
> >       }
> >
> >       override def cancel() = {
> >         isRunning = false;
> >         val socket = this.serverSocket; 
> >         if (socket != null) { 
> >           try { 
> >             socket.close(); 
> >            }catch { case e: Throwable => {  
> >              System.err.println(String.format("error: %s",
> e.getMessage()));
> >         e.printStackTrace();
> >         System.exit(1);
> >              }
> >            }
> >          } 
> >       }
> > }
> >
> > I am sending data with either raw sockets using ByteBuffers or with a
> > Flink generator (serializing my Events and using writeToSocket()
> > method). However, in both cases, I am experiencing less than 10x
> > throughput in comparison to in-memory generation, even when using
> > a 10gbit connection (the throughput is much lower).
> >
> > Is there any obvious defect in my implementation?
> >
> > Thank you in advance,
> > George
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Nico Kruber
Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all

I'm not sure about a default option for handling/skipping corrupted
messages since the handling of those is probably highly use-case
specific. If you nonetheless feel that this should be in there, feel
free to open an improvement request in our issue tracker at
https://issues.apache.org/jira/browse/FLINK


Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I've missed the clarification of
> the contract brought by the piece of doc you
> pointed to: "returning |null| to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement
> for JSONKeyValueDeserializationSchema to provide this behaviour as an
> out-of-the-box option. But anyway, I do have a solution in hands.
> Thanks again.
> Adrian
>  
> 
> - Original message -
> From: Nico Kruber 
> To: Adrian Vasiliu , user@flink.apache.org
> Cc:
> Subject: Re: Unrecoverable job failure after Json parse error?
> Date: Tue, Jan 16, 2018 11:34 AM
>  
> Hi Adrian,
> couldn't you solve this by providing your own DeserializationSchema [1],
> possibly extending from JSONKeyValueDeserializationSchema and catching
> the error there?
> 
> 
> Nico
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
> 
> On 12/01/18 18:26, Adrian Vasiliu wrote:
> > Hello,
> >
> > When using FlinkKafkaConsumer011
> with JSONKeyValueDeserializationSchema,
> > if an invalid, non-parsable message is sent to the Kafka topic, the
> > consumer expectedly fails with JsonParseException. So far so good, but
> > this leads to the following loop: the job switches to FAILED
> > then attempts to restart and fails again, and so on. That is, the
> > parsing error leads to the Kafka message not being committed, hence it
> > keeps being received. 
> > Since the JsonParseException can't be catched in application code,
> what
> > would be the recommended way to handle the case of possibly
> > non-parseable Kafka messages?
> >  
> > Is there is a way to configure the Flink Kafka consumer to treat the
> > case of non-parseable messages by logging the parsing error then
> > committing the message such that the processing can continue? Is there
> > isn't, would such an enhancement make sense?
> >
> > Unless there is a better solution, it looks as a requirement to
> > guarantee that FlinkKafkaConsumer011 only receives valid messages,
> which
> > can be annoying in practice.
> >
> > For reference, here's the stack of the JsonParseException in the log:
> >
> > Source: Custom Source(1/1) switched to FAILED
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> > Unexpected character (':' (code 58)): Expected space separating
> > root-level values
> > at [Source: UNKNOWN; line: 1, column: 3]
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Object

Re: Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase

2018-02-26 Thread Nico Kruber
Judging from the code, you should separate different jars with a colon
":", i.e. "--addclasspath jar1:jar2"


Nico

On 26/02/18 10:36, kant kodali wrote:
> Hi Gordon,
> 
> Thanks for the response!! How do I add multiple jars to the classpaths?
> Are they separated by a semicolon and still using one flag like
> "--addclasspath jar1; jar2" or specify the flag multiple times like
> "--addclasspath jar1 --addclasspath jar2" or specify just the directory
> "--addclasspath ./opt" so it adds all the jars in that directory!
> 
> Thanks!
> 
> On Sun, Feb 25, 2018 at 11:29 PM, Tzu-Li (Gordon) Tai
> mailto:tzuli...@apache.org>> wrote:
> 
> Hi,
> 
> Good to see that you have it working! Yes, each of the Kafka
> version-specific connectors also have a dependency on the base Kafka
> connector module.
> 
> Note that it is usually not recommended to put optional dependencies
> (such as the connectors) under the lib folder.
> To add additional dependencies when using the Scala shell, there is
> a "--addclasspath" option which allows you to specify paths to the
> dependency jars.
> 
> Cheers,
> Gordon
> 
> 
> On 25 February 2018 at 12:22:28 PM, kant kodali (kanth...@gmail.com
> ) wrote:
> 
>> Exception went away after
>> downloading flink-connector-kafka-base_2.11-1.4.1.jar to lib folder
>>
>> On Sat, Feb 24, 2018 at 6:36 PM, kant kodali > > wrote:
>>
>> Hi,
>>
>> I couldn't get flink and kafka working together. It looks like
>> all examples I tried from web site fails with the following
>> Exception.
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
>>
>>
>> *or when I do something like this like it is in the website*
>>
>>
>>  val stream = senv.addSource(new
>> FlinkKafkaConsumer08[String]("join_test", new
>> SimpleStringSchema(), properties)).print()
>>
>> *I get the following exception*
>>
>> :73: error: overloaded method value addSource with
>> alternatives:
>>
>>   [T](function:
>> 
>> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
>> => Unit)(implicit evidence$10:
>> 
>> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>> 
>>
>>   [T](function:
>> 
>> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
>> evidence$9:
>> 
>> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>>
>>  cannot be applied to
>> (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08[String])
>>
>>        val stream = senv.addSource(new
>> FlinkKafkaConsumer08[String]("join_test", new
>> SimpleStringSchema(), properties)).print()
>>
>>
>> can anyone share a simple example of how to get Kafka Stream
>> as a Table using scala shell? No need for any fancy schema
>> just needs to print the value. I am using the latest version
>> of flink 1.41 and my lib folder
>> containers flink-connector-kafka-0.8_2.11-1.4.1.jar 
>>
>> I wanted to use Kafka 0.9 but that didn't work so I thought
>> let me just get something working first and downgraded to 0.8
>> but 0.8 examples on the website also don't seem to work using
>> scala shell. 
>>
>> Thanks!!
>>
>>
>>
>>
> 





Re: TaskManager crashes with PageRank algorithm in Gelly

2018-02-26 Thread Nico Kruber
Hi,
without knowing Gelly here, maybe it has to do something with cleaning
up the allocated memory as mentioned in [1]:

taskmanager.memory.preallocate: Can be either of true or false.
Specifies whether task managers should allocate all managed memory when
starting up. (DEFAULT: false). When taskmanager.memory.off-heap is set
to true, then it is advised that this configuration is also set to true.
If this configuration is set to false cleaning up of the allocated
offheap memory happens only when the configured JVM parameter
MaxDirectMemorySize is reached by triggering a full GC. Note: For
streaming setups, we highly recommend to set this value to false as the
core state backends currently do not use the managed memory.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#managed-memory
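
For reference, a minimal flink-conf.yaml sketch of the combination described
above (whether this is appropriate depends on your setup; for streaming jobs
the documentation recommends leaving preallocation off):

taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true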

On 22/02/18 19:56, santoshg wrote:
> An update - I was able to overcome these issues by setting the preallocate
> flag to true. Not sure why this fixes the problem. Need to dig a bit deeper.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 





Re: Which test cluster to use for checkpointing tests?

2018-02-26 Thread Nico Kruber
Hi Ken,
LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
was attempting to even create one but could not finish. Maybe your
program was not fully running yet?
Can you tell us a little bit more about your set up and how you
configured the LocalFlinkMiniCluster?


Nico

On 23/02/18 21:42, Ken Krugler wrote:
> Hi all,
> 
> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
> 
> Asking because I’m not seeing checkpoint calls being made to my custom 
> function (implements ListCheckpointed) when I’m running with 
> LocalFlinkMiniCluster.
> 
> Though I do see entries like this logged:
> 
> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined 
> state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in 
> heap memory / checkpoints to JobManager).
> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint 
> triggering task Source: Seed urls source (1/2) is not being executed at the 
> moment. Aborting checkpoint.
> 
> But when I browse the Flink source, tests for checkpointing seem to be using 
> TestCluster, e.g. in ResumeCheckpointManuallyITCase
> 
> Thanks,
> 
> — Ken
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 





Re: Which test cluster to use for checkpointing tests?

2018-02-28 Thread Nico Kruber
I was a bit confused when you said that the "source is done"; that is
when I realized you must be using the batch API, for which
checkpointing is not available / needed. Let me quote from [1] which
imho has not changed:

DataSet:

Fault tolerance for the DataSet API works by restarting the job and
redoing all of the work. [...] The periodic in-flight checkpoints are
not used here.

DataStream:

This one would start immediately inserting data (as it is a streaming
job), and draw periodic checkpoints that make sure replay-on-failure
only has to redo only a bit, not everything.


Nico

[1]
https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E

On 26/02/18 22:55, Ken Krugler wrote:
> Hi Nico,
> 
>> On Feb 26, 2018, at 9:41 AM, Nico Kruber > <mailto:n...@data-artisans.com>> wrote:
>>
>> Hi Ken,
>> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
>> was attempting to even create one but could not finish. Maybe your
>> program was not fully running yet?
> 
> In the logs I see:
> 
> 18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source
> (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
> 18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed
> urls source (1/2).
> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint
> triggering task Source: Seed urls source (1/2) is not being executed at
> the moment. Aborting checkpoint.
> 
> Maybe the checkpoint here is happening too soon after the “Initializing
> Source” message.
> 
> After that the source is done (it only triggers the iteration with a
> single starting tuple), so I wouldn’t expect checkpointing to actually
> do anything. I was just using these messages as indications that I had
> configured my workflow properly to actually do checkpointing.
> 
>> Can you tell us a little bit more about your set up and how you
>> configured the LocalFlinkMiniCluster?
> 
> Potential issue #1 - I’ve got a workflow with multiple iterations.
> 
> For that reason I had to force checkpointing via:
> 
>         env.setStateBackend(new MemoryStateBackend());
> env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> 
> 
> Potential issue #2 - because of the fun with tracking iteration
> progress, I subclass LocalStreamEnvironment to add this async execution
> method:
> 
> public JobSubmissionResult executeAsync(String jobName) throws Exception {
>     // transform the streaming program into a JobGraph
>     StreamGraph streamGraph = getStreamGraph();
>     streamGraph.setJobName(jobName);
> 
>     JobGraph jobGraph = streamGraph.getJobGraph();
> 
>     Configuration configuration = new Configuration();
>     configuration.addAll(jobGraph.getJobConfiguration());
> 
>     configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
>     configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
>             jobGraph.getMaximumParallelism());
> 
>     // add (and override) the settings with what the user defined
>     configuration.addAll(_conf);
> 
>     _exec = new LocalFlinkMiniCluster(configuration, true);
>     _exec.start(true);
> 
>     // The above code is all basically the same as Flink's LocalStreamEnvironment.
>     // The change is that here we call submitJobDetached vs. submitJobAndWait.
>     // We assume that eventually someone calls stop(job id), which then terminates
>     // the LocalFlinkMiniCluster.
>     return _exec.submitJobDetached(jobGraph);
> }
> 
> However I don’t think that would impact checkpointing.
> 
> Anything else I should do to debug whether checkpointing is operating as
> expected? In the logs, at DEBUG level, I don’t see any errors or
> warnings related to this.
> 
> Thanks,
> 
> — Ken
> 
>>
>>
>> Nico
>>
>> On 23/02/18 21:42, Ken Krugler wrote:
>>> Hi all,
>>>
>>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
>>>
>>> Asking because I’m not seeing checkpoint calls being made to my
>>> custom function (implements ListCheckpointed) when I’m running with
>>> LocalFlinkMiniCluster.
>>>
>>> Though I do see entries like this logged:
>>>
>>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using
>>> application-defined state backend for checkpoint/savepoint metadata:
>>> MemoryStateBackend (data in heap memory / checkpoints to JobManager).
>>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 -
>>> Checkpoint triggering task Source: Seed urls source (1/2) is not
>>> being executed at the moment. Aborting checkpoint.
>>>
>>> But when I browse the Flink source, tests for checkpointing seem to
>>> be using TestCluster, e.g. in ResumeCheckpointManuallyITCase
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> 
>>> http://about.me/kkrugler
>>> +1 530-210-6378
>>>
>>
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 





Re: logging question

2018-02-28 Thread Nico Kruber
I'm a bit curious how you hand your log4j configuration to the docker
image for consumption. On SO you are referring to bin/flink-console.sh,
but executing Flink in Docker is a bit different.
Maybe I'm wrong, but looking at the sources of the docker image [1], it
does not forward any additional parameters to the docker container via
additions to the command starting the image.


Nico

[1]
https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine

On 27/02/18 18:25, JP de Vooght wrote:
> Hello Nico,
> 
> took me a while to respond. Thank you for the comments. I had explored a
> little more the docker-image and startup scripts. That allowed me to
> better understand the log4j properties file used but I am still facing
> this odd behavior.
> 
> I created a stackoverflow entry for this
> 
> https://stackoverflow.com/questions/48853497/docker-flink-not-showing-all-log-statements
> 
> Below, I am just showing the properties file below which I hadn't put on SO.
> 
> # This affects logging for both user code and Flink
> log4j.rootLogger=INFO, file, console
>  
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=OFF
>  
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to
> manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>  
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>  
> # Log all infos to the console
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.Target=System.out
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>  
> # Suppress the irrelevant (wrong) warnings
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
> log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
> 
> JP
> 
> 
> On 01/16/2018 10:50 AM, Nico Kruber wrote:
>> Just a guess, but probably our logging initialisation changes the global
>> log level (see conf/log4j.properties). DataStream.collect() executes the
>> program along with creating a local Flink "cluster" (if you are testing
>> locally / in an IDE) and initializing logging, among other things.
>>
>> Please comment the first line out and uncomment the following one to
>> read like this:
>> ==
>> # This affects logging for both user code and Flink
>> #log4j.rootLogger=INFO, file
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=INFO
>> ==
>>
>>
>> Nico
>>
>> On 13/01/18 13:52, j...@vooght.de wrote:
>>> Hello,
>>> I am learning Flink and using the docker image along with the AMIDST
>>> library for this.
>>> Below is a sample task from AMIDST which provides INFO output up until I
>>> reach updateModel(). I pasted the short method as well and wonder what
>>> prevents the Logger from
>>>
>>>     //Set-up Flink session
>>>     env = ExecutionEnvironment.getExecutionEnvironment();
>>>     env.getConfig().disableSysoutLogging();
>>>     Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>>
>>>     //generate a random dataset
>>>     DataFlink dataFlink = new
>>> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>>
>>>     //Creates a DAG with the NaiveBayes structure for the random
>>> dataset
>>>     DAG dag =
>>> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>>> "DiscreteVar4");
>>>     LOG.info(dag.toString());
>>>
>>>     //Create the Learner object
>>>     ParameterLearningAlgorithm learningAlgorithmFlink = new
>>> ParallelMaximumLikelihood();
>>>
>>>     //Learning parameters
>>>     learningAlgorithmFlink.setBatchSize(10);
>>>     learningAlgorithmFlink.setDAG(dag);
>>>
>>>     //Initialize the learning process
>>>     learningAlgorithmFlink.initLearning();
>&

Re: Which test cluster to use for checkpointing tests?

2018-03-06 Thread Nico Kruber
Hi Ken,
sorry, I was misled by the fact that you are using iterations and those
were only documented for the DataSet API.

Running checkpoints with closed sources sounds like a more general thing
than being part of the iterations rework of FLIP-15. I couldn't dig up
anything on jira regarding this improvement either.

@Stephan: is this documented somewhere?


Nico

On 02/03/18 23:55, Ken Krugler wrote:
> Hi Stephan,
> 
> Thanks for the update.
> 
> So is support for “running checkpoints with closed sources” part
> of FLIP-15
> ,
> or something separate?
> 
> Regards,
> 
> — Ken
> 
>> On Mar 1, 2018, at 9:07 AM, Stephan Ewen > > wrote:
>>
>> @Ken The issue you are running into is that Checkpointing works
>> currently only until the job reaches the point where the pipeline
>> starts to drain out, meaning when the sources are done. In your case,
>> the source is done immediately, sending out only one tuple.
>>
>> Running checkpoints with closed sources is something that's on the
>> feature list and will come soon…
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 





Re: Which test cluster to use for checkpointing tests?

2018-03-06 Thread Nico Kruber
There are still some upcoming changes for the network stack, but most of
the heavy stuff it already through - you may track this under
https://issues.apache.org/jira/browse/FLINK-8581

FLIP-6 is somewhat similar and currently only undergoes some stability
improvements/bug fixing. The architectural changes are merged now.


Nico

On 06/03/18 11:24, Paris Carbone wrote:
> Hey,
> 
> Indeed checkpointing iterations and dealing with closed sources are 
> orthogonal issues, that is why the latter is not part of FLIP-15. Though, you 
> kinda need both to have meaningful checkpoints for jobs with iterations.
> One has to do with correctness (checkpointing strongly connected components 
> in the execution graph) and the other about termination (terminating the 
> checkpointing protocol when certain tasks ‘finish’).
> 
> I am willing to help out resolving the first issue, though I prefer to wait 
> for ongoing changes in the network model and FLIP-6 to be finalised to apply 
> this change properly (are they?). 
> 
> Paris
> 
>> On 6 Mar 2018, at 10:51, Nico Kruber  wrote:
>>
>> Hi Ken,
>> sorry, I was misled by the fact that you are using iterations and those
>> were only documented for the DataSet API.
>>
>> Running checkpoints with closed sources sounds like a more general thing
>> than being part of the iterations rework of FLIP-15. I couldn't dig up
>> anything on jira regarding this improvement either.
>>
>> @Stephan: is this documented somewhere?
>>
>>
>> Nico
>>
>> On 02/03/18 23:55, Ken Krugler wrote:
>>> Hi Stephan,
>>>
>>> Thanks for the update.
>>>
>>> So is support for “running checkpoints with closed sources” part
>>> of FLIP-15
>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132>,
>>> or something separate?
>>>
>>> Regards,
>>>
>>> — Ken
>>>
>>>> On Mar 1, 2018, at 9:07 AM, Stephan Ewen >>> <mailto:se...@apache.org>> wrote:
>>>>
>>>> @Ken The issue you are running into is that Checkpointing works
>>>> currently only until the job reaches the point where the pipeline
>>>> starts to drain out, meaning when the sources are done. In your case,
>>>> the source is done immediately, sending out only one tuple.
>>>>
>>>> Running checkpoints with closed sources is something that's on the
>>>> feature list and will come soon…
>>>
>>> 
>>> http://about.me/kkrugler
>>> +1 530-210-6378
>>>
>>
> 





Re: Flink is looking for Kafka topic "n/a"

2018-03-06 Thread Nico Kruber
Hi Mu,
which version of flink are you using? I checked the latest branches for
1.2 - 1.5 to look for findLeaderForPartitions at line 205 in
Kafka08Fetcher but they did not match. From what I can see in the code,
there is a MARKER partition state with topic "n/a" but that is
explicitly removed from the list of partitions to find leaders for in
the code and solely used during cancelling the fetcher.

I don't know whether this is possible, but I suppose there could be more
than one marker and we should call removeAll() instead - @Gordon, can
you elaborate/check whether this could happen?


Nico

On 06/03/18 12:51, Mu Kong wrote:
> Hi,
> 
> I have encountered a weird problem.
> After I start the job for several days, Flink gave me the following error:
> 
> /java.lang.RuntimeException: Unable to find a leader for partitions:
> [Partition: KafkaTopicPartition{topic='n/a', partition=-1},
> KafkaPartitionHandle=[n/a,-1], offset=(not set)]/
> /        at
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)/
> /        at
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)/
> /        at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)/
> /        at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)/
> /        at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)/
> /        at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)/
> /        at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)/
> /        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)/
> /        at java.lang.Thread.run(Thread.java:748)/
> /
> /
> The Flink job died after this error and tried to restart but in vain at
> the end.
> 
> Is there any reason why Flink was unable to find a leader for the partition?
> A more confusing question would be why is it trying to find topic 'n/a',
> instead of the topic we have specified?
> 
> Thanks in advance!
> 
> Best regards,
> Mu





Re: akka.remote.ReliableDeliverySupervisor Temporary failure in name resolution

2018-03-06 Thread Nico Kruber
Hi Miki,
I'm no expert on the Kubernetes part, but could that be related to
https://github.com/kubernetes/kubernetes/issues/6667?

I'm not sure this is an Akka issue: if it cannot communicate with some
address it basically blocks it from further connection attempts for a
given time (here 5 seconds).

Is there some firewall or port configuration blocking the connection
between the JobManager and the (new) TaskManager?


I tried to reproduce it locally with minikube, but starting jobmanager
and taskmanager services as described in [1] and then deleting the task
managers and re-starting them again worked without a flaw. My bet is on
something Flink-external because of the "Temporary failure in name
resolution" error message.
Maybe @Patrick (cc'd) has encountered this before and knows more.



Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/kubernetes.html

On 06/03/18 11:35, miki haiat wrote:
> Hi , 
> 
> I'm running Flink jobs on Kubernetes. After a day or so,
> the task manager and job manager lose their connection and I have to
> restart everything.
> I'm assuming that one of the pods crashed and when the new pod starts
> it can't find the job manager?
> Also I saw that this is an Akka issue... and it will be fixed in version 1.5.
> 
> How can i safely deploy jobs on kubernetes .
> 
> 
> task manager logs 
> 
> 2018-03-06 07:23:18,186 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager              -
> Trying to register at JobManager
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager (attempt
> 1594, timeout: 3 milliseconds)
> 2018-03-06 07:23:48,196 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager              -
> Trying to register at JobManager
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager (attempt
> 1595, timeout: 3 milliseconds)
> 2018-03-06 07:24:18,216 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager              -
> Trying to register at JobManager
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager (attempt
> 1596, timeout: 3 milliseconds)
> 2018-03-06 07:24:48,237 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager              -
> Trying to register at JobManager
> akka.tcp://flink@flink-jobmanager:6123/user/jobmanager (attempt
> 1597, timeout: 3 milliseconds)
> 2018-03-06 07:24:53,042 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated] 
> 
> 
> Job manager logs 
> 
> 
> 2018-03-06 07:25:18,262 INFO 
> org.apache.flink.runtime.instance.InstanceManager             -
> Registered TaskManager at flink-taskmanager-3509325052-bqtkd
> 
> (akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073/user/taskmanager)
> as c37614c28df29d34b80676488e386da3. Current number of registered
> hosts is 2. Current number of alive task slots is 16.
> 2018-03-06 07:25:18,263 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073] has
> failed, address is now gated for [5000] ms. Reason: [Association
> failed with
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073]] Caused
> by: [flink-taskmanager-3509325052-bqtkd: Temporary failure in name
> resolution]
> 2018-03-06 07:25:23,282 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073] has
> failed, address is now gated for [5000] ms. Reason: [Association
> failed with
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073]] Caused
> by: [flink-taskmanager-3509325052-bqtkd]
> 2018-03-06 07:25:28,303 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073] has
> failed, address is now gated for [5000] ms. Reason: [Association
> failed with
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073]] Caused
> by: [flink-taskmanager-3509325052-bqtkd: Temporary failure in name
> resolution]
> 2018-03-06 07:25:33,322 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073] has
> failed, address is now gated for [5000] ms. Reason: [Association
> failed with
> [akka.tcp://flink@flink-taskmanager-3509325052-bqtkd:35073]] Caused
> by: [flink-taskmanager-3509325052-bqtkd]
> 2018-03-06 07:25:38,343 WARN 
> akka.remote.ReliableDeliverySupervisor                        -
> Association with remote system
>

Re: Kafka offset auto-commit stops after timeout

2018-03-06 Thread Nico Kruber
Hi Edward,
looking through the Kafka code, I do see a path where they deliberately
do not want recursive retries, i.e. if the coordinator is unknown. It
seems like you are getting into this scenario.

I'm no expert on Kafka and therefore I'm not sure on the implications or
ways to circumvent/fix this, maybe the Kafka folks can help you with
this on their mailing list or Gordon (cc'd) knows - although this seems
Flink-unrelated.

Regarding the use of OffsetCommitMode.ON_CHECKPOINTS: I looked at our
code and with this (@Gordon, please correct me if I'm wrong), we will
commit the offsets ourselves and will try to commit every time a
checkpoint completes. In case of a failure in the last commit, we will
simply commit the new one instead with the next checkpoint.
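
If you do enable checkpointing, a minimal sketch would look like the following
(broker, group id and topic are placeholders taken from the log excerpt below;
package names may vary slightly between Flink versions):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka06:9092"); // placeholder
        properties.setProperty("group.id", "consumergroup01");       // placeholder

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // offsets are committed to Kafka whenever a checkpoint completes
        env.enableCheckpointing(60_000);

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "topic01", new SimpleStringSchema(), properties);
        consumer.setCommitOffsetsOnCheckpoints(true); // default, shown explicitly

        env.addSource(consumer).print();
        env.execute("checkpointed kafka consumer");
    }
}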


Nico

On 05/03/18 17:11, Edward wrote:
> We have noticed that the Kafka offset auto-commit functionality seems to stop
> working after it encounters a timeout. It appears in the logs like this:
> 
> 2018-03-04 07:02:54,779 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
> the coordinator kafka06:9092 (id: 2147483641 rack: null) dead for group
> consumergroup01
> 2018-03-04 07:02:54,780 WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
> Auto-commit of offsets {topic01-24=OffsetAndMetadata{offset=153237895,
> metadata=''}} failed for group consumergroup01: Offset commit failed with a
> retriable exception. You should retry committing offsets. The underlying
> error was: The request timed out.
> 
> After this message is logged, no more offsets are committed by the job until
> it is restarted (and if the flink process ends abnormally, the offsets never
> get committed).
> 
> This is using Flink 1.4.0 which uses kafka-clients 0.11.0.2. We are using
> the default kafka client settings for enable.auto.commit (true) and
> auto.commit.interval.ms (5000). We are not using Flink checkpointing, so the
> kafka client offset commit mode is OffsetCommitMode.KAFKA_PERIODIC (not
> OffsetCommitMode.ON_CHECKPOINTS).
> 
> I'm wondering if others have encountered this?
> 
> And if so, does enabling checkpointing resolve the issue, because
> Kafka09Fetcher.doCommitInternalOffsetsToKafka is called from the Flink code?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 





Re: bin/start-cluster.sh won't start jobmanager on master machine

2018-03-06 Thread Nico Kruber
Hi Yesheng,
`nohup /bin/bash -l bin/jobmanager.sh start cluster ...` looks a bit
strange since it should (imho) be an absolute path towards flink.

What you could do to diagnose further, is to try to run the ssh command
manually, i.e. figure out what is being executed by calling
bash -x ./bin/start-cluster.sh
and then run the ssh command without "-n" and not in background "&".
Then you should also see the JobManager stdout to diagnose further.

If that does not help yet, please log into the master manually and
execute the "nohup /bin/bash..." command there to see what is going on.

Depending on where the failure was, there may even be logs on the master
machine.


Nico

On 04/03/18 15:52, Yesheng Ma wrote:
> Hi all,
> 
> When I execute bin/start-cluster.sh on the master machine, actually
> the command `nohup /bin/bash -l bin/jobmanager.sh start cluster ...` is
> executed, which does not start the job manager properly.
> 
> I think there might be something wrong with the `-l` argument, since
> when I use the `bin/jobmanager.sh start` command, everything is fine.
> Kindly point out if I've done any configuration wrong. Thanks!
> 
> Best,
> Yesheng
> 
> 





Re: Akka wants to connect with username "flink"

2018-03-06 Thread Nico Kruber
Hi Lukas,
those are akka-internal names that you don't have to worry about.

It looks like your TaskManager cannot reach the JobManager.
Is 'jobmanager.rpc.address' configured correctly on the TaskManager? And
is it reachable by this name? Is port 6123 allowed through the firewall?

Are you sure the JobManager is running?
How do you start the cluster? If you have been using start-cluster.sh
(as per [1]), please also try to start the services manually to check
whether there's something wrong there.


Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
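
For reference, the relevant flink-conf.yaml entries on every node would look
like the following (the hostname is a placeholder and must be resolvable from
all TaskManagers):

jobmanager.rpc.address: <jobmanager-hostname>
jobmanager.rpc.port: 6123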

On 04/03/18 13:57, Lukas Werner wrote:
> Hey there,
> 
> For my master thesis I'm trying to set up a flink standalone cluster on
> 4 nodes. I've worked along the documentation which pretty neatly
> explains how to set it up. But when I start the cluster there is a
> warning and when I'm trying to run a job, there is an error with the
> same message:
> 
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@MYHOSTNAME:6123/user/jobmanager#-818199108]]
> after [1 ms]. Sender[null] sent message of type
> "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage"
> 
> Increasing the timeout didn't work. When I open the taskmanagers in web
> UI, all of them have the following pattern:
> 
> akka.tcp://flink@MYHOSTNAME:33779/user/taskmanager
> 
> Does anyone have an idea how to solve this to get the cluster working?
> Thanks in advance!
> 
> One last thing: There isn't a user "flink" on the cluster and won't be
> created. So any advices without telling me I should create that user
> would be very appreciated! Thanks!
> 
> Kind regards,
> Lukas
> 
>  
> 





Re: Flink is looking for Kafka topic "n/a"

2018-03-08 Thread Nico Kruber
I think, I found a code path (race between threads) that may lead to two
markers being in the list.

I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.


Nico

On 07/03/18 10:09, Mu Kong wrote:
> Hi Gordon,
> 
> Thanks for your response.
> I think I've misspoken about the failure after "n/a" exception.
> The behavior after this exception would be:
> 
> switched from RUNNING to CANCELING
> switched from CANCELING to CANCELED
> Try to restart or fail the job "X" () if no
> longer possible.
> switched from state FAILING to RESTARTING
> Restarting the job "X" ()
> Recovering checkpoints from ZooKeeper
> Found 1 checkpoints in ZooKeeper
> Trying to retrieve checkpoint 1091
> Restoring from latest valid checkpoint: Checkpoint 1091 @
>  for 
> switched from CREATED to SCHEDULED
> switched from SCHEDULED to DEPLOYING
> switched from DEPLOYING to RUNNING
> (several check pointings)
> switched from RUNNING to FAILED
> TimerException{java.io.EOFException:Premature EOF: no length prefix
> available}
>         at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException: Premature EOF: no length prefix available
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> 
> Since there several successful check points after the restart, I think
> the later failure might be something else.
> Also, could you please share more information about the MARKER in the
> code? Like which piece of code should I look for.
> 
> And thanks for the suggestion to let me upgrade the flink to 1.3.2
> 
> Best regards,
> Mu
> 
> 
> On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai  > wrote:
> 
> Hi Mu,
> 
> You mentioned that the job stopped after the "n/a" topic error, but
> the job
> failed to recover.
> What exception did you encounter in the restart executions? Was it
> the same
> error?
> This would verify if we actually should be removing more than one of
> these
> special MARKER partition states.
> 
> On the other hand, if I recall correctly, the Kafka consumer had a
> severe
> bug in 1.3.0 which could lead to potential duplicate data, which was
> fixed
> in 1.3.2. Though I don't think it is related to the error you
> encountered, I
> strongly recommend that you use 1.3.2 instead.
> 
> Cheers,
> Gordon
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
> 
> 





Re: Incremental checkpointing performance

2018-03-19 Thread Nico Kruber
Hi Miyuru,
Indeed, the behaviour you observed sounds strange and kind of goes against
the results Stefan presented in [1]. To see what is going on, can you
also share your changes to Flink's configuration, i.e. flink-conf.yaml?

Let's first make sure you're really comparing RocksDBStateBackend with
vs without incremental checkpoints:
- if you remove this from the code:
env.setStateBackend(new RocksDBStateBackend(
   new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
   true));
then you will end up with the state backend configured via the
"state.backend" property. Was this set to "rocksdb"? Alternatively, you
can set the second parameter to the RocksDBStateBackend constructor to
false to get the right back-end.

You can also verify the values you see from the web interface by looking
into the logs (at INFO level). There, you should see reports like this:
"Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
in thread ... took ... ms."

Other than that, from what I know about it (Stefan (cc'd), correct me if
I'm wrong), incremental checkpoints only do hard links locally to the
changed sst files and then copy the data in there to the checkpoint
store (the path you gave). A full checkpoint must copy all current data.
If, between two checkpoints, you write more data than the contents of
the database, e.g. by updating a key multiple times, you may indeed have
more data to store. Judging from the state sizes you gave, this is
probably not the case.


Let's get started with this and see whether there is anything unusual.


Regards,
Nico


[1]
https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/
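
To make the comparison explicit, a minimal sketch (only the boolean flag
differs between the two runs; the checkpoint path and the surrounding try/catch
mirror the job quoted below):

try {
    // incremental checkpoints; pass 'false' as the second argument for the
    // full (non-incremental) run instead
    env.setStateBackend(new RocksDBStateBackend(
            new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"), true));
} catch (IOException e) {
    throw new RuntimeException(e);
}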

On 19/03/18 05:25, Miyuru Dayarathna wrote:
> Hi,
> 
> We did a performance test of Flink's incremental checkpointing to
> measure the average time it takes to create a checkpoint and the average
> checkpoint file size. We did this test on a single computer in order to
> avoid the latencies introduced by network communication. The computer
> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
> 101GB free SSD space. The computer was running on Ubuntu 16.04
> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
> test was run for 40 minutes.
> 
> The Flink application we used is as follows,
> //-
> public class LengthWindowIncrementalCheckpointing {
>     private static DataStream<Tuple4<String, Float, Integer, String>> inputStream = null;
>     private static final int PARALLELISM = 1;
>     private static final int timeoutMillis = 10;
>     private static final int WINDOWLENGTH = 1;
>     private static final int SLIDELENGTH = 1;
>     private static Logger logger =
>             LoggerFactory.getLogger(LengthWindowIncrementalCheckpointing.class);
> 
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // start a checkpoint every 1000 ms
>         env.enableCheckpointing(1000);
>         try {
>             env.setStateBackend(new RocksDBStateBackend(
>                     new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>                     true));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
> 
>         env.setBufferTimeout(timeoutMillis);
>         inputStream = env.addSource(new MicrobenchSourceFunction())
>                 .setParallelism(PARALLELISM).name("inputStream");
> 
>         DataStream<Tuple4<String, Float, Integer, String>> incrementStream2 =
>                 inputStream.filter(new FilterFunction<Tuple4<String, Float, Integer, String>>() {
>                     @Override
>                     public boolean filter(Tuple4<String, Float, Integer, String> tuple) throws Exception {
>                         if (tuple.f1 > 10) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).keyBy(1).countWindow(WINDOWLENGTH, SLIDELENGTH).sum(2);
>         incrementStream2.writeUsingOutputFormat(
>                 new DiscardingOutputFormat<Tuple4<String, Float, Integer, String>>());
> 
>         try {
>             env.execute("Flink application.");
>         } catch (Exception e) {
>             logger.error("Error in starting the Flink stream application: "
>                     + e.getMessage(), e);
>         }
>     }
> }
> 
> //-
> 
> I have attached two charts (Average_latencies.jpg and
> Average_state_sizes.jpg) with the results and another image with the
> Flink dashboard (Flink-Dashboard.png). The average state size chart
> indicates that the size of an incrementa

Re: flink on mesos

2018-03-19 Thread Nico Kruber
Can you elaborate a bit more on what is not working? (please provide a
log file or the standard output/error).
Also, can you try a newer flink checkout? The start scripts have been
merged into a single one for 'flip6' and 'old' - I guess,
mesos-appmaster.sh should be the right script for you now.


Regards
Nico

On 18/03/18 17:06, miki haiat wrote:
> I think  that you can use the catalog option only if you install dc/os ?
>  
> 
>  iv  installed  mesos and marathon  
> 
> 
> 
> 
> On Sun, Mar 18, 2018 at 5:59 PM, Lasse Nedergaard
> mailto:lassenederga...@gmail.com>> wrote:
> 
> Hi. 
> Go to Catalog, Search for Flink and click deploy
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 
> 
> Den 18. mar. 2018 kl. 16.18 skrev miki haiat  >:
> 
>>
>> Hi , 
>>
>> Im trying to run flink on mesos iv  installed  mesos and marathon
>> successfully but im unable to create flink job/task manager 
>>
>> im running this command but mesos wont start any task 
>>
>> ./mesos-appmaster-flip6-session.sh  -n 1
>>
>>
>>
>> i cant figure out the proper way to run flink on  mesos  
>>
>>
> 





Re: CsvSink

2018-03-19 Thread Nico Kruber
Hi Karim,
when I was trying to reproduce your code, I got an exception because of
the name 'table' being used (it is a reserved SQL keyword); after
replacing it and completing the job with some input, I did see the csv
file popping up. Also, the job was crashing when the file 1.txt already
existed.

The code I used (running Flink 1.5-SNAPSHOT):

  def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Int, Long, String)] = get3TupleDataStream(env)
  .assignAscendingTimestamps(_._2)
tableEnv.registerDataStream("mytable", stream, 'A, 'B, 'C)

val results = tableEnv.sqlQuery( """
   |SELECT
   | A,C
   | FROM mytable
 """.stripMargin)

val result: Table = results

val path = "file:///tmp/test/1.txt"
val sink :TableSink[Row]=   new CsvTableSink(
  path, // output path
  fieldDelim = "|", // optional: delimit files by '|'
  numFiles = 1, // optional: write to a single file
  writeMode = WriteMode.NO_OVERWRITE)

result.writeToSink(sink)

env.execute("this job")
  }

  def get3TupleDataStream(env: StreamExecutionEnvironment):
DataStream[(Int, Long, String)] = {
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
data.+=((5, 3L, "I am fine."))
data.+=((6, 3L, "Luke Skywalker"))
env.fromCollection(data)
  }


Nico

On 16/03/18 22:50, karim amer wrote:
> Hi There,
> 
>  I am trying to write a CSVsink to disk but it's not getting written. I
> think the file is getting overwritten or truncated once The Stream
> process finishes. Does anyone know why the file is getting overwritten
> or truncated and how can i fix this ?
> 
> 
> tableEnv.registerDataStream("table", watermarkedStream)
> 
> val results = tableEnv.sqlQuery( """
> |SELECT
> | A
> | FROM table
> """.stripMargin)
> 
> 
> 
> val result: Table = results
> 
> val path = "file:///path/test/1.txt"
> val sink :TableSink[Row]=   new CsvTableSink(
>   path, // output path
> fieldDelim = "|", // optional: delimit files by '|'
> numFiles = 1, // optional: write to a single file
> writeMode = WriteMode.NO_OVERWRITE)
> 
> result.writeToSink(sink)
> 
> env.execute("this job")
> 
> 
> 
> 
> Thanks





Re: Calling close() on Failure

2018-03-19 Thread Nico Kruber
Hi Gregory,
I tried to reproduce the behaviour you described but in my case (Flink
1.5-SNAPSHOT, using the SocketWindowWordCount adapted to let the first
flatmap be a RichFlatMapFunction with a close() method), the close()
method was actually called on the task manager I did not kill. Since the
close() actually comes from the RichFunction, the handling compared to a
ProcessFunction should not be different.

Can you give more details on your program and why you think it was not
called?
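
For reference, a minimal sketch of the kind of function used in that test
(class name and log message are made up; only the close() override matters):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class LoggingTokenizer extends RichFlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String token : value.split("\\s+")) {
            out.collect(token);
        }
    }

    @Override
    public void close() throws Exception {
        // called when the task shuts down gracefully, e.g. on the surviving
        // task managers during a failover
        System.out.println("close() called on subtask "
                + getRuntimeContext().getIndexOfThisSubtask());
        super.close();
    }
}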


Regards
Nico

On 15/03/18 21:16, Gregory Fee wrote:
> Hello! I had a program lose a task manager the other day. The fail over
> back to a checkpoint and recovery worked like a charm. However, on one
> of my ProcessFunctions I defined a close() method and I noticed that it
> did not get called. To be clear, the task manager that failed was
> running that ProcessFunction. It makes sense to me that close() might
> not be callable in that case. But I had parallelism at 24 and I know
> that other instances of that ProcessFunction were running on machines
> that were gracefully shutdown yet zero close() functions were invoked.
> It seems like close() should get called on operators that are shutdown
> gracefully even in a failure condition. Is that how Flink is supposed to
> work? Am I missing something?
> 
> -- 
> *Gregory Fee*
> 
> Engineer
> 425.830.4734 
> Lyft 





Re: Submiting jobs via UI/Rest API

2018-03-19 Thread Nico Kruber
Thanks for reporting these issues,

1. This behaviour is actually intended since we do not spawn any thread
that is waiting for the job completion (which may or may not occur
eventually). Therefore, the web UI always submits jobs in detached mode
and you could not wait for job completion anyway. Any call after
env.execute() may thus not give you any more data than you already had
before. As a safety precaution, we stop the execution of the main()
method after env.execute().
If there was a possibility to wait for job completion, you would be able
to block the whole web UI with it.

2. This seems to be solved: I tried to submit this skeleton to Flink
1.5-SNAPSHOT and only got a failure message like this:
{"errors":["The main method caused an error."]}
The code I tried was
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.execute("Socket Window WordCount");

I also tried with a StreamTransformation only but got the same message.



Regards
Nico

On 09/03/18 14:33, eSKa wrote:
> Hi guys,
> 
> We were trying to use UI's "Submit new job" functionality (and later REST
> endpoints for that).
> There were few problems we found:
> 1. When we ran job that had additional code done after env execution (or any
> sink) the code was not executed. E.g. our job was calculating some data,
> writing it to temp location in sink and when everything was successfully,
> move files to proper location on HDFS. Running job using Java's
> YARNClusterClient API worked fine.
> 2. We wanted to test job using "Show Plan" option but it seems that running
> this option for job that did not have anything to run (e.g. calculated input
> paths list was empty) results in killing the container on YARN. I didnt find
> any suspicious logs in jobManager:
> 
> 
> /2018-03-09 14:13:53,979 INFO  com.my_job.CustomFlinkJob  
> - All job done :)
> 2018-03-09 14:13:53,996 INFO 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web
> dashboard root cache directory
> /tmp/flink-web-1fe30b99-9ad1-4531-b14b-143ea6c3d9ed
> 2018-03-09 14:13:54,004 INFO  org.apache.flink.runtime.blob.BlobServer
>  
> - Stopped BLOB server at 0.0.0.0:60727
> 2018-03-09 14:13:54,007 INFO 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Removing web
> dashboard jar upload directory
> /tmp/flink-web-8d7b68fc-1ef7-4869-91c1-5bebb370b529
> /
> 
> We are using Flink 1.3.1 version, next week will play with 1.4.1.
> Any chances for fixing that bugs in next versions?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 





Re: Flink kafka connector with JAAS configurations crashed

2018-03-19 Thread Nico Kruber
Hi,
I'm no expert on Kafka here, but as the tasks are run on the worker
nodes (where the TaskManagers are run), please double-check whether the
file under /data/apps/spark/kafka_client_jaas.conf on these nodes also
contains the same configuration as on the node running the JobManager,
i.e. an appropriate entry for 'KafkaClient'.


Regards
Nico

On 13/03/18 08:42, sundy wrote:
> 
> Hi all,
> 
> I use the code below to set the Kafka JAAS config; the
> serverConfig.jasspath is /data/apps/spark/kafka_client_jaas.conf, but
> on a Flink standalone deployment it crashes. I am sure the
> kafka_client_jaas.conf is valid, because other applications (Spark
> Streaming) are still working fine with it. So I think the problem may
> not be caused by the Kafka 0.10 client.
> 
> System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("sasl.mechanism", "PLAIN");
> 
> 
> Exceptions msgs are:
> 
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:717)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in 
> the JAAS configuration. System property 'java.security.auth.login.config' is 
> /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:657)
>   ... 11 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' 
> entry in the JAAS configuration. System property 
> 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
>   at 
> org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
>   at 
> org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
>   at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
>   ... 15 more
> 
> 
> 
> File content looks like below:
> 
> KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule
> required username="admin" password="xxx"; };
> 
> It seems like the kafka_client_jaas.conf file has been read, but the
> KafkaClient entry could not be resolved. That’s very strange, other
> applications with the same config are working fine. And I wrote a simple
> Java code to test the file, it works fine too.
> 
> 
> public static void main(String[] args) {
>   Map maps = new HashMap<>();
>   System.setProperty("java.security.auth.login.config",
> "/data/apps/spark/kafka_client_jaas.conf");
>   Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
>   AppConfigurationEntry object[] =
> jassConfig.getAppConfigurationEntry("KafkaClient");
>   for(AppConfigurationEntry entry : object){
>     System.out.println(entry.getOptions());
>   }
> }
> 
> 
> 
> 
>  
> 
> 
> 





Re: Flink web UI authentication

2018-03-19 Thread Nico Kruber
Hi Sampath,
aside from allowing only certain origins via the configuration parameter
"web.access-control-allow-origin", I am not aware of anything like
username/password authentication. Chesnay (cc'd) may know more about
future plans.
You can, however, wrap a proxy like squid around the web UI if you need
this.


Regards
Nico

On 13/03/18 11:16, Sampath Bhat wrote:
> Hello
> 
> I would like to know if flink supports any user level authentication
> like username/password for flink web ui.
> 
> Regards
> Sampath S
> 





Re: Incremental checkpointing performance

2018-03-23 Thread Nico Kruber
 76 ms).
> --
> I did not get "Asynchronous RocksDB snapshot ..." kind of message in the
> logs. Even if I have changed the state backend properties in the
> flink-conf.yaml file the log message remained the same. I think there is
> some issue with detecting the correct state back end.
> 
> Regarding the following sentence,
> 
> Other than that, from what I know about it (Stefan (cc'd), correct me if
> I'm wrong), incremental checkpoints only do hard links locally to the
> changed sst files and then copy the data in there to the checkpoint
> store (the path you gave). A full checkpoint must copy all current data.
> If, between two checkpoints, you write more data than the contents of
> the database, e.g. by updating a key multiple times, you may indeed have
> more data to store. Judging from the state sizes you gave, this is
> probably not the case.
> 
> 
> I have used the average checkpoint size in the charts which was obtained
> through the Flink dashboard. I hope the values in the Flink dashboard
> shows the holistic accurate view of the checkpoint sizes. If not, could
> you please explain how to measure the size of an incremental checkpoint
> in Flink?
> 
> 
> Thanks,
> Miyuru
> 
> On Monday, 19 March 2018, 19:46:36 GMT+5:30, Nico Kruber
>  wrote:
> 
> 
> Hi Miyuru,
> Indeed, the behaviour you observed sounds strange and kind of go against
> the results Stefan presented in [1]. To see what is going on, can you
> also share your changes to Flink's configuration, i.e. flink-conf.yaml?
> 
> Let's first make sure you're really comparing RocksDBStateBackend with
> vs without incremental checkpoints:
> - if you remove this from the code:
>     env.setStateBackend(new RocksDBStateBackend(
>           new FsStateBackend("file:///home/ubuntu/tmp-flink-rocksdb"),
>           true));
> then you will end up with the state backend configured via the
> "state.backend" property. Was this set to "rocksdb"? Alternatively, you
> can set the second parameter to the RocksDBStateBackend constructor to
> false to get the right back-end.
> 
> You can also verify the values you see from the web interface by looking
> into the logs (at INFO level). There, you should see reports like this:
> "Asynchronous RocksDB snapshot (..., asynchronous part) in thread ...
> took ... ms." and "Asynchronous RocksDB snapshot (..., synchronous part)
> in thread ... took ... ms."
> 
> Other than that, from what I know about it (Stefan (cc'd), correct me if
> I'm wrong), incremental checkpoints only do hard links locally to the
> changed sst files and then copy the data in there to the checkpoint
> store (the path you gave). A full checkpoint must copy all current data.
> If, between two checkpoints, you write more data than the contents of
> the database, e.g. by updating a key multiple times, you may indeed have
> more data to store. Judging from the state sizes you gave, this is
> probably not the case.
> 
> 
> Let's get started with this and see whether there is anything unusual.
> 
> 
> Regards,
> Nico
> 
> 
> [1]
> https://berlin.flink-forward.org/kb_sessions/a-look-at-flinks-internal-data-structures-and-algorithms-for-efficient-checkpointing/
> 
> On 19/03/18 05:25, Miyuru Dayarathna wrote:
>> Hi,
>>
>> We did a performance test of Flink's incremental checkpointing to
>> measure the average time it takes to create a checkpoint and the average
>> checkpoint file size. We did this test on a single computer in order to
>> avoid the latencies introduced by network communication. The computer
>> had Intel® Core™ i7-7600U CPU @ 2.80GHz, 4 cores, 16GB RAM, 450GB HDD,
>> 101GB free SSD space. The computer was running on Ubuntu 16.04
>> LTS, Apache Flink 1.4.1, Java version "1.8.0_152", Java(TM) SE Runtime
>> Environment (build 1.8.0_152-b16), Java HotSpot(TM) 64-Bit Server VM
>> (build 25.152-b16, mixed mode). We used the JVM flags -Xmx8g -Xms8g. The
>> test was run for 40 minutes.
>>
>> The Flink application we used is as follows,
>>
> //-
>> public class LengthWindowIncrementalCheckpointing {
>>     private static DataStream>
>> inputStream = null;
>>     private static final int PARALLELISM = 1;
>>     private static final int timeoutMill

Re: How does setMaxParallelism work

2018-03-28 Thread Nico Kruber
Hi James,
the number of subtasks being used is defined by the parallelism, the max
parallelism, however, "... determines the maximum parallelism to which
you can scale operators" [1]. That is, once set, you cannot ever (even
after restarting your program from a savepoint) increase the operator's
parallelism above this value. The actual parallelism can be set per job
in your program but also in the flink client:
flink run -p <parallelism> <jar file> <arguments>
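
To make the difference concrete, here is a minimal sketch (the operator and the
concrete values are only for illustration, not taken from your job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // actual parallelism: how many parallel subtasks each operator gets now
        // (same effect as passing "-p 3" to the flink client)
        env.setParallelism(3);

        // max parallelism: only an upper bound for later rescaling (and the number
        // of key groups for keyed state) - it does not create subtasks by itself
        env.setMaxParallelism(128);

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            // both settings can also be overridden per operator
            .setParallelism(2)
            .setMaxParallelism(64)
            .print();

        env.execute("parallelism example");
    }
}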


Nico



[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly

On 28/03/18 09:25, Data Engineer wrote:
> I have a sample application that reads around 2 GB of csv files,
> converts each record into Avro object and sends it to kafka.
> I use a custom FileReader that reads the files in a directory.
> I have set taskmanager.numberOfTaskSlots to 4.
> I see that if I use setParallelism(3), 3 subtasks are created. But if I
> use setMaxParallelism(3), only 1 subtask is created.
> 
> On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke  <mailto:jornfra...@gmail.com>> wrote:
> 
> What was the input format, the size and the program that you tried
> to execute
> 
> On 28. Mar 2018, at 08:18, Data Engineer  <mailto:dataenginee...@gmail.com>> wrote:
> 
>> I went through the explanation on MaxParallelism in the official
>> docs here:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
>> 
>> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>
>>
>> However, I am not able to figure out how Flink decides the
>> parallelism value.
>> For instance, if I setMaxParallelism to 3, I see that for my job,
>>     there is only 1 subtask that is created. How did Flink decide that
>> 1 subtask was enough?
>>
>> Regards,
>> James
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: timeWindow emits records before window ends?

2018-03-28 Thread Nico Kruber
Hi Alex,
If you don't set an offset for the Window, it will be aligned with epoch
[1], i.e. assume you started your program at time 00:02:18, then the
window by default starts 00:00:00 and ends 00:02:59.999 and you will
emit records 42 seconds after you started your program.
If you need the window to count 3 minutes from any other time, then
please refer to using TumblingEventTimeWindows#of(Time size, Time offset).
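
For illustration, a minimal sketch (the input values and the 42-second offset
are made up, only the window calls matter):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowOffsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Tuple2<String, Long>> events =
            env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        // default: 3-minute windows aligned with epoch, i.e. 00:00, 00:03, 00:06, ...
        events.keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(3)))
            .sum(1)
            .print();

        // with an offset: windows now cover 00:00:42-00:03:42, 00:03:42-00:06:42, ...
        events.keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(3), Time.seconds(42)))
            .sum(1)
            .print();

        env.execute("window offset example");
    }
}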


Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#tumbling-windows

On 27/03/18 16:22, NEKRASSOV, ALEXEI wrote:
> Hello,
> 
>  
> 
> With time characteristic set to IngestionTime I expected
> “timeWindow(Time.minutes(3))” to NOT produce any records in the first 3
> minutes of running the job, and yet it does emit the record before 3
> minutes elapse.
> 
> Am I doing something wrong? Or my understanding of timeWindow is incorrect?
> 
>  
> 
> For example, in Flink UI I see:
> 
>  
> 
> TriggerWindow(TumblingEventTimeWindows(18),
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@7c810ef9,
> aggFunction=nextgen.McdrAggregator@7d7758be}, EventTimeTrigger(),
> WindowedStream.aggregate(WindowedStream.java:752)) -> Map
> 
>  
> 
> With “duration” 42s and “records sent” 689516.
> 
>  
> 
> I expected no records would be sent out until 18 ms elapse.
> 
>  
> 
> Thanks,
> 
> Alex Nekrassov
> 
> nekras...@att.com <mailto:nekras...@att.com>
> 
>  
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: How does setMaxParallelism work

2018-03-28 Thread Nico Kruber
Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1],
by default 1) which is used if you do not specify it yourself.


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options

On 28/03/18 13:21, Data Engineer wrote:
> Agreed. But how did Flink decide that it should allot 1 subtask? Why not
> 2 or 3?
> I am trying to understand the implications of using setMaxParallelism vs
> setParallelism
> 
> On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber  <mailto:n...@data-artisans.com>> wrote:
> 
> Hi James,
> the number of subtasks being used is defined by the parallelism, the max
> parallelism, however, "... determines the maximum parallelism to which
> you can scale operators" [1]. That is, once set, you cannot ever (even
> after restarting your program from a savepoint) increase the operator's
> parallelism above this value. The actual parallelism can be set per job
> in your program but also in the flink client:
> flink run -p   
> 
> 
> Nico
> 
> 
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>
> 
> On 28/03/18 09:25, Data Engineer wrote:
> > I have a sample application that reads around 2 GB of csv files,
> > converts each record into Avro object and sends it to kafka.
> > I use a custom FileReader that reads the files in a directory.
> > I have set taskmanager.numberOfTaskSlots to 4.
> > I see that if I use setParallelism(3), 3 subtasks are created. But if I
> > use setMaxParallelism(3), only 1 subtask is created.
> >
> > On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke  <mailto:jornfra...@gmail.com>
> > <mailto:jornfra...@gmail.com <mailto:jornfra...@gmail.com>>> wrote:
> >
> >     What was the input format, the size and the program that you tried
> >     to execute
> >
> >     On 28. Mar 2018, at 08:18, Data Engineer  <mailto:dataenginee...@gmail.com>
> >     <mailto:dataenginee...@gmail.com 
> <mailto:dataenginee...@gmail.com>>> wrote:
> >
> >>     I went through the explanation on MaxParallelism in the official
> >>     docs here:
> >>     
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>
> >>     
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
> 
> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>>
> >>
> >>     However, I am not able to figure out how Flink decides the
> >>     parallelism value.
> >>     For instance, if I setMaxParallelism to 3, I see that for my job,
> >>     there is only 1 subtask that is created. How did Flink decide that
> >>     1 subtask was enough?
> >>
> >>     Regards,
> >>     James
> >
> >
> 
> --
> Nico Kruber | Software Engineer
> data Artisans
> 
> Follow us @dataArtisans
>     --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: Master and Slave files

2018-03-28 Thread Nico Kruber
If you refer to the files under the conf folder, these are only used by
the standalone cluster startup scripts, i.e. bin/start-cluster.sh and
bin/stop-cluster.sh


Nico

On 28/03/18 12:27, Alexander Smirnov wrote:
> Hi,
> 
> are the files needed only on cluster startup stage?
> are they only used by bash scripts?
> 
> Alex

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: All sub-tasks stuck in CREATED state after "BlobServerConnection: GET operation failed"

2018-03-29 Thread Nico Kruber
My launch command is basically:
> 
>     flink-${FLINK_VERSION}/bin/flink run -m yarn-cluster -yn
> ${NODE_COUNT} -ys ${SLOT_COUNT} -yjm
> ${JOB_MANAGER_MEMORY} -ytm ${TASK_MANAGER_MEMORY} -yst
> -yD restart-strategy=fixed-delay -yD
> restart-strategy.fixed-delay.attempts=3 -yD
> "restart-strategy.fixed-delay.delay=30 s" -p
> ${PARALLELISM} $@
> 
> 
> I'm also setting this to fix some classloading error
> (with the previous build that still works)
> -yD.classloader.resolve-order=parent-first
> 
> 
> Cluster was AWS EMR, release 5.12.0.
> 
> Thanks.
> 
> 
> 
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: How does setMaxParallelism work

2018-03-30 Thread Nico Kruber
No, currently it is up to you to decide whether you need to scale
and how. If, for a running Flink job, you decide to scale, you
- flink cancel --withSavepoint <targetDirectory> <jobID>
- flink run -p <newParallelism> --fromSavepoint <savepointPath> <jar> <arguments>


Nico

On 29/03/18 19:27, NEKRASSOV, ALEXEI wrote:
> Is there an auto-scaling feature in Flink, where I start with parallelism of 
> (for example) 1, but Flink notices I have high volume of data to process, and 
> automatically increases parallelism of a running job?
> 
> Thanks,
> Alex
> 
> -Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com] 
> Sent: Wednesday, March 28, 2018 8:54 AM
> To: Data Engineer 
> Cc: Jörn Franke ; user@flink.apache.org
> Subject: Re: How does setMaxParallelism work
> 
> Flink does not decide the parallelism based on your job.
> There is a default parallelism (configured via parallelism.default [1], by 
> default 1) which is used if you do not specify it yourself.
> 
> 
> Nico
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
> 
> On 28/03/18 13:21, Data Engineer wrote:
>> Agreed. But how did Flink decide that it should allot 1 subtask? Why 
>> not
>> 2 or 3?
>> I am trying to understand the implications of using setMaxParallelism 
>> vs setParallelism
>>
>> On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber > <mailto:n...@data-artisans.com>> wrote:
>>
>> Hi James,
>> the number of subtasks being used is defined by the parallelism, the max
>> parallelism, however, "... determines the maximum parallelism to which
>> you can scale operators" [1]. That is, once set, you cannot ever (even
>> after restarting your program from a savepoint) increase the operator's
>> parallelism above this value. The actual parallelism can be set per job
>> in your program but also in the flink client:
>> flink run -p   
>>
>>
>> Nico
>>
>>
>>
>> [1]
>> 
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
>> 
>> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production
>> _ready.html#set-maximum-parallelism-for-operators-explicitly>
>>
>> On 28/03/18 09:25, Data Engineer wrote:
>> > I have a sample application that reads around 2 GB of csv files,
>> > converts each record into Avro object and sends it to kafka.
>> > I use a custom FileReader that reads the files in a directory.
>> > I have set taskmanager.numberOfTaskSlots to 4.
>> > I see that if I use setParallelism(3), 3 subtasks are created. But if I
>> > use setMaxParallelism(3), only 1 subtask is created.
>> >
>> > On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke > <mailto:jornfra...@gmail.com>
>> > <mailto:jornfra...@gmail.com <mailto:jornfra...@gmail.com>>> wrote:
>> >
>> >     What was the input format, the size and the program that you tried
>> >     to execute
>> >
>> >     On 28. Mar 2018, at 08:18, Data Engineer > <mailto:dataenginee...@gmail.com>
>> >     <mailto:dataenginee...@gmail.com 
>> <mailto:dataenginee...@gmail.com>>> wrote:
>> >
>> >>     I went through the explanation on MaxParallelism in the official
>> >>     docs here:
>> >>     
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
>> 
>> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>
>> >>     
>> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
>> 
>> <https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly>>
>> >>
>> >>     However, I am not able to figure out how Flink decides the
>> >>     parallelism value.
>> >>     For instance, if I setMaxParallelism to 3, I see that for my job,
>> >>     there is only 1 subtask that is created. How did Flink decide that
>> >>     1 subtask was enough?
>> >>
>> >>     Regards,
>> >>     James
>> >
>> >
>>
>> --
>> Nico Kruber | Software Engineer

Re: Reg. the checkpointing mechanism

2018-04-06 Thread Nico Kruber
Hi James,
The checkpoint coordinator at the JobManager triggers each checkpoint by
instructing the sources to inject checkpoint barriers into their streams. The
barriers then travel to the TaskManagers via the same communication channels
as the data, and the TaskManagers take the actual snapshots as the barriers
pass through. So yes, with your settings the JobManager initiates a new
checkpoint every 200 ms. Please refer to [1] for more details.
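
Just to put that into code, a minimal sketch of such a configuration (the
min-pause setting is not from your snippet, only an assumption that often
helps with such a short interval):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTriggeringExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the checkpoint coordinator on the JobManager triggers a new checkpoint
        // every 200 ms by having the sources inject barriers; the snapshots
        // themselves are taken on the TaskManagers as the barriers flow through
        env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // assumption, not from the thread: enforce a pause so a new checkpoint is
        // not triggered while the previous one is still running
        config.setMinPauseBetweenCheckpoints(100);

        // ... then build your pipeline and call env.execute() as usual
    }
}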


Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
On 06/04/18 13:47, Data Engineer wrote:
> I have a Flink application (v 1.4.2) where I have enabled checkpointing
> with an interval of 200 ms.
> This is what I have configured in my driver program:
> 
>             env.enableCheckpointing(checkpointInterval,
> CheckpointingMode.EXACTLY_ONCE);
>             CheckpointConfig config = env.getCheckpointConfig();
>            
> config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> In the JobManager logs, I see multiple entries saying "Checkpoint
> triggered".
> I would like to clarify whether the JobManager is triggering the
> checkpoint every 200 ms? Or does the JobManager only initiate the
> checkpointing and the TaskManager do it on its own?
> 
> Regards,
> James
> 
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-24 Thread Nico Kruber
e.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>>     
>>>> <http://tors.hash.MutableHashTable.pro>cessProbeIter(MutableHashTable.java:505)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 4:*
>>>>
>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA
>>>> waiting
>>>>   java.lang.Thread.State: WAITING
>>>>   at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>   at
>>>> 
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>   at
>>>> 
>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>   at
>>>> 
>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>   at
>>>> 
>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>   at
>>>> 
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>   at
>>>> 
>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>   at
>>>> 
>>>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>   - locked <0x23eb> (a java.lang.Object)
>>>>   at
>>>> 
>>>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>   at
>>>> 
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>   at
>>>> org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>   at
>>>> 
>>>> my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>   at my.package.algorithm.Sample.co
>>>> 
>>>> <http://my.package.algorithm.Sample.co>mputeApproximateDeltaFast(Sample.java:492)
>>>>   at my.package.algorithm.Sample.ru
>>>> <http://my.package.algorithm.Sample.ru>n(Sample.java:291).
>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> While I realize these dumps on their own may not be
>>>> helpful, they at least (as far as I know) indicate
>>>> that the threads are all waiting on something.
>>>> But if it was resource scarcity I believe the
>>>> program would terminate with an exception.
>>>> And if it was garbage collection activity, I believe
>>>> the JVM process would not be at 0% CPU usage.
>>>>
>>>> *Note: *I realize I didn't provide the user-code
>>>> code that generates the execution plan for Flink
>>>> which led to the contexts in which the threads are
>>>> waiting, but I hope it may not be necessary.
>>>> My problem now is that I am unsure on how to proceed
>>>> to further debug this issue:
>>>> - The assigned memory is fully used, but there are
>>>> no exceptions about lack of memory.
>>>> - The CPU usage is at 0% and all threads are all in
>>>> a waiting state, but I don't understand what signal
>>>> they're waiting for exactly.
>>>>
>>>> Hoping anyone might be able to give me a hint.
>>>>
>>>> Thank you very much for your time.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>
>>
>>
> 
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-03 Thread Nico Kruber
GeneratedMethodAccessor107.invoke(Unknown Source)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >> > at java.lang.reflect.Method.invoke(Method.java:498)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> >> >> > at
> >> >> >
> >> >> >
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> >> >> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >> >> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >> >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >> >> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >> >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >> >> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >> >> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >> >> > at
> >> >> >
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >> > at
> >> >> >
> >> >> >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >> > at
> >> >> >
> >> >> >
> >> >> >
> 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> > Amit
> >> >
> >> >
> >
> >
> 
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: Batch job stuck in Canceled state in Flink 1.5

2018-05-22 Thread Nico Kruber
Hi Amit,
thanks for providing the logs, I'll look into it. We currently have a
suspicion of this being caused by
https://issues.apache.org/jira/browse/FLINK-9406 which we found by
looking over the surrounding code. The RC4 has been cancelled since we
see this as a release blocker.

To rule out further errors, can you also provide logs for the task
manager producing partitions d6946b39439f10e8189322becf1b8887,
9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
The task manager log you provided covers the task manager asking for
this partition only for which the job manager produces the
PartitionProducerDisposedException that you see.
I'm looking for the logs of task managers with the following execution
IDs in their logs:
- 2826f9d430e05e9defaa46f60292fa79
- 7ef992a067881a07409819e3aea32004
- ec923ce6d891d89cf6fecb5e3f5b7cc5

Regarding the operators being stuck: I'll have a further look into the
logs and state transition and will come back to you.


Nico


On 21/05/18 09:27, Amit Jain wrote:
> Hi All,
> 
> I totally missed this thread. I've encountered same issue in Flink
> 1.5.0 RC4. Please look over the attached logs of JM and impacted TM.
> 
> Job ID 390a96eaae733f8e2f12fc6c49b26b8b
> 
> --
> Thanks,
> Amit
> 
> On Thu, May 3, 2018 at 8:31 PM, Nico Kruber  wrote:
>> Also, please have a look at the other TaskManagers' logs, in particular
>> the one that is running the operator that was mentioned in the
>> exception. You should look out for the ID 98f5976716234236dc69fb0e82a0cc34.
>>
>>
>> Nico
>>
>>
>> PS: Flink logs files should compress quite nicely if they grow too big :)
>>
>> On 03/05/18 14:07, Stephan Ewen wrote:
>>> Google Drive would be great.
>>>
>>> Thanks!
>>>
>>> On Thu, May 3, 2018 at 1:33 PM, Amit Jain >> <mailto:aj201...@gmail.com>> wrote:
>>>
>>> Hi Stephan,
>>>
>>> Size of JM log file is 122 MB. Could you provide me other media to
>>> post the same? We can use Google Drive if that's fine with you.
>>>
>>> --
>>> Thanks,
>>> Amit
>>>
>>> On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen >> <mailto:se...@apache.org>> wrote:
>>> > Hi Amit!
>>> >
>>> > Thanks for sharing this, this looks like a regression with the
>>> network stack
>>> > changes.
>>> >
>>> > The log you shared from the TaskManager gives some hint, but that
>>> exception
>>> > alone should not be a problem. That exception can occur under a
>>> race between
>>> > deployment of some tasks while the whole job is entering a
>>> recovery phase
>>> > (maybe we should not print it so prominently to not confuse
>>> users). There
>>> > must be something else happening on the JobManager. Can you share
>>> the JM
>>> > logs as well?
>>> >
>>> > Thanks a lot,
>>> > Stephan
>>> >
>>> >
>>> > On Wed, May 2, 2018 at 12:21 PM, Amit Jain >> <mailto:aj201...@gmail.com>> wrote:
>>> >>
>>> >> Thanks! Fabian
>>> >>
>>> >> I will try using the current release-1.5 branch and update this
>>> thread.
>>> >>
>>> >> --
>>> >> Thanks,
>>> >> Amit
>>> >>
>>> >> On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske >> <mailto:fhue...@gmail.com>> wrote:
>>> >> > Hi Amit,
>>> >> >
>>> >> > We recently fixed a bug in the network stack that affected
>>> batch jobs
>>> >> > (FLINK-9144).
>>> >> > The fix was added after your commit.
>>> >> >
>>> >> > Do you have a chance to build the current release-1.5 branch
>>> and check
>>> >> > if
>>> >> > the fix also resolves your problem?
>>> >> >
>>> >> > Otherwise it would be great if you could open a blocker issue
>>> for the
>>> >> > 1.5
>>> >> > release to ensure that this is fixed.
>>> >> >
>>> >> > Thanks,
>>> >> > Fabian
>>> >> >
>>> >> > 2018-04-29 18:

Re: PartitionNotFoundException after deployment

2018-06-05 Thread Nico Kruber
Hi Gyula,
as a follow-up, you may be interested in
https://issues.apache.org/jira/browse/FLINK-9413


Nico

On 04/05/18 15:36, Gyula Fóra wrote:
> Looks pretty clear that one operator takes too long to start (even on
> the UI it shows it in the created state for far too long). Any idea what
> might cause this delay? It actually often crashes on Akka ask timeout
> during scheduling the node.
> 
> Gyula
> 
> Piotr Nowojski  <mailto:pi...@data-artisans.com>> ezt írta (időpont: 2018. máj. 4., P,
> 15:33):
> 
> Ufuk: I don’t know why.
> 
> +1 for your other suggestions.
> 
> Piotrek
> 
> > On 4 May 2018, at 14:52, Ufuk Celebi  <mailto:u...@data-artisans.com>> wrote:
> >
> > Hey Gyula!
> >
> > I'm including Piotr and Nico (cc'd) who have worked on the network
> > stack in the last releases.
> >
> > Registering the network structures including the intermediate results
> > actually happens **before** any state is restored. I'm not sure why
> > this reproducibly happens when you restore state. @Nico, Piotr: any
> > ideas here?
> >
> > In general I think what happens here is the following:
> > - a task requests the result of a local upstream producer, but that
> > one has not registered its intermediate result yet
> > - this should result in a retry of the request with some backoff
> > (controlled via the config params you mention
> > taskmanager.network.request-backoff.max,
> > taskmanager.network.request-backoff.initial)
> >
> > As a first step I would set logging to DEBUG and check the TM logs for
> > messages like "Retriggering partition request {}:{}."
> >
> > You can also check the SingleInputGate code which has the logic for
> > retriggering requests.
> >
> > – Ufuk
> >
> >
> > On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  <mailto:gyula.f...@gmail.com>> wrote:
> >> Hi Ufuk,
> >>
> >> Do you have any quick idea what could cause this problems in
> flink 1.4.2?
> >> Seems like one operator takes too long to deploy and downstream
> tasks error
> >> out on partition not found. This only seems to happen when the job is
> >> restored from state and in fact that operator has some keyed and
> operator
> >> state as well.
> >>
> >> Deploying the same job from empty state works well. We tried
> increasing the
> >> taskmanager.network.request-backoff.max that didnt help.
> >>
> >> It would be great if you have some pointers where to look
> further, I havent
> >> seen this happening before.
> >>
> >> Thank you!
> >> Gyula
> >>
> >> The errror:
> >> org.apache.flink.runtime.io
> <http://org.apache.flink.runtime.io>.network.partition.: Partition
> >> 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd
> not found.
> >>    at
> >> org.apache.flink.runtime.io
> 
> <http://org.apache.flink.runtime.io>.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
> >>    at
> >> org.apache.flink.runtime.io
> 
> <http://org.apache.flink.runtime.io>.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
> >>    at
> >> org.apache.flink.runtime.io
> 
> <http://org.apache.flink.runtime.io>.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
> >>    at java.util.TimerThread.mainLoop(Timer.java:555)
> >>    at java.util.TimerThread.run(Timer.java:505)
> >
> >
> >
> > --
> > Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin
> >
> > i...@data-artisans.com <mailto:i...@data-artisans.com>
> > +49-30-43208879 
> >
> > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



signature.asc
Description: OpenPGP digital signature


Re: Queryable State

2017-01-13 Thread Nico Kruber
Hi Dawid,
I'll try to reproduce the error in the next couple of days. Can you also share 
the value deserializer you use? Also, have you tried even smaller examples in 
the meantime? Did they work?

As a side-note in general regarding the queryable state "sink" using ListState 
(".asQueryableState(, ListStateDescriptor)"): everything that enters 
this operator will be stored forever and never cleaned. Eventually, it will 
pile up too much memory and is thus of limited use. Maybe it should even be 
removed from the API.


Nico

On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> Hey Ufuk.
> Did you maybe had a while to have a look at that problem?
> 
> 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > the course of the day. From a first impression, this seems like a bug
> > to me.
> > 
> > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > 
> >  wrote:
> > > Hi I was experimenting with the Query State feature and I have some
> > 
> > problems
> > 
> > > querying the state.
> > > 
> > > The code which I use to produce the queryable state is:
> > > env.addSource(kafkaConsumer).map(
> > > 
> > >   e => e match {
> > >   
> > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > >   
> > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
> > >   e2._3)))
> > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > >   .keyBy("key")
> > >   .asQueryableState(
> > >   
> > > "type-time-series-count",
> > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > 
> > >   "type-time-series-count",
> > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > > 
> > > As you see it is a rather simple job, in which I try to count events of
> > > different types in windows and then query by event type.
> > > 
> > > In client code I do:
> > > // Query Flink state
> > > val future = client.getKvState(jobId, "type-time-series-count",
> > > 
> > > key.hashCode, seralizedKey)
> > > 
> > > // Await async result
> > > val serializedResult: Array[Byte] = Await.result(
> > > 
> > >   future, new FiniteDuration(
> > >   
> > > 10,
> > > duration.SECONDS))
> > > 
> > > // Deserialize response
> > > val results = deserializeResponse(serializedResult)
> > > 
> > > results
> > >   
> > >   }
> > > 
> > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > util.List[KeyedDataPoint[lang
> > > 
> > >   .Integer]] = {
> > >   
> > > KvStateRequestSerializer.deserializeList(serializedResult,
> > > 
> > > getValueSerializer())
> > > 
> > >   }
> > > 
> > > As I was trying to debug the issue I see the first element in list gets
> > > deserialized correctly, but it fails on the second one. It seems like
> > > the
> > > serialized result is broken. Do you have any idea if I am doing sth
> > 
> > wrong or
> > 
> > > there is some bug?
> > > 
> > > 
> > > The exception I get is:
> > > java.io.EOFException: null
> > > at
> > > org.apache.flink.runtime.util.DataInputDeserializer.readFully(
> > 
> > DataInputDeserializer.java:157)
> > 
> > > at
> > > org.apache.flink.runtime.util.DataInputDeserializer.readUTF(
> > 
> > DataInputDeserializer.java:240)
> > 
> > > at
> > > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(
> > 
> > PojoSerializer.java:386)
> > 
> > > at
> > > org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.
> > 
> > deserializeList(KvStateRequestSerializer.java:487)
> > 
> > > at
> > > com.dataartisans.stateserver.queryclient.QueryClient.
> > 
> > deserializeResponse(QueryClient.scala:44)
> > 
> > > You can browse the exact code at: https://github.com/dawidwys/
> > 
> > flink-intro
> > 
> > > I would be grateful for any advice.
> > > 
> > > Regards
> > > Dawid Wysakowicz


signature.asc
Description: This is a digitally signed message part.


Re: Queryable State

2017-01-16 Thread Nico Kruber
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I 
wrote and my guess is that the second parameter of the ListStateDescriptor is 
wrong:

  .asQueryableState(
"type-time-series-count",
new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
  "type-time-series-count",
  classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be 

TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It sounds strange to me that you don't get any 
ClassCastException or a compile-time error due to the type being wrong, but I 
lack some Scala knowledge to get to the bottom of this.
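
In Java, what I mean would look roughly like this (KeyedDataPoint below is only
a simplified stand-in for your class):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class DescriptorExample {

    // simplified stand-in for the KeyedDataPoint POJO from the job
    public static class KeyedDataPoint<V> {
        public String key;
        public long timestamp;
        public V value;
    }

    public static void main(String[] args) {
        // descriptor built from TypeInformation: the full generic type is kept, so
        // the state is written with the same serializer the query client creates
        // via TypeInformation.of(...).createSerializer(...)
        ListStateDescriptor<KeyedDataPoint<Integer>> descriptor =
            new ListStateDescriptor<>(
                "type-time-series-count",
                TypeInformation.of(new TypeHint<KeyedDataPoint<Integer>>() {}));

        System.out.println(descriptor.getName());
    }
}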


Regarding the removal of the queryable list state "sink", I created a JIRA 
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> Hi Nico,
> 
> Recently I've tried the queryable state a bit differently, by using
> ValueState with a value of a util.ArrayList and a ValueSerializer for
> util.ArrayList and it works as expected.
> 
> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a7
> 0a6fe2219 The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)
> 
> Sure, I am aware of the downfall of the ListState. I need it just for
> presentational purpose, but you may be right there might not be any
> production use for this state and it should be removed.
> Maybe the problem is just with the ListState and removing it would resolve
> also my problem :)
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > Hi Dawid,
> > I'll try to reproduce the error in the next couple of days. Can you also
> > share
> > the value deserializer you use? Also, have you tried even smaller examples
> > in
> > the meantime? Did they work?
> > 
> > As a side-note in general regarding the queryable state "sink" using
> > ListState
> > (".asQueryableState(, ListStateDescriptor)"): everything that enters
> > this operator will be stored forever and never cleaned. Eventually, it
> > will
> > pile up too much memory and is thus of limited use. Maybe it should even
> > be
> > removed from the API.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > Hey Ufuk.
> > > Did you maybe had a while to have a look at that problem?
> > > 
> > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > the course of the day. From a first impression, this seems like a bug
> > > > to me.
> > > > 
> > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > 
> > > >  wrote:
> > > > > Hi I was experimenting with the Query State feature and I have some
> > > > 
> > > > problems
> > > > 
> > > > > querying the state.
> > > > > 
> > > > > The code which I use to produce the queryable state is:
> > > > > env.addSource(kafkaConsumer).map(
> > > > > 
> > > > >   e => e match {
> > > > >   
> > > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > >   
> > > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
> > > > >   e2._3)))
> > > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3,
> > 
> > e._2))
> > 
> > > > >   .keyBy("key")
> > > > >   .asQueryableState(
> > > > >   
> > > > > "type-time-series-count",
> > > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > 
> > > > >   "type-time-series-count",
> > > > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > 
> > > > > As you see it is a rather simple job, in wh

Re: Queryable State

2017-01-25 Thread Nico Kruber
Hi Dawid,
sorry for the late reply, I was fixing some issues for queryable state and may 
now have found the cause of your error: you may be seeing a race condition 
with the MemoryStateBackend state backend (the default) as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to 
de-serialise list state! - KvStateRequestSerializer#deserializeValue() will 
not work!
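
For reference, a minimal Java sketch of the client side (KeyedDataPoint is
again only a stand-in for the class from your job):

import java.io.IOException;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;

public class ClientDeserializationExample {

    // simplified stand-in for the KeyedDataPoint POJO from the job
    public static class KeyedDataPoint<V> {
        public String key;
        public long timestamp;
        public V value;
    }

    // list state is deserialized with deserializeList() and the *element* serializer
    public static List<KeyedDataPoint<Integer>> deserializeListState(byte[] serializedResult)
            throws IOException {
        TypeSerializer<KeyedDataPoint<Integer>> elementSerializer =
            TypeInformation.of(new TypeHint<KeyedDataPoint<Integer>>() {})
                .createSerializer(new ExecutionConfig());
        return KvStateRequestSerializer.deserializeList(serializedResult, elementSerializer);
    }
}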

Thanks for the tip regarding KvStateRequestSerializer#serializeList, this was 
indeed not used since the list state backends had their own serialisation 
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable 
list state sink for 1.2 and up.


Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> Hi Nico, Ufuk,
> 
> Thanks for diving into this issue.
> 
> @Nico
> 
> I don't think that's the problem. The code can be exactly reproduced in
> java. I am using other constructor for ListDescriptor than you did:
> 
> You used:
> > public ListStateDescriptor(String name, TypeInformation typeInfo)
> 
> While I used:
> >  public ListStateDescriptor(String name, Class typeClass)
> 
> I think the problem is with the way I deserialized the value on the
> QueryClient side as I tried to use:
> 
> 
> 
> KvStateRequestSerializer.deserializeList(serializedResult, {
> 
>   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> 
> .createSerializer(new ExecutionConfig)
> 
> })
> 
> I have not checked it, but now I suspect this code would work:
> > KvStateRequestSerializer.deserializeValue(serializedResult, {
> > 
> >   TypeInformation.of(new
> > 
> > TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> > 
> > .createSerializer(new ExecutionConfig)
> > 
> > })
> 
> Regarding removing the queryable state list I agree, using it seems
> pointless. Moreover while removing it I would take a second look at those
> 
> functions:
> > KvStateRequestSerializer::deserializeList
> 
>  KvStateRequestSerializer.serializeList
> 
> 
> As I think they are not used at all even right now. Thanks for your time.
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-16 13:25 GMT+01:00 Nico Kruber :
> > Hi Dawid,
> > regarding the original code, I couldn't reproduce this with the Java code
> > I
> > wrote and my guess is that the second parameter of the ListStateDescriptor
> > is
> > 
> > wrong:
> >   .asQueryableState(
> >   
> > "type-time-series-count",
> > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > 
> >   "type-time-series-count",
> >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > 
> > this should rather be
> > 
> > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
> > 
> > as in the query itself. It sounds strange to me that you don't get ant
> > ClassCastException or a compile-time error due to the type being wrong but
> > I
> > lack some Scala knowledge to get to the ground of this.
> > 
> > 
> > Regarding the removal of the queryable list state "sink", I created a JIRA
> > issue for it and will open a PR:
> > https://issues.apache.org/jira/browse/FLINK-5507
> > 
> > 
> > Nico
> > 
> > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > Hi Nico,
> > > 
> > > Recently I've tried the queryable state a bit differently, by using
> > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > util.ArrayList and it works as expected.
> > > 
> > > The non-working example you can browse here:
> > > https://github.com/dawidwys/flink-intro/tree/
> > 
> > c66f01117b0fe3c0adc8923000543a7
> > 
> > > 0a6fe2219 The working example here:
> > > https://github.com/dawidwys/flink-intro/tree/master
> > > (The QueryableJob is in module flink-queryable-job and the QueryClient
> > > in
> > > flink-state-server)
> > > 
> > > Sure, I am aware of the downfall of the ListState. I need it just for
> > > presentational purpose, but you may be right there might not be any
> > > production use for this state and it should be removed.
> > > Maybe the problem is just with the ListState and removing it would
> > 
> > resolve
> > 
> > > also my problem :)
> > > 
> > > Regards
> > > Dawid Wy

Re: Inconsistent Results in Event Time based Sliding Window processing

2017-02-14 Thread Nico Kruber
Hi Sujit,
this does indeed sound strange and we are not aware of any data loss issues.
Are there any exceptions or other errors in the job/taskmanager logs?
Do you have a minimal working example? Is it that whole windows are not 
processed or just single items inside a window?


Nico

On Tuesday, 14 February 2017 16:57:31 CET Sujit Sakre wrote:
> Hi,
> 
> I have been using Flink 1.1.1 with Kafka 0.9 to process real time streams.
> We have written our own Window Function and are processing data with
> Sliding Windows. We are using Event Time and use a custom watermark
> generator.
> 
> We select a particular window out of multiple sliding windows and process
> all items in the window in an iterative loop to increment counts of the
> items selected. After this we call the sink method to log the result in a
> database.
> 
> This is working fine most of the times, i.e. it produces the expected
> result most of the times. However there are situations when certain windows
> are not processed (under same test conditions), this means the results are
> less than expected i.e. the counts are less than expected. Sometimes, items
> instead of certain windows not getting processed, certain items do not get
> processed in the window. This is unpredictable.
> 
> I wish to know what could be the cause of this inconsistent behaviour. How
> do we resolve it. We have integrated 1.2 and Kafka 0.10 now, the problem
> persists.
> 
> Please could you suggest about what the problem could be and how to resolve
> this.
> 
> Many thanks.
> 
> 
> *Sujit Sakre*



signature.asc
Description: This is a digitally signed message part.


Re: Missing test dependency in Eclipse with Flink 1.2.0

2017-02-14 Thread Nico Kruber
You do not require a plugin, but most probably this dependency was not fetched 
by Eclipse. Please try a "mvn clean package" in your project and see whether 
this helps Eclipse.

Also, you may create a clean test project with

mvn archetype:generate   \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-java  \
  -DarchetypeVersion=1.2.0

for which I could not find any dependency issues using Eclipse.

Regards,
Nico

On Tuesday, 14 February 2017 14:17:10 CET Flavio Pompermaier wrote:
> Hi to all,
> I've tried to migrate to Flink 1.2.0 and now my Eclipse projects says that
> they can't find *apacheds-jdbm1* that has packaging bundle. Should I
> install some plugin?
> 
> Best,
> Flavio



signature.asc
Description: This is a digitally signed message part.


Re: Inconsistent Results in Event Time based Sliding Window processing

2017-02-14 Thread Nico Kruber
Hmm, without any exceptions in the logs, I'd say that you may be on the right 
track with elements arriving with timestamps older than the last watermark.

You may play around with allowed lateness
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#allowed-lateness
to see if this is actually the case.


Nico

On Tuesday, 14 February 2017 19:39:46 CET Sujit Sakre wrote:
> Hi Nico,
> 
> Thanks for the reply.
> 
> There are no exceptions or other errors in the job/task manager logs. I am
> running this example from Eclipse IDE with Kafka and Zookeeper running
> separately; in the console there are no errors shown while processing.
> Previously, we were missing some windows due to watermark exceeding the
> upcoming data timestamps, however, that is not the case anymore.
> 
> There is a working example. I will share the code and data with you
> separately on your email ID.
> 
> The results of processing are of three types:
> 1) Complete set of results is obtained without any missing calculations
> 2) A few windows (from 1 or 2 to more) are missing uniformly from
> calculations
> 3) In a particular window, only selective data is missing, whereas other
> data is processed accurately
> 
> These results are for the same set of inputs under same processing steps.
> 
> This is not predictable, and makes identification of error difficult, as
> sometimes it works i.e. results of pattern #1 and sometimes results of
> pattern #2 or #3 (#3 occurs less frequently, however it does take place.
> 
> 
> 
> 
> *Sujit Sakre*
> 
> Senior Technical Architect
> Tel: +91 22 6660 6600
> Ext:
> 247
> Direct: 6740 5247
> 
> Mobile: +91 98672 01204
> 
> www.rave-tech.com
> 
> 
> 
> Follow us on: Twitter <https://twitter.com/Rave_Tech> / LinkedIn
> <https://in.linkedin.com/in/ravetechnologies> / YouTube
> <https://www.youtube.com/channel/UCTaO1am-cm4FqnQCGdB6ExA>
> 
> 
> 
> Rave Technologies – A Northgate Public Services Company
> <https://www.google.co.in/maps/place/Rave+Technologies/@19.0058078,72.823516
> ,17z/data=!3m1!4b1!4m5!3m4!1s0x3bae17fcde71c3b9:0x1e2a8c0c4a075145!8m2!3d19.
> 0058078!4d72.8257047>
> 
> 
> 
> Please consider the environment before printing this email
> 
> On 14 February 2017 at 18:55, Nico Kruber  wrote:
> > Hi Sujit,
> > this does indeed sound strange and we are not aware of any data loss
> > issues.
> > Are there any exceptions or other errors in the job/taskmanager logs?
> > Do you have a minimal working example? Is it that whole windows are not
> > processed or just single items inside a window?
> > 
> > 
> > Nico
> > 
> > On Tuesday, 14 February 2017 16:57:31 CET Sujit Sakre wrote:
> > > Hi,
> > > 
> > > I have been using Flink 1.1.1 with Kafka 0.9 to process real time
> > 
> > streams.
> > 
> > > We have written our own Window Function and are processing data with
> > > Sliding Windows. We are using Event Time and use a custom watermark
> > > generator.
> > > 
> > > We select a particular window out of multiple sliding windows and
> > > process
> > > all items in the window in an iterative loop to increment counts of the
> > > items selected. After this we call the sink method to log the result in
> > > a
> > > database.
> > > 
> > > This is working fine most of the times, i.e. it produces the expected
> > > result most of the times. However there are situations when certain
> > 
> > windows
> > 
> > > are not processed (under same test conditions), this means the results
> > 
> > are
> > 
> > > less than expected i.e. the counts are less than expected. Sometimes,
> > 
> > items
> > 
> > > instead of certain windows not getting processed, certain items do not
> > 
> > get
> > 
> > > processed in the window. This is unpredictable.
> > > 
> > > I wish to know what could be the cause of this inconsistent behaviour.
> > 
> > How
> > 
> > > do we resolve it. We have integrated 1.2 and Kafka 0.10 now, the problem
> > > persists.
> > > 
> > > Please could you suggest about what the problem could be and how to
> > 
> > resolve
> > 
> > > this.
> > > 
> > > Many thanks.
> > > 
> > > 
> > > *Sujit Sakre*



signature.asc
Description: This is a digitally signed message part.


Re: Missing test dependency in Eclipse with Flink 1.2.0

2017-02-14 Thread Nico Kruber
I did some digging and this is actually documented in FLINK-4813 [1].

To work around this issue, add the following plugin to your build plugins:



<plugin>
  <groupId>org.apache.felix</groupId>
  <artifactId>maven-bundle-plugin</artifactId>
  <version>3.0.1</version>
  <inherited>true</inherited>
  <extensions>true</extensions>
</plugin>



Nico

[1] https://issues.apache.org/jira/browse/FLINK-4813

On Tuesday, 14 February 2017 17:45:12 CET Flavio Pompermaier wrote:
> Hi Nico,
> thanks for the response. The problem is that I don't use the quickstart
> example.
> I have a working set of jobs (in Flink 1.1.4) with some unit tests.
> In the unit tests I use the following dependency that causes the problem:
> 
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-test-utils_2.10</artifactId>
>   <version>1.2.0</version>
>   <type>test-jar</type>
>   <scope>test</scope>
> </dependency>
> 
> Best,
> Flavio
> 
> On Tue, Feb 14, 2017 at 2:51 PM, Nico Kruber  wrote:
> > You do not require a plugin, but most probably this dependency was not
> > fetched
> > by Eclipse. Please try a "mvn clean package" in your project and see
> > whether
> > this helps Eclipse.
> > 
> > Also, you may create a clean test project with
> > 
> > mvn archetype:generate   \
> > 
> >   -DarchetypeGroupId=org.apache.flink  \
> >   -DarchetypeArtifactId=flink-quickstart-java  \
> >   -DarchetypeVersion=1.2.0
> > 
> > for which I could not find any dependency issues using Eclipse.
> > 
> > Regards,
> > Nico
> > 
> > On Tuesday, 14 February 2017 14:17:10 CET Flavio Pompermaier wrote:
> > > Hi to all,
> > > I've tried to migrate to Flink 1.2.0 and now my Eclipse projects says
> > 
> > that
> > 
> > > they can't find *apacheds-jdbm1* that has packaging bundle. Should I
> > > install some plugin?
> > > 
> > > Best,
> > > Flavio



signature.asc
Description: This is a digitally signed message part.


Re: Unable to use Scala's BeanProperty with classes

2017-02-14 Thread Nico Kruber
Hi Adarsh,
thanks for reporting this. It should be fixed eventually.

@Timo: do you have an idea for a work-around or quick-fix?


Regards
Nico

On Tuesday, 14 February 2017 21:11:21 CET Adarsh Jain wrote:
> I am getting the same problem when trying to do FlatMap operation on my
> POJO class.
> 
> Exception in thread "main" java.lang.IllegalStateException: Detected
> more than one setter
> 
> 
> 
> Am using Flink 1.2, the exception is coming when using FlatMap
> 
> https://issues.apache.org/jira/browse/FLINK-5070



signature.asc
Description: This is a digitally signed message part.


Re: Running streaming job on every node of cluster

2017-02-27 Thread Nico Kruber
Hi Evgeny,
I tried to reproduce your example with the following code, having another 
console listening with "nc -l 12345"

env.setParallelism(2);
env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
    .map(new MapFunction<String, String>() {
        @Override
        public String map(final String s) throws Exception {
            return s;
        }
    })
    .addSink(new DiscardingSink<String>());

This way, I do get a source with parallelism 1 and map & sink with parallelism 
2, and the whole program occupying 2 slots as expected. You can check in the 
web interface of your cluster how many slots are taken after executing one 
instance of your program.

How do you set your parallelism?


Nico

On Monday, 27 February 2017 14:04:21 CET Evgeny Kincharov wrote:
> Hi,
> 
> I have the simplest streaming job, and I want to distribute my job on every
> node of my Flink cluster.
> 
> Job is simple:
> 
> source (SocketTextStream) -> map -> sink (AsyncHtttpSink).
> 
> When I increase parallelism of my job when deploying or directly in code, no
> effect because source is can't work in parallel. Now I reduce "Tasks Slots"
> to 1 on ever nodes and deploy my job as many times as nodes in the cluster.
> It works when I have only one job. If I want deploy another in parallel
> there is no free slots. I hope more convenient way to do that is exists.
> Thanks.
> 
> BR,
> Evgeny



signature.asc
Description: This is a digitally signed message part.


Re: Sliding Windows Processing problem with Kafka Queue Event Time and BoundedOutOfOrdernessGenerator

2017-02-27 Thread Nico Kruber
Hi Sujit,
actually, according to
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/
windows.html#allowed-lateness
the sliding window should fire each time for each element arriving late.

Did you set the following for your window operator?
.window(...)
.allowedLateness(...)

The expected behaviour should be:
1) fire once a watermark exceeding the sliding window end has been received
2) fire again each time a late element is received after the sliding window end, as long 
as no watermark exceeding the sliding window end + allowed lateness has been received

In your case, if I see it correctly and do not misinterpret your example, the 
late firing has nothing to do with the sliding window but rather your 
BoundedOutOfOrdernessGenerator:
Since a sliding window is only closed and fired once a watermark past its end has been 
received, its first firing actually lags 5s behind, as per your maxOutOfOrderness.

It actually looks like you may be better off using allowed lateness for your 
use case, depending on what you actually need.
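For illustration, here is a minimal, self-contained sketch of that suggestion: a keyed
sliding event-time window with allowed lateness. The tuples (location, timestamp, value),
the concrete window sizes and the 30-minute lateness are assumptions standing in for
MyEvent and the real job, and the built-in BoundedOutOfOrdernessTimestampExtractor is
used in place of the custom generator from the quoted message below.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingWindowLatenessExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(                       // (location, event timestamp, value)
                Tuple3.of(15, 1473425793000L, 1),
                Tuple3.of(16, 1473426288000L, 1),
                Tuple3.of(17, 1473426528000L, 1))
            .assignTimestampsAndWatermarks(
                // watermarks lag 5s behind the highest timestamp seen so far
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, Integer>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple3<Integer, Long, Integer> element) {
                        return element.f1;
                    }
                })
            .keyBy(0)                                                         // key by location
            .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
            .allowedLateness(Time.minutes(30))  // fire again for elements arriving late
            .sum(2)
            .print();

        env.execute("sliding window with allowed lateness");
    }
}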


Regards
Nico

On Monday, 27 February 2017 15:44:20 CET Sujit Sakre wrote:
> Hi,
> 
> Hope you are well.
> 
> We have encountered an issue in processing sliding windows: if the last
> record falls outside of the sliding window's end time, that record is not
> processed until the next sliding window is completely occupied and gets
> triggered.
> 
> Please consider the example below:
> 
> The datastream is Kafka based and Keyed on Location (1, 2, 3, 4 etc.)
> 
> Our previous sliding window
> starts at
> 09-09-2016 12:54:00,
> End at
> 09-09-2016 13:00:00
> *Key is 15 (Location)*
> 
> Records in between with timestamps:
> 09-09-2016 12:56:33
> 09-09-2016 12:56:47
> 09-09-2016 12:58:04
> 09-09-2016 12:58:39
> 09-09-2016 12:58:45
> 
> However the next window starts at
> 09-09-2016 13:04:00
> and ends at
> 09-09-2016 13:10:00
> *with Key as  16 (Location)*
> with record timestamps:
> 
> 09-09-2016 13:04:48
> 09-09-2016 13:06:07
> 09-09-2016 13:06:38
> 09-09-2016 13:07:25
> 09-09-2016 13:08:00
> 09-09-2016 13:08:20
> 09-09-2016 13:08:38
> 
> 
> is not processed until records are entered in *Location 17* with the
> timestamps:
> 09-09-2016 13:08:48
> 09-09-2016 13:08:55
> 09-09-2016 13:09:11
> 09-09-2016 13:11:48
> The window that gets formed at that time has
> Start Time: 09-09-2016 13:06:00
> End Time: 09-09-2016 13:12:00
> 
> We are using the standard BoundedOutOfOrdernessGenerator with maximum out
> of orderness of 5 seconds (we have tried various other combinations of the
> maxoutoforderness values but without success), and Event Time based
> processing.
> 
> /**
>  * This generator generates watermarks assuming that elements arrive out of order,
>  * but only to a certain degree. The latest elements for a certain timestamp t will
>  * arrive at most n milliseconds after the earliest elements for timestamp t.
>  */
> public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
> 
>     private final long maxOutOfOrderness = 5000; // 5 seconds
> 
>     private long currentMaxTimestamp;
> 
>     @Override
>     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
>         long timestamp = element.getCreationTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
> 
>     @Override
>     public Watermark getCurrentWatermark() {
>         // return the watermark as the current highest timestamp minus the out-of-orderness bound
>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>     }
> }
> 
> 
> Thus to summarize the problem,
> 
> the records in the window before the last window (with a different location)
> are not processed until a subsequent window forms with a timestamp that is
> later than the end-time timestamp of the existing sliding window.
> This means the window is not processed until the next set of records arrives
> with timestamps that are later than the existing window's end-time timestamp.
> 
> In a real situation, this means that we wait till the next set of records
> arrive, which may be after a very long duration (e.g. maybe 1 hour), and
> previous records are not processed till then.
> 
> Is this behaviour by design?
> Why does the sliding window not process the records that are already present,
> even after not receiving any new record for a substantial amount of time,
> e.g. 30 minutes?
> How do we resolve this situation?
> 
> Please could you suggest how to resolve this.
> 
> Many thanks.
> 
> 
> 
> *Sujit Sakre*



signature.asc
Description: This is a digitally signed message part.


Re: Running streaming job on every node of cluster

2017-02-27 Thread Nico Kruber
What about setting the parallelism[1] to the total number of slots in your 
cluster?
By default, all parts of your program are put into the same slot sharing 
group[2], so by setting the parallelism you would get one copy of your whole 
program in each parallel slot (minus/plus operators that have lower/higher 
parallelism), if I understand it correctly.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/
parallel.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/
datastream_api.html#task-chaining-and-resource-groups

On Monday, 27 February 2017 18:17:53 CET Evgeny Kincharov wrote:
> Thanks for your answer.
> The problem is that both slots are seized on the same node (of course only if that
> node has enough free slots), while the other nodes stay idle. I want to utilize the
> cluster resources a little bit more. Maybe the other deployment modes allow it.
> 
> BR, Evgeny.
> 
> From: Nico Kruber<mailto:n...@data-artisans.com>
> Sent: 27 February 2017 at 20:07
> To: user@flink.apache.org<mailto:user@flink.apache.org>
> Cc: Evgeny Kincharov<mailto:evgeny_kincha...@epam.com>
> Subject: Re: Running streaming job on every node of cluster
> 
> Hi Evgeny,
> I tried to reproduce your example with the following code, having another
> console listening with "nc -l 12345"
> 
> env.setParallelism(2);
> env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
>     .map(new MapFunction<String, String>() {
>         @Override
>         public String map(final String s) throws Exception {
>             return s;
>         }
>     })
>     .addSink(new DiscardingSink<String>());
> 
> This way, I do get a source with parallelism 1 and map & sink with
> parallelism 2 and the whole program occupying 2 slots as expected. You
> can check in the web interface of your cluster how many slots are taken
> after executing one instance of your program.
> 
> How do you set your parallelism?
> 
> 
> Nico
> 
> On Monday, 27 February 2017 14:04:21 CET Evgeny Kincharov wrote:
> 
> > Hi,
> >
> >
> >
> > I have the simplest streaming job, and I want to distribute my job on
> > every node of my Flink cluster.
> >
> >
> >
> > Job is simple:
> >
> >
> >
> > source (SocketTextStream) -> map -> sink (AsyncHtttpSink).
> >
> >
> >
> > When I increase the parallelism of my job when deploying or directly in code,
> > it has no effect because the source can't work in parallel. Now I reduce "Task
> > Slots" to 1 on every node and deploy my job as many times as there are nodes in
> > the cluster. It works when I have only one job. If I want to deploy another in
> > parallel, there are no free slots. I hope a more convenient way to do that
> > exists. Thanks.
> >
> >
> >
> > BR,
> > Evgeny
> 
> 
> 



signature.asc
Description: This is a digitally signed message part.

