[ 
https://issues.apache.org/jira/browse/IGNITE-18209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Shishkov updated IGNITE-18209:
-----------------------------------
    Description: 
Currently, there is a bottleneck in synchronized method 
{{KafkaToIgniteMetadataUpdater#updateMetadata}}:
# {{KafkaToIgniteCdcStreamer}} contains multiple 
{{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
{{KafkaToIgniteMetadataUpdater}}.
# All appliers handle corrsponding partitions consequently.
# {{META_UPDATE_MARKER}} is sent twice to each partition of event topic: 
firstly, in case of type mappings updates, secondly, in case of binary types 
update.
# When first {{KafkaToIgniteCdcStreamerApplier}} meets {{META_UPDATE_MARKER}} 
it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
{{KafkaConsumer#poll}}.
# {{KafkaConsumer#poll}} returns immediately [1] when there are data in 
metadata topic. If there are few binary types and mappings to update, some 
{{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from 
metadata topic.
# All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
{{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
until new data becomes available or request timeout occurs [1].
# Because of {{synchronized}} access to 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
{{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
metadata topic remains empty).
# The last call, i.e. last partition polling in this chain will happen at least 
after {{(partitionsCount x 2 - 1) x kafkaReqTimeout}} period. For example for 
default timeout and 16 partitions _last partition will be consumed after 1.5 
minutes_. 
# Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.
# Data updates are in fact blocked for partitions with unhandled update markers.

Links:
# 
https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-

  was:
Currently, there is a bottleneck in synchronized method 
{{KafkaToIgniteMetadataUpdater#updateMetadata}}:
# {{KafkaToIgniteCdcStreamer}} contains multiple 
{{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
{{KafkaToIgniteMetadataUpdater}}.
# All appliers handle corrsponding partitions consequently.
# {{META_UPDATE_MARKER}} is sent twice to each partition of event topic: 
firstly, in case of type mappings updates, secondly, in case of binary types 
update.
# When first {{KafkaToIgniteCdcStreamerApplier}} meets {{META_UPDATE_MARKER}} 
it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
{{KafkaConsumer#poll}}.
# {{KafkaConsumer#poll}} returns immediately [1] when there are data in 
metadata topic. If there are few binary types and mappings to update, some 
{{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from 
metadata topic.
# All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
{{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
until new data becomes available or request timeout occurs [1].
# Because of {{synchronized}} access to 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
{{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
metadata topic remains empty).
# The last call, i.e. last partition polling in this chain will happen at least 
after {{(partitionsCount x 2 - 1) x kafkaReqTimeout}} period. For example for 
default timeout and 16 partitions _last partition will be consumed after 1.5 
minutes_. 
# Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.

Links:
# 
https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-


> Reduce binary metadata synchronization time for CDC through Kafka
> -----------------------------------------------------------------
>
>                 Key: IGNITE-18209
>                 URL: https://issues.apache.org/jira/browse/IGNITE-18209
>             Project: Ignite
>          Issue Type: Improvement
>          Components: extensions
>            Reporter: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Currently, there is a bottleneck in synchronized method 
> {{KafkaToIgniteMetadataUpdater#updateMetadata}}:
> # {{KafkaToIgniteCdcStreamer}} contains multiple 
> {{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
> {{KafkaToIgniteMetadataUpdater}}.
> # All appliers handle corrsponding partitions consequently.
> # {{META_UPDATE_MARKER}} is sent twice to each partition of event topic: 
> firstly, in case of type mappings updates, secondly, in case of binary types 
> update.
> # When first {{KafkaToIgniteCdcStreamerApplier}} meets {{META_UPDATE_MARKER}} 
> it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
> {{KafkaConsumer#poll}}.
> # {{KafkaConsumer#poll}} returns immediately [1] when there are data in 
> metadata topic. If there are few binary types and mappings to update, some 
> {{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from 
> metadata topic.
> # All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
> {{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
> until new data becomes available or request timeout occurs [1].
> # Because of {{synchronized}} access to 
> {{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
> {{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
> will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
> metadata topic remains empty).
> # The last call, i.e. last partition polling in this chain will happen at 
> least after {{(partitionsCount x 2 - 1) x kafkaReqTimeout}} period. For 
> example for default timeout and 16 partitions _last partition will be 
> consumed after 1.5 minutes_. 
> # Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.
> # Data updates are in fact blocked for partitions with unhandled update 
> markers.
> Links:
> # 
> https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to