[ https://issues.apache.org/jira/browse/FLINK-20060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282128#comment-17282128 ]
Blake Wilson edited comment on FLINK-20060 at 2/10/21, 2:29 AM:
----------------------------------------------------------------

Voicing support for addressing this. Deserialization with a Collector would be a great usability enhancement for applications that encode a series of values in a single Kinesis record. It would also save a great deal of serialization, since the current API requires adding a separate task to flatten Kinesis records:

KinesisConsumer --- DataStream<List<T>> ---> FlattenItems --- DataStream<T> --->

The serialization cost of introducing this extra task is quite dramatic when using a durable state backend like the RocksDB backend used in Kinesis Data Analytics.

> Add a Collector to KinesisDeserializationSchema
> ------------------------------------------------
>
>                 Key: FLINK-20060
>                 URL: https://issues.apache.org/jira/browse/FLINK-20060
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kinesis
>            Reporter: Timo Walther
>            Priority: Major
>
> We did not add support for a collector in the KinesisDeserializationSchema.
> The problem with that connector lies in its threading model: a pool of
> threads reads and deserializes records, then hands the deserialized messages
> over to the main thread through a queue.
> The problem is that we would need to create many temporary collections to
> put the deserialized records into the handover queue, which could
> significantly affect performance, especially in the common case of
> deserializing a single record from a single message.
> This means that we currently cannot support the Debezium format in the SQL
> connector if the Debezium record needs to emit 2 rows (UPDATE_BEFORE and
> UPDATE_AFTER).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
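To illustrate the request, here is a minimal sketch of what a Collector-based deserialization schema could look like. The `Collector` and `CollectorDeserializationSchema` interfaces below are simplified stand-ins written for this example, not Flink's actual API; they show how one Kinesis record carrying several values could emit each value directly, instead of returning a `DataStream<List<T>>` that a separate flatten task must unpack.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CollectorSketch {

    // Simplified stand-in for Flink's Collector interface.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical Collector-aware schema: instead of returning a single
    // value (or a List<T> to be flattened downstream), deserialize emits
    // zero or more records directly into the collector.
    interface CollectorDeserializationSchema<T> {
        void deserialize(byte[] recordValue, Collector<T> out);
    }

    // Example: one Kinesis record encodes several newline-separated values.
    static final CollectorDeserializationSchema<String> LINES =
            (bytes, out) -> {
                String payload = new String(bytes, StandardCharsets.UTF_8);
                for (String line : payload.split("\n")) {
                    out.collect(line);
                }
            };

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        LINES.deserialize("a\nb\nc".getBytes(StandardCharsets.UTF_8),
                emitted::add);
        System.out.println(emitted);
    }
}
```

The same shape would also cover the Debezium case mentioned in the issue, where a single message must emit two rows (UPDATE_BEFORE and UPDATE_AFTER): the schema simply calls `out.collect(...)` twice.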