Adding headers to tuples before writing to S3

2017-10-23 Thread ShB
Hi, I'm working with Flink for data analytics and reporting. The use case is that, when a user requests a report, a Flink cluster does some computations on the data, generates the final report (a DataSet of tuples) and uploads the report to S3, post which an email is sent to the corresponding
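A note on the header question: Flink's DataSet `writeAsCsv` does not emit a header row, so a common workaround is to format the tuples to CSV strings yourself and prepend the header line before uploading (for example via the AWS SDK). A minimal, stdlib-only sketch of that formatting step; the class name, header fields and row values are hypothetical placeholders:

```java
import java.util.List;

public class CsvWithHeader {
    // Builds the report body as one CSV string with a header line.
    // The header names and tuple fields here are illustrative only.
    static String buildCsv(String[] header, List<String[]> rows) {
        StringBuilder sb = new StringBuilder();
        sb.append(String.join(",", header)).append('\n');
        for (String[] row : rows) {
            sb.append(String.join(",", row)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String csv = buildCsv(
                new String[] {"id", "name", "total"},
                List.of(new String[] {"1", "alice", "42"},
                        new String[] {"2", "bob", "17"}));
        System.out.print(csv);
    }
}
```

The resulting string can then be uploaded as a single S3 object, which also sidesteps the one-file-per-parallel-task output layout of the DataSet sinks.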

Re: Task Manager was lost/killed due to full GC

2017-10-17 Thread ShB
I just wanted to leave an update about this issue, for anyone else who might come across it. The problem was with memory, but it was disk utilization rather than heap/off-heap memory. YARN was killing off my containers as they exceeded the threshold for disk utilization, and this was manifesting as Task
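For reference, the YARN NodeManager threshold in question is configurable. A disk is marked unhealthy (and containers on it become eligible for shutdown) once usage crosses the percentage below; this is a standard `yarn-site.xml` fragment, with 95.0 shown only as an example value (the YARN default is 90.0):

```xml
<property>
  <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  <value>95.0</value>
</property>
```

Raising the threshold only buys headroom; cleaning up spill directories and logs, or provisioning larger disks, addresses the underlying usage.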

Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
On further investigation, it seems to me the I/O exception I posted previously is not the cause of the TM being lost. It's the after-effect of the TM being shut down and the channel being closed after a record is emitted but before it's processed. Previously, the logs didn't throw up this error and

Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
Hi Stephan, Apologies, I hit send too soon on the last email. So, while trying to debug this, I ran it multiple times on different instance types (to increase available RAM) and while digging into the logs, I found this to be the error in the task manager logs: / java.lang.RuntimeException:

Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
Hi Stephan, Thanks for your response! Task manager lost/killed has been a recurring problem I've had with Flink for the last few months, as I try to scale to larger and larger amounts of data. I would be very grateful for some help figuring out how I can avoid this. The program is set up

Re: Task Manager was lost/killed due to full GC

2017-09-19 Thread ShB
Thanks for your response! Recommendation to decrease allotted memory? Which allotted memory should be decreased? I tried decreasing taskmanager.memory.fraction to give more memory to user managed operations, that doesn't work beyond a point. Also tried increasing containerized.heap-cutoff-ratio,
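For context, the two knobs mentioned live in `flink-conf.yaml`. In Flink 1.x, `taskmanager.memory.fraction` controls how much of the free heap the managed memory pool takes (lowering it leaves more heap for user code), and `containerized.heap-cutoff-ratio` reserves a slice of the YARN container for non-heap overhead. A sketch with the Flink 1.x default values; tune per workload:

```yaml
# Fraction of free heap reserved for Flink's managed memory (default 0.7);
# lower it to leave more heap for user functions.
taskmanager.memory.fraction: 0.7

# Fraction of the container size cut off for JVM/non-heap overhead
# on YARN (default 0.25), with a floor of containerized.heap-cutoff-min.
containerized.heap-cutoff-ratio: 0.25
containerized.heap-cutoff-min: 600
```

Note that neither setting helps if the containers are being killed for disk utilization rather than heap usage, as turned out to be the case later in this thread.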

Task Manager was lost/killed due to full GC

2017-09-05 Thread ShB
Hi, I'm running a Flink batch job that reads almost 1 TB of data from S3 and then performs operations on it. A list of filenames is distributed among the TMs, and each subset of files is read from S3 by each TM. This job errors out at the read step due to the following error:
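The distribution step described here (filenames handed out across TMs, each TM reading its subset) boils down to partitioning the file list. A stdlib-only sketch of that chunking logic, assuming simple round-robin assignment; in an actual Flink job the same effect is typically achieved with `env.fromCollection(fileNames).rebalance()` followed by a flatMap that opens each file:

```java
import java.util.ArrayList;
import java.util.List;

public class FileSplitter {
    // Round-robin assignment of filenames to task slots; mirrors the
    // "list of filenames distributed among the TMs" step. The names
    // and parallelism are illustrative.
    static List<List<String>> assign(List<String> files, int parallelism) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            buckets.get(i % parallelism).add(files.get(i));
        }
        return buckets;
    }
}
```

Round-robin keeps the subsets close to equal in count, though not necessarily in bytes; for skewed file sizes a size-aware bin-packing assignment balances read time better.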

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

2017-08-31 Thread ShB
Hi Fabian, Thanks for your response. If I implemented my own InputFormat, how would I read a specific list of files from S3? Assuming I need to use readFile(), below would read all of the files from the specified S3 bucket or path: env.readFile(MyInputFormat, "s3://my-bucket/") Is there a way

Re: Distributed reading and parsing of protobuf files from S3 in Apache Flink

2017-08-30 Thread ShB
Hi Fabian, Thank you so much for your quick response, I appreciate it. Since I'm working with a very large number of files of small sizes, I don't necessarily need to read each file in parallel. I need to read my large list of files in parallel - that is, split up my list of files into

Distributed reading and parsing of protobuf files from S3 in Apache Flink

2017-07-26 Thread ShB
I'm working with Apache Flink on reading, parsing and processing data from S3. I'm using the DataSet API, as my data is bounded and doesn't need streaming semantics. My data is on S3 in binary protobuf format in the form of a large number of timestamped files. Each of these files has to be
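On the parsing side: files containing multiple protobuf messages are usually length-delimited, with each record preceded by a varint length prefix (`Message.parseDelimitedFrom` in the protobuf Java library handles this). As a stdlib-only illustration of the same framing idea, the sketch below splits a byte stream into records using a simplified fixed 4-byte big-endian length prefix; that prefix format is an assumption for the example, not the actual protobuf wire framing:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RecordReader {
    // Splits a byte stream into records, each preceded by a 4-byte
    // big-endian length prefix. Real protobuf delimited streams use a
    // varint prefix instead; this fixed-width prefix is a
    // simplification for illustration.
    static List<byte[]> readRecords(byte[] data) throws IOException {
        List<byte[]> records = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        while (in.available() >= 4) {
            int len = in.readInt();
            byte[] record = new byte[len];
            in.readFully(record);
            records.add(record);
        }
        return records;
    }
}
```

Each recovered `byte[]` would then be handed to the generated protobuf parser (`MyMessage.parseFrom(record)`) inside the InputFormat or map function.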