[ https://issues.apache.org/jira/browse/FLINK-27320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-27320.
----------------------------------
    Resolution: Done

> Frequent OutOfOrderSequenceException when using KafkaSink with EXACTLY_ONCE semantics
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-27320
>                 URL: https://issues.apache.org/jira/browse/FLINK-27320
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4
>            Reporter: Zhengqi Zhang
>            Priority: Major
>         Attachments: image-2022-04-20-17-48-37-149.png, image-2022-04-20-17-49-15-143.png
>
> This problem does not occur when using EXACTLY_ONCE semantics with FlinkKafkaProducer, but it occurs frequently when using KafkaSink.
> !image-2022-04-20-17-48-37-149.png|width=573,height=220!
> !image-2022-04-20-17-49-15-143.png|width=818,height=469!
> This is the ProducerConfig when using KafkaSink:
> {code:java}
> acks = 1
> batch.size = 16384
> bootstrap.servers = [localhost:9092]
> buffer.memory = 33554432
> client.dns.lookup = default
> client.id =
> compression.type = none
> connections.max.idle.ms = 540000
> delivery.timeout.ms = 120000
> enable.idempotence = false
> interceptor.classes = []
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> linger.ms = 0
> max.block.ms = 60000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retries = 2147483647
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> transaction.timeout.ms = 300000
> transactional.id = kafka-sink-0-36
> value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer{code}
> This is the ProducerConfig when using FlinkKafkaProducer:
> {code:java}
> acks = 1
> batch.size = 16384
> bootstrap.servers = [localhost:9092]
> buffer.memory = 33554432
> client.dns.lookup = default
> client.id =
> compression.type = none
> connections.max.idle.ms = 540000
> delivery.timeout.ms = 120000
> enable.idempotence = false
> interceptor.classes = []
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> linger.ms = 0
> max.block.ms = 60000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retries = 2147483647
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> transaction.timeout.ms = 300000
> transactional.id = Sink: Unnamed-0a448493b4782967b150582570326227-2
> value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer{code}
>
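For context, a minimal sketch of how a KafkaSink is typically built for EXACTLY_ONCE delivery on Flink 1.14, using the broker address and transaction.timeout.ms visible in the ProducerConfig dump above. The topic name ("output-topic"), the String payload type, and the transactional-id prefix are illustrative assumptions, not details taken from the reporter's job.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSinkSketch {

    public static KafkaSink<String> buildSink() {
        // Keep transaction.timeout.ms below the broker's transaction.max.timeout.ms;
        // 300000 ms matches the value shown in the ProducerConfig dump above.
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "300000");

        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic") // hypothetical topic name
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // EXACTLY_ONCE makes the sink write through Kafka transactions,
                // one transaction per checkpoint. (Method is named
                // setDeliveryGuarantee in later Flink releases.)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Required for EXACTLY_ONCE; the sink derives transactional.id
                // values such as "kafka-sink-0-36" (seen above) from this prefix.
                .setTransactionalIdPrefix("kafka-sink")
                .build();
    }
}
{code}

EXACTLY_ONCE also requires checkpointing to be enabled on the job, since the sink only commits its Kafka transactions when a checkpoint completes.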
--
This message was sent by Atlassian Jira
(v8.20.10#820010)