Boquan Tang created KAFKA-8876:
----------------------------------
Summary: KafkaBasedLog does not throw exception when some
partitions of the topic is offline
Key: KAFKA-8876
URL: https://issues.apache.org/jira/browse/KAFKA-8876
Project: Kafka
Issue Type: Bug
Reporter: Boquan Tang
Currently KafkaBasedLog does not check if *all* partitions in the topic is
online or not, this may result it ignoring partitions that's still recovering
and in turn report to KafkaOffsetBackingStore null offset, while in fact it
should either wait or fail the thread to prompt retry, so the offset can be
correctly loaded by the connector.
Specifically, we are using debezium mysql connector to replicate mysql binlog
to kafka.
In an attempt of restarting after a cluster downage, we observed following:
{code}
2019-08-29T19:27:32Z INFO
[org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting
KafkaOffsetBackingStore
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] [main]
Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
...skipped client config logs...
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-6 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-3 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-9 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-24 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-4 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher]
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1]
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-19 to offset 0.
2019-08-29T19:27:34Z INFO
[org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Finished
reading offsets topic and starting KafkaOffsetBackingStore
{code}
bobqueue-binlog-shovel-v1-offsets-14, which contains offset for one of our
debezium connectors is not presented. This results in:
{code}
2019-08-29T19:32:09Z cmh-mexec1029 INFO
[io.debezium.connector.mysql.MySqlConnectorTask] [pool-3-thread-1] Found no
existing offset, so preparing to perform a snapshot
{code}
Which is undesirable.
Can we make KafkaBasedLog to check the completeness of partition list and
wait/fail when the check did not pass?
--
This message was sent by Atlassian Jira
(v8.3.2#803003)