Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-10-18 Thread Etienne Chauchot

Hi Sandeep, hi all,

I'm getting back to you on this question after checking on the Beam side:

ParquetIO supports setting the Parquet row group size (see for example 
https://www.dremio.com/tuning-parquet/).


The default row group size is 1024*1024 bytes. Have you tried playing with 
different values via Sink#withRowGroupSize(), in particular a value 
matching your S3 filesystem block size?
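
For illustration, a minimal sketch of what that could look like applied to 
your write snippet (quoted below), assuming Sink#withRowGroupSize(int) takes 
the size in bytes and reusing your getOutput_schema(), outputPath and 
CustomFileNaming helpers:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// the windowed records from your existing pipeline
PCollection<GenericRecord> parquetRecord = ...;

parquetRecord.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema())
        // row group size in bytes; try values other than the default,
        // e.g. one matching the target file / S3 filesystem block size
        .withRowGroupSize(128 * 1024 * 1024))
    .to(outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));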


Also, you should set your Beam logs to debug level, as the IO emits 
"allocated memory" and "too much memory used" debug and warning log 
messages.
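
As an example, assuming a log4j 1.x style properties file (Flink 1.12 ships 
log4j2 by default, where the syntax differs slightly), something like the 
following should surface those messages; the logger names are an assumption, 
since the memory-related messages come from the parquet-mr writer classes as 
well as from ParquetIO itself:

log4j.logger.org.apache.parquet=DEBUG
log4j.logger.org.apache.beam.sdk.io.parquet=DEBUG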


Best

Etienne Chauchot

On 15/10/2021 13:02, Etienne Chauchot wrote:

Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor on Flink. I 
have also used Parquet in Beam pipelines running on the Flink runner, so I 
should be able to help.


I don't think it is a Flink issue: since you have a Beam pipeline, you only 
use Beam IOs, which the Beam runner wraps inside Flink operators at (Beam) 
translation time.


I don't think it is related to S3 either, but rather to ParquetIO in Beam 
(the Parquet read and write connector).


I'll check on the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:

Hi Sandeep,

I am not a Beam expert. The problem might be caused by the used S3
filesystem implementation. Have you tried whether the same problem occurs
when using vanilla Flink's latest version? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.

Some of the exceptions look as if your network is a bit flaky. You might
want to look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
 wrote:


Hi,
    We have a simple Beam application which reads from Kafka, converts to
Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12).
We have a fixed window of 5 minutes after conversion to
PCollection<GenericRecord> and then write to S3. We have around 320 columns
in our data. Our intention is to write large files of 128 MB or more so that
we won't have a small-files problem when reading back from Hive. But from
what we observed it takes too much memory to write to S3 (giving 8 GB of
heap is not enough to write 50 MB files and it goes OOM). When I increase
the heap to 32 GB, it takes a lot of time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection<GenericRecord> parquetRecord = …….

parquetRecord.apply(FileIO.write()
 .via(ParquetIO.sink(getOutput_schema()))
 .to(outputPath.isEmpty() ? outputPath() : outputPath)
 .withNumShards(5)
 .withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


   1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
 at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
 at com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
 at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
 at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
 at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
 at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
 at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
 at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
 at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-10-15 Thread Etienne Chauchot

Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor on Flink. I 
have also used Parquet in Beam pipelines running on the Flink runner, so I 
should be able to help.


I don't think it is a Flink issue: since you have a Beam pipeline, you only 
use Beam IOs, which the Beam runner wraps inside Flink operators at (Beam) 
translation time.


I don't think it is related to S3 either, but rather to ParquetIO in Beam 
(the Parquet read and write connector).


I'll check on the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:

Hi Sandeep,

I am not a Beam expert. The problem might be caused by the used S3
filesystem implementation. Have you tried whether the same problem occurs
when using vanilla Flink's latest version? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.

Some of the exceptions look as if your network is a bit flaky. You might
want to look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
 wrote:


Hi,
We have a simple Beam application which reads from Kafka, converts to
Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We
have a fixed window of 5 minutes after conversion to
PCollection<GenericRecord> and then write to S3. We have around 320
columns in our data. Our intention is to write large files of 128 MB or
more so that we won't have a small-files problem when reading back from
Hive. But from what we observed it takes too much memory to write to S3
(giving 8 GB of heap is not enough to write 50 MB files and it goes OOM).
When I increase the heap to 32 GB, it takes a lot of time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection<GenericRecord> parquetRecord = …….

parquetRecord.apply(FileIO.write()
 .via(ParquetIO.sink(getOutput_schema()))
 .to(outputPath.isEmpty() ? outputPath() : outputPath)
 .withNumShards(5)
 .withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


   1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
 at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
 at
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
 at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
 at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
 at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
 at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
 at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
 at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
 at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
 at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
 at java.lang.Iterable.forEach(Iterable.java:75)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
 at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoke

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-30 Thread Till Rohrmann
Hi Sandeep,

I am not a Beam expert. The problem might be caused by the used S3
filesystem implementation. Have you tried whether the same problem occurs
when using vanilla Flink's latest version? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.
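
If you want to try that, a minimal sketch of the equivalent write path on
vanilla Flink 1.12 could look like the following (it uses the flink-parquet
module; the input stream, Avro schema and output path are placeholders from
your pipeline, and bulk formats only roll files on checkpoints, so
checkpointing must be enabled):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

DataStream<GenericRecord> records = ...;  // same records as in the Beam job
Schema schema = getOutput_schema();       // placeholder for your Avro schema

records.addSink(
    StreamingFileSink
        .forBulkFormat(new Path("s3://your-bucket/output"),  // placeholder path
            ParquetAvroWriters.forGenericRecord(schema))
        .build());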

Some of the exceptions look as if your network is a bit flaky. You might
want to look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
 wrote:

> Hi,
> We have a simple Beam application which reads from Kafka, converts to
> Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection<GenericRecord> and then write to S3. We have around 320
> columns in our data. Our intention is to write large files of 128 MB or
> more so that we won't have a small-files problem when reading back from
> Hive. But from what we observed it takes too much memory to write to S3
> (giving 8 GB of heap is not enough to write 50 MB files and it goes OOM).
> When I increase the heap to 32 GB, it takes a lot of time to write records to S3.
> For instance it takes:
>
> 20 MB file - 30 sec
> 50 MB file - 1 min 16 sec
> 75 MB file - 2 min 15 sec
> 83 MB file - 2 min 40 sec
>
> Code block to write to S3:
> PCollection<GenericRecord> parquetRecord = …….
>
> parquetRecord.apply(FileIO.write()
> .via(ParquetIO.sink(getOutput_schema()))
> .to(outputPath.isEmpty() ? outputPath() : outputPath)
> .withNumShards(5)
> .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
> We are also getting different exceptions like:
>
>
>   1.  UserCodeException:
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
> at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
> at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness
that it's not OK to send a user-related question to four Apache mailing
lists (that I know of). Namely:

- u...@flink.apache.org
- dev@flink.apache.org
- u...@beam.apache.org
- d...@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day,
and it's very unfortunate if any of this work needs to be duplicated. Next
time, please direct Beam-related user questions solely to u...@beam.apache.org.

Thanks for your understanding. You can consult the community guidelines [1][2]
if you are not sure where a particular question belongs.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create high pressure on the state backend and/or heap, which could
> result in suboptimal performance. Due to the "connection loss" and timeout
> exceptions you describe, I'd suppose there might be a lot of GC pressure.
>
>  Jan
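
(For reference: the state backend and checkpointing interval Jan asks about
map to the FlinkRunner's pipeline options. A minimal sketch, with the caveat
that the exact option names should be checked against FlinkPipelineOptions in
Beam 2.29:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// inside main(String[] args)
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(60_000L); // checkpoint every 60 s
options.setStateBackend("rocksdb");        // keep large per-window state off the heap

The same settings can also be passed as --checkpointingInterval=60000 and
--stateBackend=rocksdb on the command line.)
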
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
> We have a simple Beam application which reads from Kafka, converts to
> Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection<GenericRecord> and then write to S3. We have around 320
> columns in our data. Our intention is to write large files of 128 MB or
> more so that we won't have a small-files problem when reading back from
> Hive. But from what we observed it takes too much memory to write to S3
> (giving 8 GB of heap is not enough to write 50 MB files and it goes OOM).
> When I increase the heap to 32 GB, it takes a lot of time to write records to S3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection<GenericRecord> parquetRecord = …….
>
>
>
> parquetRecord.apply(FileIO.write()
> .via(ParquetIO.sink(getOutput_schema()))
> .to(outputPath.isEmpty() ? outputPath() : outputPath)
> .withNumShards(5)
> .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>    1.  UserCodeException:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>
> at
> org.apache.beam.runners.core.Simp