[ 
https://issues.apache.org/jira/browse/IGNITE-18209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Shishkov updated IGNITE-18209:
-----------------------------------
    Description: 
Currently, there is a possible bottleneck in synchronized method 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} for cases, when all 
{{KafkaToIgniteCdcStreamerApplier}} instances will try to update metadata at 
the same moment.

Scenario:
 # {{KafkaToIgniteCdcStreamer}} contains multiple 
{{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
{{{}KafkaToIgniteMetadataUpdater{}}}.
 # All appliers handle corresponding partitions consequently.
 # Insertion of unknown type is performed and leads to type and mapping 
registrations on all nodes.
 # {{META_UPDATE_MARKER}} is sent twice to each Kafka partition of event topic 
{_}from every node{_}: firstly, in case of type mappings updates, secondly, in 
case of binary types update.
 # When first {{KafkaToIgniteCdcStreamerApplier}} meets {{META_UPDATE_MARKER}} 
it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
{{{}KafkaConsumer#poll{}}}, which returns immediately [1] when data is present 
in metadata topic. If there are few binary types and mappings to update, some 
{{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from 
metadata topic.
 # All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
{{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
until new data becomes available or request timeout occurs [1].
 # Because of {{synchronized}} access to 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
{{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
metadata topic remains empty).
 # The last call, i.e. last Kafka partition polling in this chain will happen 
at least after {{clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout}} 
period. For example for default timeout and 16 Kafka partitions _last partition 
will be consumed after 1.5 minutes_ in case of two one-node clusters.
 # Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.
 # Data updates are blocked for Kafka partitions with unhandled update markers.

As I understand possible solutions are:
 # Eleminate blocking in {{KafkaToIgniteMetadataUpdater#updateMetadata}} caused 
by calling {{KafkaConsumer#poll}} over empty metadata topic. We can try to 
calculate a lag for topic and poll data only when the lag is present. 
[PR#199|https://github.com/apache/ignite-extensions/pull/199]
 # -Hold information about replicated types or get it from 
{{{}BinaryContext{}}}. Information about type can be sent with 
{{{}META_UPDATE_MARKER{}}}:- see PoC 
[PR#187|https://github.com/apache/ignite-extensions/pull/187].
 # {-}Completely remove metadata topic, and send metadata merged with marker 
directly into event topic{-}: see PoC 
[PR#196|https://github.com/apache/ignite-extensions/pull/196]
 # Any other ways to sync appliers?

Links:
 # 
[https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]

  was:
Currently, there is a possible bottleneck in synchronized method 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} for cases, when all 
{{KafkaToIgniteCdcStreamerApplier}} instances will try to update metadata at 
the same moment.

Scenario:
 # {{KafkaToIgniteCdcStreamer}} contains multiple 
{{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
{{{}KafkaToIgniteMetadataUpdater{}}}.
 # All appliers handle corrsponding partitions consequently.
 # Insertion of unknown type is performed and leads to type and mapping 
registrations on all nodes.
 # {{META_UPDATE_MARKER}} is sent twice to each Kafka partition of event topic 
{_}from every node{_}: firstly, in case of type mappings updates, secondly, in 
case of binary types update.
 # When first {{KafkaToIgniteCdcStreamerApplier}} meets {{META_UPDATE_MARKER}} 
it calls {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
{{{}KafkaConsumer#poll{}}}, which returns immediately [1] when data is present 
in metadata topic. If there are few binary types and mappings to update, some 
{{KafkaToIgniteCdcStreamerApplier}} thread will consume all entries from 
metadata topic.
 # All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
{{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
until new data becomes available or request timeout occurs [1].
 # Because of {{synchronized}} access to 
{{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
{{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
metadata topic remains empty).
 # The last call, i.e. last Kafka partition polling in this chain will happen 
at least after {{clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout}} 
period. For example for default timeout and 16 Kafka partitions _last partition 
will be consumed after 1.5 minutes_ in case of two one-node clusters.
 # Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.
 # Data updates are blocked for Kafka partitions with unhandled update markers.

As I understand possible solutions are:
 # Eleminate blocking in {{KafkaToIgniteMetadataUpdater#updateMetadata}} caused 
by calling {{KafkaConsumer#poll}} over empty metadata topic. We can try to 
calculate a lag for topic and poll data only when the lag is present. 
[PR#199|https://github.com/apache/ignite-extensions/pull/199]
 # -Hold information about replicated types or get it from 
{{{}BinaryContext{}}}. Information about type can be sent with 
{{{}META_UPDATE_MARKER{}}}:- see PoC 
[PR#187|https://github.com/apache/ignite-extensions/pull/187].
 # {-}Completely remove metadata topic, and send metadata merged with marker 
directly into event topic{-}: see PoC 
[PR#196|https://github.com/apache/ignite-extensions/pull/196]
 # Any other ways to sync appliers?

Links:
 # 
[https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]


> Reduce binary metadata synchronization time for CDC through Kafka
> -----------------------------------------------------------------
>
>                 Key: IGNITE-18209
>                 URL: https://issues.apache.org/jira/browse/IGNITE-18209
>             Project: Ignite
>          Issue Type: Improvement
>          Components: extensions
>            Reporter: Ilya Shishkov
>            Assignee: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise
>
> Currently, there is a possible bottleneck in synchronized method 
> {{KafkaToIgniteMetadataUpdater#updateMetadata}} for cases, when all 
> {{KafkaToIgniteCdcStreamerApplier}} instances will try to update metadata at 
> the same moment.
> Scenario:
>  # {{KafkaToIgniteCdcStreamer}} contains multiple 
> {{KafkaToIgniteCdcStreamerApplier}} which shares _single_ 
> {{{}KafkaToIgniteMetadataUpdater{}}}.
>  # All appliers handle corresponding partitions consequently.
>  # Insertion of unknown type is performed and leads to type and mapping 
> registrations on all nodes.
>  # {{META_UPDATE_MARKER}} is sent twice to each Kafka partition of event 
> topic {_}from every node{_}: firstly, in case of type mappings updates, 
> secondly, in case of binary types update.
>  # When first {{KafkaToIgniteCdcStreamerApplier}} meets 
> {{META_UPDATE_MARKER}} it calls 
> {{KafkaToIgniteMetadataUpdater#updateMetadata}} which in turn calls 
> {{{}KafkaConsumer#poll{}}}, which returns immediately [1] when data is 
> present in metadata topic. If there are few binary types and mappings to 
> update, some {{KafkaToIgniteCdcStreamerApplier}} thread will consume all 
> entries from metadata topic.
>  # All other threads of all {{KafkaToIgniteCdcStreamerApplier}} will call 
> {{KafkaConsumer#poll}} from empty metadata topic, which will remain blocked 
> until new data becomes available or request timeout occurs [1].
>  # Because of {{synchronized}} access to 
> {{KafkaToIgniteMetadataUpdater#updateMetadata}} all threads of all 
> {{KafkaToIgniteCdcStreamerApplier}} will form a sequence of calls. Each call 
> will block remaining appliers threads for {{kafkaReqTimeout}} period (if 
> metadata topic remains empty).
>  # The last call, i.e. last Kafka partition polling in this chain will happen 
> at least after {{clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout}} 
> period. For example for default timeout and 16 Kafka partitions _last 
> partition will be consumed after 1.5 minutes_ in case of two one-node 
> clusters.
>  # Amount of threads in {{KafkaToIgniteCdcStreamer}} does not make sence.
>  # Data updates are blocked for Kafka partitions with unhandled update 
> markers.
> As I understand possible solutions are:
>  # Eleminate blocking in {{KafkaToIgniteMetadataUpdater#updateMetadata}} 
> caused by calling {{KafkaConsumer#poll}} over empty metadata topic. We can 
> try to calculate a lag for topic and poll data only when the lag is present. 
> [PR#199|https://github.com/apache/ignite-extensions/pull/199]
>  # -Hold information about replicated types or get it from 
> {{{}BinaryContext{}}}. Information about type can be sent with 
> {{{}META_UPDATE_MARKER{}}}:- see PoC 
> [PR#187|https://github.com/apache/ignite-extensions/pull/187].
>  # {-}Completely remove metadata topic, and send metadata merged with marker 
> directly into event topic{-}: see PoC 
> [PR#196|https://github.com/apache/ignite-extensions/pull/196]
>  # Any other ways to sync appliers?
> Links:
>  # 
> [https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to