We saw the function wrapping when we were debugging it, which is why we were 
surprised when it suddenly serialized the records, rather than calling the 
writer directly, and physically wrote them in a separate JVM.


From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Wednesday, August 23, 2017 11:51 AM
To: Newport, Billy [Tech]
Cc: Chan, Regina [Tech]; user@flink.apache.org
Subject: Re: Flink parquet read.write performance

Hi,

The reason is that there are two (or more) different Threads doing the reading. 
As an illustration, consider first this case:

DataSet input = ...
input.map(new MapA()).map(new MapB())

Here, MapB is technically "wrapped" by MapA, and when MapA emits data it 
goes directly to the map() method of MapB. The two functions are chained.

Now, in this other case the methods cannot be chained:

DataSet input1 = ...
DataSet input2 = ...
DataSet mappedA = input1.map(new MapA())
DataSet mappedB = input2.map(new MapB())

mappedA.union(mappedB).map(new MapC())

Here, there is (at least) one thread per map because neither MapA nor MapB can 
wrap MapC in such a way that the other one (either MapA or MapB) can still send 
data into MapC. Data is sent across a channel between the Threads, and whenever 
that happens the data is serialised.

Technically, we could avoid serialization if we knew that two Threads are 
running in the same JVM but this is not something that Flink currently does.
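
To make the difference concrete, here is a minimal plain-Java sketch. It is not 
Flink code: ChainingSketch and its methods are made-up names, and Java 
serialization only stands in for whatever TypeSerializer Flink would use at 
the boundary. A chained call hands the same object to the next function, while 
a hand-off across a channel turns the record into bytes and then into a new copy:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.function.Function;

// Plain-Java illustration only; none of these names are Flink classes.
class ChainingSketch {

    // Chained case: the second function is invoked directly on the first
    // one's output, in the same thread, so the same object is handed over.
    static <A, B, C> Function<A, C> chain(Function<A, B> first, Function<B, C> second) {
        return first.andThen(second);
    }

    // Unchained case: a record that crosses a channel is turned into bytes...
    static byte[] serialize(Object record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
        }
        return bytes.toByteArray();
    }

    // ...and rebuilt as a new object on the receiving side.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> record = new ArrayList<>();
        record.add("record");

        // Chained: identity functions pass the same instance straight through.
        Function<ArrayList<String>, ArrayList<String>> mapA = r -> r;
        Function<ArrayList<String>, ArrayList<String>> mapB = r -> r;
        System.out.println(chain(mapA, mapB).apply(record) == record); // true

        // Channel: the receiver gets an equal copy, not the same instance.
        Object received = deserialize(serialize(record));
        System.out.println(received == record);      // false
        System.out.println(received.equals(record)); // true
    }
}
```

The copy in the second case is the cost being discussed in this thread: it is 
paid per record, even when both sides happen to live in the same JVM.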

Best,
Aljoscha


On 23. Aug 2017, at 17:12, Newport, Billy 
<billy.newp...@gs.com> wrote:

Thanks Aljoscha for the prompt response.

Can you explain the technical reason for the single predecessor rule? This 
makes what we are trying to do much more expensive. Really what we’re doing is 
reading a parquet file, doing several maps/filters on the records and writing 
to the parquet. There is no serialization besides the parquet operations needed 
at all. The current flink implementation adds an expensive 
serialize/deserialize for no apparent purpose in the code.

Billy



From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Saturday, August 19, 2017 1:45 AM
To: Chan, Regina [Tech]
Cc: Newport, Billy [Tech]; user@flink.apache.org
Subject: Re: Flink parquet read.write performance

Hi,

The Sink cannot be chained to the previous two operations because there are two 
operations. Chaining only works if there is one predecessor operation. Data 
transfer should still be pipelined but you will see serialisation overhead. 
What kind of TypeSerializer is used at that boundary?

Best,
Aljoscha

On 18. Aug 2017, at 21:15, Chan, Regina 
<regina.c...@gs.com> wrote:

We profiled it and it looks like it's sending the output of the 
datasource->filter->map->map to an intermediate result partition instead of 
writing directly to the data sink. We think it's slow because it's spending its 
time serializing for no reason. Why does it do the forward rather than chain to 
the datasink?

<image001.png><image002.png>

Thanks,
Regina

From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Friday, August 18, 2017 12:14 PM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org
Subject: Re: Flink parquet read.write performance

Hi Billy,

Do you also have the data (picture) from the "Timeline" tab of the completed 
job? This would give some hints about how long that other DataSource (with 
chain) was active. It might be that the sink is waiting for the other input to 
become online.

Best,
Aljoscha

On 18. Aug 2017, at 14:45, Newport, Billy 
<billy.newp...@gs.com> wrote:

Hi,

I’m trying to figure out why reading and writing ~5GB worth of parquet files 
seems to take 3-4 minutes with 10 TaskManagers, 2 slots, 20GB memory, 20 
Parallelism. I’ve copied in the execution plan and the TaskManager times below. 
Other details include that we’re reading 20 snappy compressed parquet files of 
~240MB each. (see below)

I’m trying to use this for a milestoning logic where we take new avro files 
from staging and join them with the existing milestoned parquet data. I have a 
small staging file with only about 1500 records inside, so I reduce the number 
of records sent to the cogroup in order to make this faster. To do this, I’m 
basically reading in GenericRecords from parquet files twice, once to filter 
for “live” records, which we then further filter down to those with keys 
matching what we found in a separate avro file. This reduction in records 
brings that part of the plan to a total of 1 minute 58 secs.

The concern is the other records with non-live/not-matching-keys. In theory, I 
expect this to be fast since it’s just chaining the operations across all the 
way through to the sink. However, this part takes about 4 minutes. We’re not 
doing anything different from the other Datasource aside from mapping a 
DataSet<GenericRecord> to a Tuple2<Short,GenericRecord> where the short is a 
bitmap value mapping to where the record needs to be written.
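
For readers unfamiliar with the idea of a bitmap in the Short, here is a small 
plain-Java sketch of how such a value could be decoded. The encoding is an 
assumption made purely for illustration (bit i set means "write to output i"); 
the thread does not show the actual encoding, and BitmapRouting and targets 
are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the thread does not show how the bitmap is encoded.
class BitmapRouting {

    // Assumption: bit i of the bitmap is set when the record belongs in output i.
    static List<Integer> targets(short bitmap) {
        List<Integer> result = new ArrayList<>();
        int bits = bitmap & 0xFFFF; // treat the short as 16 unsigned flag bits
        for (int i = 0; i < 16; i++) {
            if ((bits & (1 << i)) != 0) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets((short) 0b0101)); // prints [0, 2]
    }
}
```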

Other notes:
I checked the backpressure on the datasource->filter->map->map and it was OK. 
I’m not sure what else could be holding it up.
I also profiled it when I ran it on a single task manager single slot and it 
seems to spend most of the time waiting.

Any ideas? Instead of truly chaining, is it writing to disk and serializing 
multiple times inside each operation?

Data Source :
hdfs dfs -du -h <folder_name>
240.2 M  <folder_name>/0_partMapper-m-00013.snappy.parquet
237.2 M  <folder_name>/10_partMapper-m-00019.snappy.parquet
241.9 M  <folder_name>/11_partMapper-m-00002.snappy.parquet
243.3 M  <folder_name>/12_partMapper-m-00000.snappy.parquet
238.2 M  <folder_name>/13_partMapper-m-00016.snappy.parquet
241.7 M  <folder_name>/14_partMapper-m-00003.snappy.parquet
241.0 M  <folder_name>/15_partMapper-m-00006.snappy.parquet
240.3 M  <folder_name>/16_partMapper-m-00012.snappy.parquet
240.3 M  <folder_name>/17_partMapper-m-00011.snappy.parquet
239.5 M  <folder_name>/18_partMapper-m-00014.snappy.parquet
237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet
240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet
240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet
236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet
242.1 M  <folder_name>/3_partMapper-m-00001.snappy.parquet
241.7 M  <folder_name>/4_partMapper-m-00004.snappy.parquet
240.5 M  <folder_name>/5_partMapper-m-00010.snappy.parquet
241.7 M  <folder_name>/6_partMapper-m-00005.snappy.parquet
239.1 M  <folder_name>/7_partMapper-m-00015.snappy.parquet
237.9 M  <folder_name>/8_partMapper-m-00017.snappy.parquet
240.8 M  <folder_name>/9_partMapper-m-00007.snappy.parquet


yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST" -jm 4096 -tm 20480 -s 2 -n 10 -d


<image001.png>

<image002.png>


Thanks,

Regina
