Hi!

The sink is merely a union of the co-group result and the one data source.
Can't you just make two distinct pipelines out of that? One co-group ->
data sink pipeline and one source -> sink pipeline?
They could even be part of the same job...
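
Roughly like this, just as a sketch of the structure: the inputs here are toy
in-memory elements and the sinks are plain text files, standing in for your
parquet/avro sources and parquet sinks, and the co-group function simply
forwards everything (none of the names below are from your actual job).

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class TwoPipelinesOneJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy stand-ins for the staging / existing / untouched record sets.
        DataSet<Tuple2<Integer, String>> staging   = env.fromElements(Tuple2.of(1, "new"));
        DataSet<Tuple2<Integer, String>> existing  = env.fromElements(Tuple2.of(1, "old"));
        DataSet<Tuple2<Integer, String>> untouched = env.fromElements(Tuple2.of(2, "keep"));

        // Pipeline 1: the co-group result goes straight into its own sink.
        staging.coGroup(existing)
                .where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Integer, String>> first,
                                        Iterable<Tuple2<Integer, String>> second,
                                        Collector<Tuple2<Integer, String>> out) {
                        for (Tuple2<Integer, String> t : first) { out.collect(t); }
                        for (Tuple2<Integer, String> t : second) { out.collect(t); }
                    }
                })
                .writeAsText("/tmp/cogroup-out");

        // Pipeline 2: the other records go straight to a second sink. With a
        // single predecessor, nothing prevents chaining and no union is involved.
        untouched.writeAsText("/tmp/untouched-out");

        // One execute() submits both pipelines as a single job.
        env.execute("two pipelines, one job");
    }
}

The point is just that both sinks hang off the same ExecutionEnvironment, so a
single execute() runs them as one job while each branch keeps its own path to
its sink.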

Best,
Stephan


On Wed, Aug 23, 2017 at 5:51 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> The reason is that there are two (or more) different threads doing the
> reading. As an illustration, first consider this case:
>
> DataSet input = ...
> input.map(new MapA()).map(new MapB())
>
> Here, MapB is technically "wrapped" by MapA, and when MapA emits data it
> goes directly to the map() method of MapB. The two functions are chained.
>
> Now, in this other case the methods cannot be chained:
>
> DataSet input1 = ...
> DataSet input2 = ...
> DataSet mappedA = input1.map(new MapA())
> DataSet mappedB = input2.map(new MapB())
>
> mappedA.union(mappedB).map(new MapC())
>
> Here, there is (at least) one thread per map, because neither MapA nor MapB
> could wrap MapC in a way that would still let the other one send data into
> MapC. Data is sent across a channel between the threads, and whenever that
> happens the data is serialised.
>
> Technically, we could avoid serialisation if we knew that the two threads
> are running in the same JVM, but this is not something that Flink currently
> does.
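>
> If you want to see where those boundaries end up in a concrete program, one
> option is to print the plan before executing. A minimal sketch, with a toy
> pipeline and a made-up output path:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.fromElements(1, 2, 3).filter(x -> x > 1).writeAsText("/tmp/plan-demo");
> // The JSON plan lists each operator together with the ship strategy of its
> // inputs (e.g. "Forward"), which is also what the web UI plan view shows.
> System.out.println(env.getExecutionPlan());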
>
> Best,
> Aljoscha
>
>
> On 23. Aug 2017, at 17:12, Newport, Billy <billy.newp...@gs.com> wrote:
>
> Thanks Aljoscha for the prompt response.
>
> Can you explain the technical reason for the single-predecessor rule? It
> makes what we are trying to do much more expensive. Really what we’re doing
> is reading a parquet file, doing several maps/filters on the records and
> writing back to parquet. There is no serialization needed at all besides the
> parquet operations themselves. The current Flink implementation adds an
> expensive serialize/deserialize step for no apparent purpose.
>
> Billy
>
>
>
> *From:* Aljoscha Krettek [mailto:aljos...@apache.org]
> *Sent:* Saturday, August 19, 2017 1:45 AM
> *To:* Chan, Regina [Tech]
> *Cc:* Newport, Billy [Tech]; user@flink.apache.org
> *Subject:* Re: Flink parquet read.write performance
>
> Hi,
>
> The sink cannot be chained to the previous two operations because there are
> two of them; chaining only works if there is exactly one predecessor
> operation. Data transfer should still be pipelined, but you will see
> serialisation overhead. What kind of TypeSerializer is used at that
> boundary?
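>
> One quick way to check is to print the TypeInformation and serializer that
> Flink derives for the type crossing that boundary. Sketch only, using
> GenericRecord as an example payload and assuming env is your job's
> ExecutionEnvironment:
>
> // org.apache.flink.api.java.typeutils.TypeExtractor,
> // org.apache.flink.api.common.typeinfo.TypeInformation
> TypeInformation<GenericRecord> ti = TypeExtractor.getForClass(GenericRecord.class);
> // A GenericTypeInfo here typically means the records fall back to the
> // KryoSerializer at runtime, which is comparatively expensive.
> System.out.println(ti);
> System.out.println(ti.createSerializer(env.getConfig()));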
>
> Best,
> Aljoscha
>
> On 18. Aug 2017, at 21:15, Chan, Regina <regina.c...@gs.com> wrote:
>
> We profiled it, and it looks like it’s sending the output of the
> datasource->filter->map->map to an intermediate result partition
> instead of writing directly to the data sink. We think it’s slow because
> it’s spending its time serializing for no reason. Why does it do the
> forward rather than chain to the data sink?
>
> <image001.png><image002.png>
>
> Thanks,
> Regina
>
> *From:* Aljoscha Krettek [mailto:aljos...@apache.org]
> *Sent:* Friday, August 18, 2017 12:14 PM
> *To:* Newport, Billy [Tech]
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink parquet read.write performance
>
> Hi Billy,
>
> Do you also have the data (picture) from the "Timeline" tab of the
> completed job? That would give some hints about how long the other
> DataSource (with chain) was active. It might be that the sink is waiting
> for the other input to come online.
>
> Best,
> Aljoscha
>
>
> On 18. Aug 2017, at 14:45, Newport, Billy <billy.newp...@gs.com> wrote:
>
> Hi,
>
> I’m trying to figure out why reading and writing ~5 GB worth of parquet
> files seems to take 3-4 minutes with 10 TaskManagers, 2 slots, 20 GB memory,
> and a parallelism of 20. I’ve copied in the execution plan and the
> TaskManager times below. Other details: we’re reading 20 snappy-compressed
> parquet files, each ~240 MB (see below).
>
> I’m trying to use this for milestoning logic where we take new Avro
> files from staging and join them with the existing milestoned parquet data.
> I have a small staging file with only about 1,500 records, so I reduce the
> number of records sent to the cogroup in order to make this faster. To do
> this, I’m basically reading GenericRecords from the parquet files twice,
> once to filter for “live” records, which we then further filter down to the
> ones with keys matching what we found in a separate Avro file. This
> reduction of records brings that part of the plan to a total of 1 minute
> 58 secs.
>
> The concern is the other records, the ones that are non-live or have
> non-matching keys. In theory, I expect this to be fast since it’s just
> chaining the operations all the way through to the sink. However, this part
> takes about 4 minutes. We’re not doing anything different from the other
> DataSource aside from mapping a DataSet<GenericRecord> to a
> Tuple2<Short,GenericRecord>, where the short is a bitmap value indicating
> where the record needs to be written.
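>
> Roughly, the shape of that last map looks like the sketch below; the helper
> names and bit values are made up just to illustrate the
> Tuple2<Short,GenericRecord> tagging, not our real code.
>
> DataSet<Tuple2<Short, GenericRecord>> tagged = nonLiveRecords  // placeholder DataSet<GenericRecord>
>     .map(new MapFunction<GenericRecord, Tuple2<Short, GenericRecord>>() {
>         @Override
>         public Tuple2<Short, GenericRecord> map(GenericRecord r) {
>             short bits = 0;
>             if (isInsert(r)) { bits |= 0x01; }  // hypothetical: bit 0 -> "inserted" output file
>             if (isUpdate(r)) { bits |= 0x02; }  // hypothetical: bit 1 -> "updated" output file
>             return Tuple2.of(bits, r);
>         }
>     });
>
> The sink then reads the short to decide which file(s) each record goes to.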
>
> Other notes:
> I checked the backpressure on the datasource->filter->map->map and it was
> OK. I’m not sure what else could be holding it up.
> I also profiled it when I ran it on a single TaskManager with a single slot,
> and it seems to spend most of its time waiting.
>
> Any ideas? Instead of truly chaining, is it writing to disk and serializing
> multiple times inside each operation?
>
> Data Source :
> hdfs dfs -du -h <folder_name>
> 240.2 M  <folder_name>/0_partMapper-m-00013.snappy.parquet
> 237.2 M  <folder_name>/10_partMapper-m-00019.snappy.parquet
> 241.9 M  <folder_name>/11_partMapper-m-00002.snappy.parquet
> 243.3 M  <folder_name>/12_partMapper-m-00000.snappy.parquet
> 238.2 M  <folder_name>/13_partMapper-m-00016.snappy.parquet
> 241.7 M  <folder_name>/14_partMapper-m-00003.snappy.parquet
> 241.0 M  <folder_name>/15_partMapper-m-00006.snappy.parquet
> 240.3 M  <folder_name>/16_partMapper-m-00012.snappy.parquet
> 240.3 M  <folder_name>/17_partMapper-m-00011.snappy.parquet
> 239.5 M  <folder_name>/18_partMapper-m-00014.snappy.parquet
> 237.6 M  <folder_name>/19_partMapper-m-00018.snappy.parquet
> 240.7 M  <folder_name>/1_partMapper-m-00009.snappy.parquet
> 240.7 M  <folder_name>/20_partMapper-m-00008.snappy.parquet
> 236.5 M  <folder_name>/2_partMapper-m-00020.snappy.parquet
> 242.1 M  <folder_name>/3_partMapper-m-00001.snappy.parquet
> 241.7 M  <folder_name>/4_partMapper-m-00004.snappy.parquet
> 240.5 M  <folder_name>/5_partMapper-m-00010.snappy.parquet
> 241.7 M  <folder_name>/6_partMapper-m-00005.snappy.parquet
> 239.1 M  <folder_name>/7_partMapper-m-00015.snappy.parquet
> 237.9 M  <folder_name>/8_partMapper-m-00017.snappy.parquet
> 240.8 M  <folder_name>/9_partMapper-m-00007.snappy.parquet
>
>
> yarn-session.sh -nm "delp_uat-IMD_Trading_v1_PROD_PerfTest-REFINER_INGEST"
> -jm 4096 -tm 20480 -s 2 -n 10 -d
>
>
> <image001.png>
>
> <image002.png>
>
>
> Thanks,
>
> Regina
>
>
>
