[ https://issues.apache.org/jira/browse/IGNITE-18209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ilya Shishkov updated IGNITE-18209:
-----------------------------------
Description:

Currently, there is a possible bottleneck in the synchronized method {{KafkaToIgniteMetadataUpdater#updateMetadata}} when all {{KafkaToIgniteCdcStreamerApplier}} instances try to update metadata at the same moment.

Scenario:
# {{KafkaToIgniteCdcStreamer}} contains multiple {{KafkaToIgniteCdcStreamerApplier}} instances which share a _single_ {{KafkaToIgniteMetadataUpdater}}.
# Each applier handles its corresponding partitions sequentially.
# An insertion of an unknown type is performed, which leads to type and mapping registrations on all nodes.
# {{META_UPDATE_MARKER}} is sent twice to each Kafka partition of the event topic _from every node_: first for the type mapping update, then for the binary type update.
# When the first {{KafkaToIgniteCdcStreamerApplier}} meets a {{META_UPDATE_MARKER}}, it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}}, which in turn calls {{KafkaConsumer#poll}}. The poll returns immediately [1] when data is present in the metadata topic, so if there are only a few binary types and mappings to update, a single {{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from the metadata topic.
# All other threads of all {{KafkaToIgniteCdcStreamerApplier}} instances will then call {{KafkaConsumer#poll}} on the now-empty metadata topic and remain blocked until new data becomes available or the request timeout expires [1].
# Because access to {{KafkaToIgniteMetadataUpdater#updateMetadata}} is {{synchronized}}, the threads of all {{KafkaToIgniteCdcStreamerApplier}} instances form a sequence of calls. Each call blocks the remaining applier threads for the {{kafkaReqTimeout}} period (if the metadata topic remains empty).
# The last call in this chain, i.e. the poll of the last Kafka partition, will happen only after at least {{clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout}}. For example, with the default timeout and 16 Kafka partitions on a 1-node cluster, _the last partition will be consumed only after about 1.5 minutes_.

Consequences:
# The number of threads in {{KafkaToIgniteCdcStreamer}} does not matter, because the applier threads are serialized on the metadata updater.
# Data updates are blocked for Kafka partitions with unhandled update markers.

As I understand it, possible solutions are:
# Eliminate blocking in {{KafkaToIgniteMetadataUpdater#updateMetadata}} caused by calling {{KafkaConsumer#poll}} on an empty metadata topic. We can try to calculate the consumer lag for the topic and poll data only when the lag is present.
# Hold information about replicated types, or get it from {{BinaryContext}}. Information about a type can be sent with {{META_UPDATE_MARKER}}: see PoC [PR#187|https://github.com/apache/ignite-extensions/pull/187].
# Completely remove the metadata topic, and send metadata merged with the marker directly into the event topic: see PoC [PR#196|https://github.com/apache/ignite-extensions/pull/196].
# Any other ways to sync the appliers?

As a PoC of the approach with {{BinaryContext}}, I have prepared PR [2].

Links:
# https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
# https://github.com/apache/ignite-extensions/pull/187
# https://github.com/apache/ignite-extensions/pull/196
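The worst-case bound in the scenario above can be checked with a quick calculation. Note the 3-second {{kafkaReqTimeout}} value below is an assumption inferred from the 1.5-minute figure for 16 partitions on a 1-node cluster, not read from the extension's code:

```java
// Worst-case delay before the last Kafka partition is polled, per the
// formula clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout.
// Each marker poll that finds the metadata topic empty blocks for the
// full kafkaReqTimeout, and the synchronized updater serializes them all.
public class MetaUpdateDelay {
    public static long worstCaseMillis(int clusterSize, int topicPartitions, long kafkaReqTimeoutMs) {
        return (long) clusterSize * (topicPartitions * 2L - 1) * kafkaReqTimeoutMs;
    }

    public static void main(String[] args) {
        // 1 node, 16 partitions, assumed 3 s timeout:
        // (16 * 2 - 1) * 3 s = 93 s, i.e. roughly the 1.5 minutes cited above.
        System.out.println(worstCaseMillis(1, 16, 3_000) / 1000.0 + " s");
    }
}
```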
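Solution #1 could look roughly like the sketch below: before calling {{KafkaConsumer#poll}} on the metadata topic, compare the consumer's current positions with the end offsets and skip the poll entirely when there is no lag. The class and method names here are hypothetical illustrations, not part of the actual {{KafkaToIgniteMetadataUpdater}} API; plain {{String}} keys stand in for {{TopicPartition}} to keep the sketch self-contained:

```java
import java.util.Map;

// Sketch of the lag check for solution #1. With a live consumer, the two
// maps would be filled from consumer.position(tp) and
// consumer.endOffsets(consumer.assignment()) for the metadata topic.
public class MetadataLagCheck {
    /**
     * @param positions  Current consumer position per partition (may lack
     *                   partitions that were never read).
     * @param endOffsets Log-end offset per partition.
     * @return {@code true} if any partition still has unread records.
     */
    static boolean lagPresent(Map<String, Long> positions, Map<String, Long> endOffsets) {
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            Long pos = positions.get(e.getKey());

            if (pos == null || pos < e.getValue())
                return true; // Consumer is behind the log end: there is data to poll.
        }

        return false; // Fully caught up: poll would just block for kafkaReqTimeout.
    }
}
```

With such a check, {{updateMetadata}} could return immediately when {{lagPresent}} is false instead of blocking the remaining applier threads for the full timeout.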
> Reduce binary metadata synchronization time for CDC through Kafka
> -----------------------------------------------------------------
>
>                 Key: IGNITE-18209
>                 URL: https://issues.apache.org/jira/browse/IGNITE-18209
>             Project: Ignite
>          Issue Type: Improvement
>          Components: extensions
>            Reporter: Ilya Shishkov
>            Assignee: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise

--
This message was sent by Atlassian Jira
(v8.20.10#820010)