Re: S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-09 Thread Flink Developer
Hi, is there any idea on what causes this and how it can be resolved? Thanks. ‐‐‐ Original Message ‐‐‐ On Wednesday, December 5, 2018 12:44 AM, Flink Developer wrote: > I have a Flink app with high parallelism (400) running in AWS EMR. It uses > Flink v1.5.2. It sources Kafka

S3A AWSS3IOException from Flink's BucketingSink to S3

2018-12-05 Thread Flink Developer
I have a Flink app with high parallelism (400) running in AWS EMR. It uses Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (with the RocksDB backend for checkpointing). The destination is defined using the "s3a://" prefix. The Flink job is a streaming app which runs continuously. At

Re: Flink Exception - AmazonS3Exception and ExecutionGraph - Error in failover strategy

2018-12-04 Thread Flink Developer
, 2018 11:02 AM, Flink Developer wrote: > I have a Flink app on 1.5.2 which sources data from a Kafka topic (400 > partitions) and runs with 400 parallelism. The sink uses BucketingSink to S3 > with RocksDB. Checkpoint interval is 2 min and checkpoint timeout is 2 min. > Checkpoi

Flink Exception - AmazonS3Exception and ExecutionGraph - Error in failover strategy

2018-12-03 Thread Flink Developer
I have a Flink app on 1.5.2 which sources data from a Kafka topic (400 partitions) and runs with 400 parallelism. The sink uses BucketingSink to S3 with RocksDB. Checkpoint interval is 2 min and checkpoint timeout is 2 min. Checkpoint size is a few MB. After execution for a few days, I see: Org
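The setup described above (parallelism 400, BucketingSink to S3, RocksDB, 2-minute checkpoint interval and timeout) maps to a few lines of the Flink 1.5-era streaming API. This is a sketch only — the checkpoint URI is a hypothetical placeholder, and it assumes flink-streaming-java and flink-statebackend-rocksdb on the classpath:

```java
// Sketch of the checkpoint configuration described in the thread (Flink 1.5 APIs).
// The S3 checkpoint path is hypothetical; source/sink wiring is omitted.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(400);                                       // one subtask per Kafka partition
        env.enableCheckpointing(2 * 60 * 1000);                        // 2-minute interval, as in the thread
        env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 1000); // 2-minute timeout, as in the thread
        env.setStateBackend(new RocksDBStateBackend("s3a://my-bucket/checkpoints")); // hypothetical URI
        // ... Kafka source and BucketingSink would be wired up here ...
    }
}
```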

Re: Reply: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread Flink Developer
Also, after the Flink job has failed from the above error, the Flink job is unable to recover from previous checkpoint. Is this the expected behavior? How can the job be recovered successfully from this? ‐‐‐ Original Message ‐‐‐ On Monday, November 26, 2018 12:35 AM, Flink Developer

Re: Flink Exception - assigned slot container was removed

2018-11-26 Thread Flink Developer
of your slots were freed during the job execution >> (possibly due to idle for too long). AFAIK the exception was thrown when a >> pending Slot request was removed. You can try increase the >> “Slot.idle.timeout” to mitigate this issue (default is 50000, try 360 or >>
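The suggestion quoted above corresponds to a single entry in flink-conf.yaml. Note the target value is cut off in the archive snippet, so the raised figure below is only an illustrative assumption; 50000 ms is the default for this setting:

```yaml
# flink-conf.yaml — sketch, not a prescription.
# slot.idle.timeout defaults to 50000 ms; the thread suggests raising it so
# idle slots are not released while the job is still running.
slot.idle.timeout: 360000   # illustrative value in ms (the quoted figure is truncated)
```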

Re: Reply: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread Flink Developer
I am also experiencing this error message "Container is running beyond physical memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 40 slots for each task manager. The memory assigned during Flink cluster creation is 1024MB per task manager. The checkpoint is using RocksDB
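For reference, the knobs involved live in flink-conf.yaml (Flink 1.5 key names). The values below are illustrative assumptions, not a recommendation for this particular job:

```yaml
# flink-conf.yaml sketch — illustrative values only.
# With 40 slots sharing a 1024 MB TaskManager, heap per slot is tiny, and
# RocksDB's native (off-heap) memory also counts against YARN's container
# limit — which is what triggers "running beyond physical memory limits".
taskmanager.heap.mb: 4096          # more memory per TaskManager
taskmanager.numberOfTaskSlots: 8   # fewer slots sharing it
```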

Re: Flink Exception - assigned slot container was removed

2018-11-26 Thread Flink Developer
AFAIK the exception was thrown when a > pending Slot request was removed. You can try increase the > “Slot.idle.timeout” to mitigate this issue (default is 50000, try 360 or > higher). > > Regards, > Qi > >> On Nov 26, 2018, at 7:36 AM, Flink Developer >> wrote:

Flink Exception - assigned slot container was removed

2018-11-25 Thread Flink Developer
Hi, I have a Flink application sourcing from a topic in Kafka (400 partitions) and sinking to S3 using BucketingSink, with RocksDB for checkpointing every 2 mins. The Flink app runs with parallelism 400 so that each worker handles a partition. This is using Flink 1.5.2. The Flink cluster uses

Re: How to get Kafka record's timestamp using Flink's KeyedDeserializationSchema?

2018-11-16 Thread Flink Developer
> issues [1] and [2]. > > Best, > Andrey > > [1] https://issues.apache.org/jira/browse/FLINK-8500 > [2] https://issues.apache.org/jira/browse/FLINK-8354 > >> On 16 Nov 2018, at 09:41, Flink Developer >> wrote: >> >> Kafka timestamp

How to get Kafka record's timestamp using Flink's KeyedDeserializationSchema?

2018-11-16 Thread Flink Developer
Hi, I have a Flink app which uses the FlinkKafkaConsumer. I am interested in retrieving the Kafka timestamp for a given record/offset using the *KeyedDeserializationSchema*, which provides the topic, partition, offset, and message. How can the timestamp be obtained through this interface? Thank you
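The KeyedDeserializationSchema signature in Flink 1.5 does not carry the record timestamp. In later Flink releases (1.8+), KafkaDeserializationSchema replaces it and hands deserialize() the full Kafka ConsumerRecord, which does include the timestamp. A hedged sketch of that later interface — it assumes a post-1.5 flink-connector-kafka on the classpath and is not an option on the 1.5.2 version used in this thread:

```java
// Sketch only — requires a Flink release (1.8+) where
// KafkaDeserializationSchema is available in flink-connector-kafka.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TimestampedStringSchema implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        long kafkaTimestamp = record.timestamp(); // the timestamp asked about in the thread
        // Illustrative choice: prepend the timestamp to the payload.
        return kafkaTimestamp + "|" + new String(record.value());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```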

How to use multiple sources with multiple sinks

2018-11-10 Thread Flink Developer
How can I configure one Flink job (stream execution environment, parallelism set to 10) to have multiple Kafka sources where each has its own sink to S3? For example, let's say the sources are: - Kafka Topic A - Consumer (10 partitions) - Kafka Topic B - Consumer (10 partitions) - Kafka Topic C -
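Each addSource call can be paired with its own addSink inside a single job graph. A sketch under Flink 1.5-era APIs (flink-streaming-java, the Kafka 0.11 connector, flink-connector-filesystem) — the topic names, broker address, and bucket are hypothetical:

```java
// Sketch only — one job, one StreamExecutionEnvironment, three independent
// source→sink pipelines. All names below are placeholders.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class MultiSourceMultiSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10); // matches the 10 partitions per topic

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // hypothetical
        kafkaProps.setProperty("group.id", "multi-sink-demo");

        // One source and one sink per topic, all inside a single job graph.
        for (String topic : new String[] {"topic-a", "topic-b", "topic-c"}) {
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), kafkaProps));
            stream.addSink(new BucketingSink<String>("s3a://my-bucket/" + topic));
        }
        env.execute("multi-source-multi-sink");
    }
}
```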

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-07 Thread Flink Developer
ate, no out-of-the-box S3 stuff works ATM, but that should > hopefully be fixed *soon*. If you can wait, that is the easiest, if you > can't, building either your own custom sink or your own flink with the > backport isn't a terrible option. > > Hope that helps! > > A

Recommendation for BucketingSink to GZIP compress files to S3

2018-11-04 Thread Flink Developer
Hi, what is the recommended method for using BucketingSink and compressing files using GZIP before they are uploaded to S3? I read that one way is to extend the StreamWriterBase class and wrap the stream using GZIPOutputStream. Is there a Flink example for this? If so, what would be the proper way
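The StreamWriterBase approach mentioned above boils down to wrapping the output stream the sink hands you in a GZIPOutputStream. The Flink-specific subclass (overriding open/write/close on StreamWriterBase) is omitted here; this JDK-only sketch shows the wrapping itself and why the wrapper must be closed, so the GZIP trailer is written before the file is rolled:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipWrapDemo {

    // Compress: the same wrapping a StreamWriterBase subclass would apply to
    // the stream Flink hands it. Closing the GZIPOutputStream writes the GZIP
    // trailer — skip it and the uploaded S3 object is a corrupt archive.
    static byte[] gzip(byte[] plain) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(plain);
        }
        return bos.toByteArray();
    }

    // Round-trip check: decompress what we compressed.
    static byte[] gunzip(byte[] compressed) throws Exception {
        try (InputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] original = "record-1\nrecord-2\n".getBytes();
        byte[] restored = gunzip(gzip(original));
        System.out.println(Arrays.equals(original, restored)); // prints "true"
    }
}
```

The same pattern extends to any compression codec whose encoder is an OutputStream wrapper.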

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-04 Thread Flink Developer
> than it uses configuration provided using flink configuration object by > calling setConfig method of BucketingSink. > > On Sat 3 Nov, 2018, 09:24 Flink Developer >> It seems the issue also appears when using Flink version 1.6.2 . >> ‐‐‐ Original Message ‐‐‐ >>

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-03 Thread Flink Developer
It seems the issue also appears when using Flink version 1.6.2 . ‐‐‐ Original Message ‐‐‐ On Tuesday, October 30, 2018 10:26 PM, Flink Developer wrote: > Hi, thanks for the info Rafi, that seems to be related. I hope Flink version > 1.6.2 fixes this. Has anyone encountered this

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-10-30 Thread Flink Developer
FLINK-9752 will be part of > 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not > presto). > > Does anyone know if this fix would solve this issue? > > Thanks, > Rafi > > On Sun, Oct 28, 2018 at 12:08 AM Flink Developer > wrote: > >>

Flink Kafka to BucketingSink to S3 - S3Exception

2018-10-27 Thread Flink Developer
Hi, I'm running a Scala Flink app in an AWS EMR cluster (EMR 5.17, Hadoop 2.8.4) with Flink parallelism set to 400. The source is a Kafka topic and it sinks to S3 in the format of: s3:/. There's potentially 400 files writing simultaneously. Configuration: - Flink v1.5.2 - Checkpointing en

Reading multiple files from S3 source in parallel

2018-10-23 Thread Flink Developer
Hello, I'm interested in creating a Flink batch app that can process multiple files from an S3 source in parallel. Let's say I have the following S3 structure and that my Flink app has parallelism set to 3 workers. s3://bucket/data-1/worker-1/file-1.txt s3://bucket/data-1/worker-1/file-2.t
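With the DataSet API of that era, pointing readTextFile at the parent directory and enabling recursive enumeration reads every file beneath it, and the input splits are spread across the parallel workers. A sketch, assuming an S3 filesystem is configured for the cluster; the path is the one from the question:

```java
// Sketch only — Flink 1.5-era DataSet API with an S3 filesystem configured.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ParallelS3ReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); // three workers share the input splits

        Configuration cfg = new Configuration();
        cfg.setBoolean("recursive.file.enumeration", true); // descend into worker-* dirs

        DataSet<String> lines = env
                .readTextFile("s3://bucket/data-1")
                .withParameters(cfg);

        System.out.println(lines.count());
    }
}
```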