Hi Till,
I just read your comment:
Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
only affects the DataSet API. DataStream programs will always do defensive
copies. There is a FLIP to improve this behaviour [1].

I have an application written in Apache Beam, with Flink as the runner,
and the pipeline is configured in streaming mode. I see a performance
difference between enabling and disabling object reuse. Also, when running
in debug mode, I noticed that with objectReuse set to true there is no
serialization/deserialization happening between operators, whereas with it
set to false, serialization and deserialization happen between every pair
of operators. Do you have any idea why this is happening?

MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
options.setStreaming(true);
options.setObjectReuse(true);
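
For reference, here is a minimal sketch of what I understand to be the
plain Flink (non-Beam) equivalent of the option above; I am assuming the
Flink runner translates setObjectReuse(true) into this ExecutionConfig
setting:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The switch that the quoted comment refers to; in my pipeline the
// serialization between operators only disappears when this is enabled.
env.getConfig().enableObjectReuse();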


Thanks a lot!

Eleanore


On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Theo,
>
> the KafkaDeserializationSchema does not allow returning asynchronous
> results. Hence, Flink will always wait until
> KafkaDeserializationSchema.deserialize returns the parsed value.
> Consequently, the only way I can think of to offload the complex parsing
> logic would be to do it in a downstream operator where you could use
> Async I/O to run the parsing logic in a thread pool, for example.
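>
> A rough sketch of that pattern (class, field, and helper names here are
> illustrative, not taken from your code, so please treat it only as a
> starting point):
>
>   // Imports from flink-streaming-java and java.util.concurrent omitted.
>   // Raw bytes come out of the Kafka source; the heavy parsing runs in a
>   // thread pool inside an ordered Async I/O operator, so the input order
>   // is preserved.
>   DataStream<byte[]> raw = env.addSource(kafkaSource);
>
>   DataStream<MyPojo> parsed = AsyncDataStream.orderedWait(
>       raw,
>       new RichAsyncFunction<byte[], MyPojo>() {
>           private transient ExecutorService pool;
>
>           @Override
>           public void open(Configuration parameters) {
>               pool = Executors.newFixedThreadPool(4);
>           }
>
>           @Override
>           public void asyncInvoke(byte[] record, ResultFuture<MyPojo> result) {
>               // parseHeavy is a placeholder for the expensive parsing code
>               pool.submit(() -> result.complete(
>                   Collections.singleton(parseHeavy(record))));
>           }
>
>           @Override
>           public void close() {
>               pool.shutdown();
>           }
>       },
>       30, TimeUnit.SECONDS, 100);   // timeout and queue capacity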
>
> Alternatively, you could think about a simple program which transforms
> your input events into another format where it is easier to extract the
> timestamp from. This comes, however, at the cost of another Kafka topic.
>
> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
> only affects the DataSet API. DataStream programs will always do defensive
> copies. There is a FLIP to improve this behaviour [1].
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>
> Cheers,
> Till
>
> On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi,
>>
>> As with most pipelines, our Flink pipeline starts with parsing source
>> Kafka events into POJOs. We perform this step within a
>> KafkaDeserializationSchema so that we can properly extract the event time
>> timestamp for the downstream timestamp assigner.
>>
>> Now it has turned out that parsing is currently the most CPU-intensive
>> task in our pipeline, and thus the CPU bounds the number of elements we
>> can ingest per second. Further splitting up the partitions would be hard,
>> as we need to maintain the exact order of events per partition, and it
>> would also require quite some architectural changes to the producers and
>> the Flink job.
>>
>> Now I had the idea to put the parsing task into ordered Async I/O. But as
>> far as I can see, Async I/O can only be plugged into an existing stream,
>> not into the deserialization schema. So the best idea I currently have is
>> to keep the parsing in the DeserializationSchema as minimal as possible,
>> only extracting the event timestamp, and to do the full parsing downstream
>> in Async I/O. This, however, seems a bit tedious, especially as we have to
>> deal with multiple input formats and would need to develop two parsers for
>> the heavy-load ones: a timestamp-only parser and a full parser.
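>>
>> To make this concrete, here is a minimal sketch of what I have in mind
>> (all names are placeholders; imports omitted):
>>
>>   // Only the event-time timestamp is extracted here; the expensive full
>>   // parse would happen later in an Async I/O operator.
>>   public class RawWithTimestamp {
>>       public byte[] payload;
>>       public long eventTime;
>>   }
>>
>>   public class TimestampOnlySchema
>>           implements KafkaDeserializationSchema<RawWithTimestamp> {
>>
>>       @Override
>>       public RawWithTimestamp deserialize(ConsumerRecord<byte[], byte[]> record) {
>>           RawWithTimestamp out = new RawWithTimestamp();
>>           out.payload = record.value();
>>           // extractTimestampCheaply is a placeholder for the cheap,
>>           // timestamp-only parser
>>           out.eventTime = extractTimestampCheaply(record.value());
>>           return out;
>>       }
>>
>>       @Override
>>       public boolean isEndOfStream(RawWithTimestamp next) {
>>           return false;
>>       }
>>
>>       @Override
>>       public TypeInformation<RawWithTimestamp> getProducedType() {
>>           return TypeInformation.of(RawWithTimestamp.class);
>>       }
>>   }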
>>
>> Do you know whether it is somehow possible to parallelize, or use Async
>> I/O for, the parsing within the KafkaDeserializationSchema? I don't have
>> state access in there, and there is no "collector" object, so each input
>> element needs to produce exactly one output element.
>>
>> Another question: my parsing produces Java POJOs via "new", which are
>> sent downstream (with the object-reuse setting enabled) and are finally
>> garbage collected once they have reached the sink. Is there some
>> mechanism in Flink that would let me reuse those "old" POJOs in the
>> source after they have passed through the sink? All tasks are chained,
>> so theoretically that should be possible?
>>
>> Best regards
>> Theo
>>
>
