[ https://issues.apache.org/jira/browse/KAFKA-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035770#comment-16035770 ]
ASF GitHub Bot commented on KAFKA-5355:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/3221

    KAFKA-5355: DelayedFetch should propagate isolation level to log read

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-5355

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3221

----
commit 82430eefb669718a35b93a34a2c8afec2230351b
Author: Jason Gustafson <ja...@confluent.io>
Date:   2017-06-03T02:26:39Z

    KAFKA-5355: DelayedFetch should propagate isolation level to log read

----

> Broker returns messages beyond "latest stable offset" to transactional consumer in read_committed mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5355
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.11.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Jason Gustafson
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>         Attachments: test.log
>
>
> This issue is exposed by the new Streams EOS integration test.
> Streams has two tasks (i.e., two producers with {{pid}} 0 and 2000) both
> writing to output topic {{output}} with one partition (replication factor 1).
> The test uses a transactional consumer with {{group.id=readCommitted}} to
> read the data from {{output}} topic. When it reads the data, each producer has
> committed 10 records (one producer writes messages with {{key=0}} and the
> other with {{key=1}}). Furthermore, each producer has an open transaction and
> 5 uncommitted records written.
> The test fails, as we expect to see 10 records per key, but we get 15 for key=1:
> {noformat}
> java.lang.AssertionError:
> Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45)]>
>      but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105)]>
> {noformat}
> Dumping the segment shows that there are two commit markers (one for each
> producer) for the first 10 messages written. Furthermore, there are 5 pending
> records. Thus, "latest stable offset" should be 21 (20 messages plus 2 commit
> markers) and no data should be returned beyond this offset.
> Dumped Log Segment {{output-0}}
> {noformat}
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 baseSequence: 0 lastSequence: 9 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 0 CreateTime: 1496255947332 isvalid: true size: 291 magic: 2 compresscodec: NONE crc: 600535135
> baseOffset: 10 lastOffset: 10 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 291 CreateTime: 1496256005429 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 3458060752
> baseOffset: 11 lastOffset: 20 baseSequence: 0 lastSequence: 9 producerId: 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 369 CreateTime: 1496255947322 isvalid: true size: 291 magic: 2 compresscodec: NONE crc: 3392915713
> baseOffset: 21 lastOffset: 25 baseSequence: 10 lastSequence: 14 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 660 CreateTime: 1496255947342 isvalid: true size: 176 magic: 2 compresscodec: NONE crc: 3513911368
> baseOffset: 26 lastOffset: 26 baseSequence: -1 lastSequence: -1 producerId: 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 836 CreateTime: 1496256011784 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 1619151485
> {noformat}
> Dump with {{--deep-iteration}}
> {noformat}
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1496255947323 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 0 headerKeys: [] key: 1 payload: 0
> offset: 1 position: 0 CreateTime: 1496255947324 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 1 headerKeys: [] key: 1 payload: 1
> offset: 2 position: 0 CreateTime: 1496255947325 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 2 headerKeys: [] key: 1 payload: 3
> offset: 3 position: 0 CreateTime: 1496255947326 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 3 headerKeys: [] key: 1 payload: 6
> offset: 4 position: 0 CreateTime: 1496255947327 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 4 headerKeys: [] key: 1 payload: 10
> offset: 5 position: 0 CreateTime: 1496255947328 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 5 headerKeys: [] key: 1 payload: 15
> offset: 6 position: 0 CreateTime: 1496255947329 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 6 headerKeys: [] key: 1 payload: 21
> offset: 7 position: 0 CreateTime: 1496255947330 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 7 headerKeys: [] key: 1 payload: 28
> offset: 8 position: 0 CreateTime: 1496255947331 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 8 headerKeys: [] key: 1 payload: 36
> offset: 9 position: 0 CreateTime: 1496255947332 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 9 headerKeys: [] key: 1 payload: 45
> offset: 10 position: 291 CreateTime: 1496256005429 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE crc: 3458060752 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> offset: 11 position: 369 CreateTime: 1496255947313 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 0 headerKeys: [] key: 0 payload: 0
> offset: 12 position: 369 CreateTime: 1496255947314 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 1 headerKeys: [] key: 0 payload: 1
> offset: 13 position: 369 CreateTime: 1496255947315 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 2 headerKeys: [] key: 0 payload: 3
> offset: 14 position: 369 CreateTime: 1496255947316 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 3 headerKeys: [] key: 0 payload: 6
> offset: 15 position: 369 CreateTime: 1496255947317 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 4 headerKeys: [] key: 0 payload: 10
> offset: 16 position: 369 CreateTime: 1496255947318 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 5 headerKeys: [] key: 0 payload: 15
> offset: 17 position: 369 CreateTime: 1496255947319 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 6 headerKeys: [] key: 0 payload: 21
> offset: 18 position: 369 CreateTime: 1496255947320 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 7 headerKeys: [] key: 0 payload: 28
> offset: 19 position: 369 CreateTime: 1496255947321 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 8 headerKeys: [] key: 0 payload: 36
> offset: 20 position: 369 CreateTime: 1496255947322 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 9 headerKeys: [] key: 0 payload: 45
> offset: 21 position: 660 CreateTime: 1496255947338 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 10 headerKeys: [] key: 1 payload: 55
> offset: 22 position: 660 CreateTime: 1496255947339 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 11 headerKeys: [] key: 1 payload: 66
> offset: 23 position: 660 CreateTime: 1496255947340 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 12 headerKeys: [] key: 1 payload: 78
> offset: 24 position: 660 CreateTime: 1496255947341 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 13 headerKeys: [] key: 1 payload: 91
> offset: 25 position: 660 CreateTime: 1496255947342 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 14 headerKeys: [] key: 1 payload: 105
> offset: 26 position: 836 CreateTime: 1496256011784 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE crc: 1619151485 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> {noformat}
> The client log shows that reading and writing happen concurrently.
> {noformat}
> [2017-05-31 11:40:11,642] DEBUG Resetting offset for partition outputTopic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:418)
> [2017-05-31 11:40:11,642] DEBUG Added fetch request for partition outputTopic-0 at offset 0 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,642] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,643] DEBUG Added fetch request for partition outputTopic-0 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,643] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,673] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=appId-1, coordinatorType=GROUP) (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,674] DEBUG TransactionalId appId-1-0_1 -- Received FindCoordinator response with error NONE (org.apache.kafka.clients.producer.internals.TransactionManager:738)
> [2017-05-31 11:40:11,674] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, consumerGroupId=appId-1, offsets={inputTopic-1=CommittedOffset(offset=10, metadata='')}) (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64267 (id: 0 rack: null) : producerId: 0, epoch: 6, Assigning sequence for outputTopic-0: 10 (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
> [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64273 (id: 2 rack: null) : producerId: 0, epoch: 6, Assigning sequence for appId-1-store-changelog-0: 10 (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
> [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition appId-1-store-changelog-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
> [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition outputTopic-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Received TxnOffsetCommit response with errors {inputTopic-1=NONE} (org.apache.kafka.clients.producer.internals.TransactionManager:900)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,780] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, result=COMMIT) (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Received EndTxn response with error NONE (org.apache.kafka.clients.producer.internals.TransactionManager:792)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from state COMMITTING_TRANSACTION to READY (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from state READY to IN_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager:427)
> [2017-05-31 11:40:11,782] DEBUG Initiating connection to node 0 at 127.0.0.1:64267. (org.apache.kafka.clients.NetworkClient:707)
> [2017-05-31 11:40:11,782] DEBUG Added fetch request for partition inputTopic-1 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,782] DEBUG Sending fetch for partitions [inputTopic-1] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,783] DEBUG Completed connection to node 0. Ready. (org.apache.kafka.clients.NetworkClient:672)
> [2017-05-31 11:40:11,783] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition inputTopic-1 at offset 15 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,784] DEBUG Sending fetch for partitions [inputTopic-1] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> [2017-05-31 11:40:11,784] DEBUG TransactionalId appId-1-0_1 -- Received AddPartitionsToTxn response with errors {outputTopic-0=CONCURRENT_TRANSACTIONS, appId-1-store-changelog-1=CONCURRENT_TRANSACTIONS} (org.apache.kafka.clients.producer.internals.TransactionManager:658)
> [2017-05-31 11:40:11,784] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) (org.apache.kafka.clients.producer.internals.Sender:314)
> [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition outputTopic-0 at offset 27 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
> [2017-05-31 11:40:11,785] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
> {noformat}
> (full log attached)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
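
The "latest stable offset" reasoning in the report can be sketched in a few lines of Python. This is a simplified model and not broker code: the `Batch` type and the two helper functions are hypothetical names introduced for illustration, the batch layout is copied from the batch-level segment dump in the report, and the model assumes every marker is a COMMIT and ignores the high watermark.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Batch:
    """One record batch from the segment dump (hypothetical helper type)."""
    base_offset: int
    last_offset: int
    producer_id: int
    marker: Optional[str] = None  # "COMMIT" for control batches, None for data


# Batch layout taken from the dumped log segment in the report.
LOG: List[Batch] = [
    Batch(0, 9, 0),
    Batch(10, 10, 0, "COMMIT"),
    Batch(11, 20, 2000),
    Batch(21, 25, 0),
    Batch(26, 26, 2000, "COMMIT"),
]


def last_stable_offset(log: List[Batch]) -> int:
    """First offset of the earliest still-open transaction, else the log end."""
    open_txns = {}  # producer_id -> first offset of its open transaction
    for batch in log:
        if batch.marker is None:
            # A data batch opens a transaction if the producer has none open.
            open_txns.setdefault(batch.producer_id, batch.base_offset)
        else:
            # A marker closes that producer's open transaction.
            open_txns.pop(batch.producer_id, None)
    log_end = log[-1].last_offset + 1 if log else 0
    return min(open_txns.values(), default=log_end)


def read_committed(log: List[Batch]) -> List[int]:
    """Data offsets a read_committed consumer may see in this simplified model."""
    lso = last_stable_offset(log)
    return [
        offset
        for batch in log
        if batch.marker is None
        for offset in range(batch.base_offset, batch.last_offset + 1)
        if offset < lso
    ]


if __name__ == "__main__":
    print(last_stable_offset(LOG))
    print(len(read_committed(LOG)))
```

With the dumped layout, this model gives a stable offset of 21 and 20 visible data records (10 per producer), which matches the 10 records per key that the test expects. Offsets 21 to 25 belong to the transaction that producer 0 still has open, so they fall outside the visible range.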