I have had the same experience as Eleanore.


When enabling object reuse, I saw a significant performance improvement, and
in my profiling session I saw that a lot of serialization/deserialization
was no longer performed.



That’s why my question was originally meant to go a bit further: It’s good
that Flink reuses objects, but I still need to create a new instance of my
POJO for every parsed event, and that instance is ultimately dropped at some
later processing step in the Flink pipeline (map, shuffle or sink).
Wouldn’t it be possible for the “deserialize” method to take an optional
“oldPOJO” input through which Flink hands me an unused old instance of my
POJO, if it has one left? If it is null, I instantiate a new instance in my
code. With billions of small events ingested per day, I can imagine this
being another small performance improvement, especially in terms of garbage
collection…
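
To illustrate the idea, here is a minimal sketch. The ReusingDeserializer
interface, the Event class and the "timestamp,payload" record layout are all
hypothetical and made up for this example; none of them is an existing Flink
API. The only point is the contract: the caller may pass an old instance, and
the implementation allocates a new one only when it gets null.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical event POJO; the field names are placeholders.
final class Event {
    long timestamp;
    String payload;
}

// Hypothetical interface, NOT part of Flink: `reuse` may be null.
interface ReusingDeserializer<T> {
    T deserialize(byte[] message, T reuse);
}

// Parses well-formed "timestamp,payload" records and recycles the
// instance handed in by the caller when there is one.
final class EventDeserializer implements ReusingDeserializer<Event> {
    @Override
    public Event deserialize(byte[] message, Event reuse) {
        // Allocate only if the caller has no spare instance to offer.
        Event event = (reuse != null) ? reuse : new Event();
        String text = new String(message, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');
        // Overwrite every field so no state leaks from the previous event.
        event.timestamp = Long.parseLong(text.substring(0, comma));
        event.payload = text.substring(comma + 1);
        return event;
    }
}
```

Note that this sketch still allocates strings while parsing; it only saves
the POJO allocation itself, and it is only safe if nothing downstream still
holds a reference to the recycled instance.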



Best regards

Theo



From: Till Rohrmann <trohrm...@apache.org>
Sent: Wednesday, 19 February 2020 07:34
To: Jin Yi <eleanore....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Parallelize Kafka Deserialization of a single partition?



Then my statement must be wrong. Let me double check this. Yesterday when 
checking the usage of the objectReuse field, I could only see it in the 
batch operators. I'll get back to you.



Cheers,

Till



On Wed, Feb 19, 2020, 07:05 Jin Yi <eleanore....@gmail.com> wrote:

Hi Till,

I just read your comment:

Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() 
only affects the DataSet API. DataStream programs will always do defensive 
copies. There is a FLIP to improve this behaviour [1].



I have an application written in Apache Beam that uses the Flink runner.
The pipeline is configured in streaming mode, and I see a performance
difference between enabling and disabling objectReuse. When running in
debug mode, I also noticed that with objectReuse set to true, no
serialization/deserialization happens between operators, whereas with it
set to false, serialization and deserialization happen between every pair
of operators. Do you have any idea why this is happening?

MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
options.setStreaming(true);
options.setObjectReuse(true);

Thanks a lot!
Eleanore



On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Theo,



the KafkaDeserializationSchema does not allow returning asynchronous
results. Hence, Flink will always wait until
KafkaDeserializationSchema.deserialize returns the parsed value.
Consequently, the only way I can think of to offload the complex parsing
logic would be to do it in a downstream operator, where you could use
Async I/O to run the parsing logic in a thread pool, for example.
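
As a rough illustration of the thread-pool idea, here is a plain-JDK sketch.
It deliberately does not use Flink's Async I/O operator; the class and method
names are made up for this example, and parse() is only a stand-in for the
real, expensive parsing logic. Records are parsed concurrently, but results
are collected in submission order, which is the property an ordered async
stage would have to preserve.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not a Flink class.
final class OrderedParallelParser {

    // Stand-in for the CPU-heavy parsing logic (assumption).
    static long parse(String raw) {
        return Long.parseLong(raw.trim());
    }

    // Parses all records on a fixed-size pool and returns the results
    // in the same order as the input list.
    static List<Long> parseInOrder(List<String> rawRecords, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (String raw : rawRecords) {
                Callable<Long> task = () -> parse(raw);
                pending.add(pool.submit(task));
            }
            List<Long> parsed = new ArrayList<>(pending.size());
            for (Future<Long> future : pending) {
                // Waiting on futures in submission order keeps the output ordered.
                parsed.add(future.get());
            }
            return parsed;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("parsing failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real job, the submission and the ordered hand-off would be the job of
the async operator rather than of a blocking loop like this one.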



Alternatively, you could think about a simple program which transforms your 
input events into another format where it is easier to extract the timestamp 
from. This comes, however, at the cost of another Kafka topic.



Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() 
only affects the DataSet API. DataStream programs will always do defensive 
copies. There is a FLIP to improve this behaviour [1].



[1] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982



Cheers,

Till



On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal
<theo.diefent...@scoop-software.de> wrote:

Hi,



As in most pipelines, our Flink pipeline starts with parsing source Kafka
events into POJOs. We perform this step within a KafkaDeserializationSchema
so that we properly extract the event time timestamp for the downstream
timestamp assigner.



Now it turned out that parsing is currently the most CPU-intensive task in
our pipeline, so the CPU bounds the number of elements we can ingest per
second. Further splitting up the partitions will be hard, as we need to
maintain the exact order of events per partition, and it would also require
quite some architectural changes for the producers and the Flink job.



Now I had the idea to put the parsing task into ordered Async I/O. But
Async I/O can only be plugged into an existing stream, not into the
deserialization schema, as far as I can see. So the best idea I currently
have is to keep parsing in the DeserializationSchema as minimal as possible,
extracting only the event timestamp, and to do the full parsing downstream
in Async I/O. This, however, seems a bit tedious, especially as we have to
deal with multiple input formats and would need to develop two parsers for
the heavy-load ones: a timestamp-only parser and a full parser.
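
A minimal sketch of what such a timestamp-only step could look like. The
record layout (decimal epoch milliseconds, then a comma, then the rest) and
the RawEvent and TimestampOnlyParser names are assumptions made up for this
example; our real formats differ. The raw bytes are carried along unchanged
so that the full parser can run on them later.

```java
// Hypothetical envelope: cheap timestamp now, full parsing later.
final class RawEvent {
    final long timestamp;
    final byte[] raw;

    RawEvent(long timestamp, byte[] raw) {
        this.timestamp = timestamp;
        this.raw = raw;
    }
}

// Hypothetical helper, not a Flink class.
final class TimestampOnlyParser {

    // Reads the leading ASCII digits up to the first ',' without
    // decoding or copying the remainder of the message.
    static RawEvent extract(byte[] message) {
        long timestamp = 0L;
        int i = 0;
        while (i < message.length && message[i] != ',') {
            byte b = message[i];
            if (b < '0' || b > '9') {
                throw new IllegalArgumentException("not a digit at index " + i);
            }
            timestamp = timestamp * 10 + (b - '0');
            i++;
        }
        return new RawEvent(timestamp, message);
    }
}
```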



Do you know if it is somehow possible to parallelize / async IO the parsing
within the KafkaDeserializationSchema? I don't have state access in there,
and I don't have a "collector" object in there either, so one input element
has to produce exactly one output element.



Another question: My parsing produces Java POJO objects via "new", which are
sent downstream (reusePOJO setting set) and will finally be garbage
collected once they have reached the sink. Is there some mechanism in Flink
that would let me reuse "old" POJOs that have already reached the sink in
the source? All tasks are chained, so theoretically that could be possible?



Best regards

Theo
