j-riebe opened a new issue, #16689:
URL: https://github.com/apache/druid/issues/16689

   ### Affected Version
   
   Reproduced with:
   * v29.0.1
   * v30.0.0
   
   ### Description
   
   We observed that the `KafkaSupervisor` does not enter idle mode after 
processing all messages, even though no new messages are produced within the 
configured idle timeout (`inactiveAfterMillis`).
   We traced this problem down to cases where the last message was produced 
inside a Kafka transaction.
   
   In this case, the offset lag reported in the supervisor status only drops 
to a minimum of 1 instead of 0 once all messages have been processed.
   This is caused by `currentOffsets` being less than `latestOffsets` in the 
supervisor status.
   Our assumption is that the offset lag is computed incorrectly because 
`latestOffsets` may point to Kafka transaction markers (more on that in the 
section "Mutual error cause").
   
   **Note:** This issue **only appears** when the **last message** has been 
written as part of a **Kafka transaction**. The `latestOffsets` will point to 
**the offset after** the last written (possibly aborted) message, which 
corresponds to the transaction marker.
   When a non-transactional message is produced after an arbitrary number of 
transactional messages, the lag is correctly calculated as 0 and idle mode is 
entered.
   
   ### Minimal example
   _The following is tested against a single topic & single partition 
configuration.
   See last section for a full reproduction of this behaviour._
   
   Suppose we insert a single message into a Kafka topic as part of a 
Kafka transaction.
   This message will have the offset 0.
   Since the Kafka supervisor reports the next offset to be processed, we 
would expect it to report `currentOffsets == latestOffsets == 1`.
   
   But the status is actually reported as:
   * `currentOffsets` (actually next to-be-processed offset): 1
   * `latestOffsets` (expected next to-be-processed offset): 2
   
   This leads to a lag of 1 which should indicate that there is 1 message to 
consume.
   But as there are no more messages this is unexpected behaviour.
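
The lag arithmetic can be checked with a minimal sketch. It assumes that the reported lag is simply `latestOffsets - currentOffsets` per partition, which matches the numbers in the status output but is not taken from the Druid source. `partition_lag` is a hypothetical helper, not a Druid API.

```python
# Sketch: recompute the per-partition lag from a supervisor status payload.
# Assumption: lag == latestOffsets - currentOffsets for each partition.
def partition_lag(status: dict) -> dict:
    """Return {partition: latest offset - current offset} for all active tasks."""
    latest = status["latestOffsets"]
    lag = {}
    for task in status["activeTasks"]:
        for partition, current in task["currentOffsets"].items():
            lag[partition] = latest[partition] - current
    return lag


# Only the fields needed for the calculation, copied from the reported status.
status = {
    "activeTasks": [{"currentOffsets": {"0": 1}}],
    "latestOffsets": {"0": 2},
}
print(partition_lag(status))  # {'0': 1}
```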
   
   <details><summary>Example status after single message with offset 0</summary>
   <p>
   
   ```json
   {
     "dataSource": "test-idle-mode",
     "stream": "test-idle-mode",
     "partitions": 1,
     "replicas": 1,
     "durationSeconds": 3600,
     "activeTasks": [
       {
         "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
         "startingOffsets": {
           "0": 0
         },
         "startTime": "2024-07-03T15:04:34.022Z",
         "remainingSeconds": 3568,
         "type": "ACTIVE",
         "currentOffsets": {
           "0": 1
         },
         "lag": {
           "0": 1
         }
       }
     ],
     "publishingTasks": [],
     "latestOffsets": {
       "0": 2
     },
     "minimumLag": {
       "0": 1
     },
     "aggregateLag": 1,
     "offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
     "suspended": false,
     "healthy": true,
     "state": "RUNNING",
     "detailedState": "RUNNING",
     "recentErrors": []
   }
   ```
   
   </p>
   </details> 
   
   We also found that `latestOffsets` can be incremented further by producing 
messages in a transaction that is then aborted.
   The `currentOffsets` are not changed by this, as they seem to correctly 
report the offset of the last processed message + 1.
   But `latestOffsets` is now reported as 4, leading to a lag of 3 even though 
there are still no new messages to process.
   
   <details><summary>Updated status after producing another message and 
aborting transaction</summary>
   <p>
   
   ```json
   {
     "dataSource": "test-idle-mode",
     "stream": "test-idle-mode",
     "partitions": 1,
     "replicas": 1,
     "durationSeconds": 3600,
     "activeTasks": [
       {
         "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
         "startingOffsets": {
           "0": 0
         },
         "startTime": "2024-07-03T15:04:34.022Z",
         "remainingSeconds": 1607,
         "type": "ACTIVE",
         "currentOffsets": {
           "0": 1
         },
         "lag": {
           "0": 3
         }
       }
     ],
     "publishingTasks": [],
     "latestOffsets": {
       "0": 4
     },
     "minimumLag": {
       "0": 3
     },
     "aggregateLag": 3,
     "offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
     "suspended": false,
     "healthy": true,
     "state": "RUNNING",
     "detailedState": "RUNNING",
     "recentErrors": []
   }
   ```
   
   </p>
   </details> 
   
   We can **rule out issues with the `isolation.level`** because the 
`latestOffsets` only get updated once the transaction is either committed or 
aborted. So the process seems to adhere to `isolation.level = read_committed`.
   
   By producing a new message without a transaction it is now possible to 
reduce the lag to 0.
   This will result in `currentOffsets == latestOffsets == 5` (after processing 
just **2** valid messages!).
   
   <details><summary>Status after producing non-transactional message</summary>
   <p>
   
   ```json
   {
     "dataSource": "test-idle-mode",
     "stream": "test-idle-mode",
     "partitions": 1,
     "replicas": 1,
     "durationSeconds": 3600,
     "activeTasks": [
       {
         "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
         "startingOffsets": {
           "0": 0
         },
         "startTime": "2024-07-03T15:04:34.022Z",
         "remainingSeconds": 917,
         "type": "ACTIVE",
         "currentOffsets": {
           "0": 5
         },
         "lag": {
           "0": 0
         }
       }
     ],
     "publishingTasks": [],
     "latestOffsets": {
       "0": 5
     },
     "minimumLag": {
       "0": 0
     },
     "aggregateLag": 0,
     "offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
     "suspended": false,
     "healthy": true,
     "state": "RUNNING",
     "detailedState": "RUNNING",
     "recentErrors": []
   }
   ```
   
   </p>
   </details> 
   
   ### Mutual error cause
   
   Based on the sudden jumps in the offset we repeated the experiment consuming 
the topic in `isolation.level = read_uncommitted` mode in a separate process.
   
   This results in the following offsets after the respective produce calls:
   
       Produce call                                    Offset   
       (1) first message (transaction committed)       [0]
       (2) failed message (transaction aborted)        [2]
       (3) non-transactional message (no transaction)  [4]
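
A minimal sketch of how such a check could be done with `confluent_kafka` is shown here. The group id `idletest-inspect` and the helper names `make_consumer`, `collect_offsets` and `main` are hypothetical, and authentication settings have to be added to the config as in the producer snippets.

```python
def make_consumer(extra_config=None):
    """Build a consumer that also returns messages of aborted transactions."""
    from confluent_kafka import Consumer  # lazy import, needs confluent-kafka

    config = {
        "bootstrap.servers": "<redacted>",
        "group.id": "idletest-inspect",  # hypothetical group id
        "isolation.level": "read_uncommitted",
        "enable.auto.commit": False,
    }
    config.update(extra_config or {})  # e.g. auth settings
    return Consumer(config)


def collect_offsets(consumer, max_empty_polls=5, timeout=1.0):
    """Collect message offsets until several consecutive polls return nothing."""
    offsets = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            empty_polls += 1
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        empty_polls = 0
        offsets.append(msg.offset())
    return offsets


def main():
    from confluent_kafka import OFFSET_BEGINNING, TopicPartition

    consumer = make_consumer()
    # Read partition 0 from the start without relying on committed offsets.
    consumer.assign([TopicPartition("test-idle-mode", 0, OFFSET_BEGINNING)])
    try:
        print(collect_offsets(consumer))
    finally:
        consumer.close()
```

Calling `main()` against the topic prints the offsets of all messages the consumer receives, including those of aborted transactions.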
   
   This raises the question of what happened to offsets 1 and 3, which is 
where Kafka transactions come in.
   Kafka uses these offsets for the **transaction markers**, which signal 
whether the transaction was committed or aborted.
   These markers are control records and are not delivered to the consumer 
as regular messages.
   
   So actually the topic looks like this:
       
       Produce call                                    Offset    Relevant position for current/latestOffsets @ produce call
       (1) first message (transaction committed)       [0]       currentOffset (1, 2)
       (1) [TRANSACTION COMMITTED]                     [1]       latestOffset (1)
       (2) failed message (transaction aborted)        [2]
       (2) [TRANSACTION ABORTED]                       [3]       latestOffset (2)
       (3) non-transactional message (no transaction)  [4]       currentOffset (3), latestOffset (3)
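
The layout above can be replayed with a toy model of the partition log. This is neither Kafka nor Druid code. It assumes that `latestOffsets` follows the end of the log (markers included) while `currentOffsets` follows the last message actually delivered to the task. `Entry`, `log_end_offset` and `next_consumable_offset` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    kind: str        # "data" or "marker"
    committed: bool  # data: visible to the task; marker: commit vs. abort


# The partition after all three produce calls; the list index is the offset.
log = [
    Entry("data", True),     # 0: message of the committed transaction
    Entry("marker", True),   # 1: commit marker
    Entry("data", False),    # 2: message of the aborted transaction
    Entry("marker", False),  # 3: abort marker
    Entry("data", True),     # 4: non-transactional message
]


def log_end_offset(entries):
    """Offset after the last entry, counting transaction markers."""
    return len(entries)


def next_consumable_offset(entries):
    """Offset after the last message a read_committed task would process."""
    seen = [off for off, e in enumerate(entries) if e.kind == "data" and e.committed]
    return seen[-1] + 1 if seen else 0


# Log length after produce calls (1), (2) and (3).
for call, length in enumerate((2, 4, 5), start=1):
    prefix = log[:length]
    latest = log_end_offset(prefix)
    current = next_consumable_offset(prefix)
    print(f"call ({call}): latest={latest} current={current} lag={latest - current}")
# call (1): latest=2 current=1 lag=1
# call (2): latest=4 current=1 lag=3
# call (3): latest=5 current=5 lag=0
```

Under these assumptions the model reproduces the lag values 1, 3 and 0 from the three status outputs.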
   
   
   Unfortunately the `KafkaSupervisor` seems to use the offsets of the 
transaction markers to determine the `latestOffsets`.
   **The transaction marker offsets will never be processed by the ingestion 
task and can therefore never be considered for the `currentOffsets`**.
   This is why we are left with a partition lag > 0 even though there aren't 
any messages left to process.
   
   This makes it impossible for the current implementation of the idle mode to 
detect that there are no more messages to process (if the `latestOffsets` 
point to a transaction marker). The supervisor will just keep running and 
spawning tasks forever.
   
   ### Possible solution
   
   The method for determining `latestOffsets` needs to be updated so that it 
determines the latest available offset that can be consumed by the indexing 
task (especially ignoring transaction markers).
   
   Unfortunately I was not able to identify the exact code sections that would 
need to be adapted. 
   But it will very likely have something to do with 
`KafkaRecordSupplier.seekToLatest` 
([src](https://github.com/apache/druid/blob/586c713d12964ebe69ce222f9c5ebcc8e8c28403/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java#L135-L142))
 or [where it is called by the 
supervisor](https://github.com/apache/druid/blob/f290cf083a5ab045ec1a780671f7f58a5dea5652/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java#L412).
   
   ### Reproduction
   _Code examples (python) have authentication details redacted. Adapt the 
snippets according to your setup._
   
   1. Create an empty kafka topic `test-idle-mode`
   2. Insert single message in transaction -> it will have offset 0
       
       <details><summary>[`Python`] Produce message in transaction</summary>
       <p>
   
       ```python
       # Produce one message in a transaction
       import json
       from confluent_kafka import Producer
       
       producer = Producer({
           "bootstrap.servers": "<redacted>",
           # ... auth redacted
           "transactional.id": "idletest"
       })
       
       producer.init_transactions()
       producer.begin_transaction()
       
       producer.produce(
           topic="test-idle-mode",
           key="",        
           value=json.dumps(
               {"__time": 1577833200000, "value": 1}
           ),
           on_delivery=print,
       )
       producer.commit_transaction()
       
       producer.flush()
       producer.poll(0)
       ```
       </p>
       </details> 
   
   3. Create a KafkaSupervisor based on the topic `test-idle-mode`
   
       <details><summary>Ingestion spec</summary>
       <p>
       
       ```json
       {
         "type": "kafka",
         "spec": {
           "dataSchema": {
             "dataSource": "test-idle-mode",
             "timestampSpec": {
               "column": "__time",
               "format": "millis",
               "missingValue": null
             },
             "dimensionsSpec": {
               "dimensions": [
                 {
                   "type": "long",
                   "name": "value",
                   "multiValueHandling": "SORTED_ARRAY",
                   "createBitmapIndex": false
                 }
               ],
               "dimensionExclusions": [
                 "__time"
               ],
               "includeAllDimensions": false,
               "useSchemaDiscovery": false
             },
             "metricsSpec": [],
             "granularitySpec": {
               "type": "uniform",
               "segmentGranularity": "DAY",
               "queryGranularity": {
                 "type": "none"
               },
               "rollup": false,
               "intervals": []
             },
             "transformSpec": {
               "filter": null,
               "transforms": []
             }
           },
           "ioConfig": {
             "topic": "test-idle-mode",
             "topicPattern": null,
             "inputFormat": {
               "type": "json",
               "keepNullColumns": false,
               "assumeNewlineDelimited": false,
               "useJsonNodeReader": false
             },
             "replicas": 1,
             "taskCount": 1,
             "taskDuration": "PT3600S",
             "consumerProperties": {
               "bootstrap.servers": <redacted>,
               "druid.dynamic.config.provider": {
                 "type": "environment",
                 "variables": {
                   <auth - redacted>
                 }
               },
               "idleConfig": {
                 "enabled": true,
                 "inactiveAfterMillis": 60000
               }
             },
             "autoScalerConfig": null,
             "pollTimeout": 100,
             "startDelay": "PT5S",
             "period": "PT30S",
             "useEarliestOffset": true,
             "completionTimeout": "PT1800S",
             "lateMessageRejectionPeriod": null,
             "earlyMessageRejectionPeriod": null,
             "lateMessageRejectionStartDateTime": null,
             "configOverrides": null,
             "idleConfig": null,
             "stopTaskCount": null,
             "stream": "test-idle-mode",
             "useEarliestSequenceNumber": true
           },
           "tuningConfig": {
             "type": "kafka",
             "appendableIndexSpec": {
               "type": "onheap",
               "preserveExistingMetrics": false
             },
             "maxRowsInMemory": 150000,
             "maxBytesInMemory": 0,
             "skipBytesInMemoryOverheadCheck": false,
             "maxRowsPerSegment": 5000000,
             "maxTotalRows": null,
             "intermediatePersistPeriod": "PT10M",
             "maxPendingPersists": 0,
             "indexSpec": {
               "bitmap": {
                 "type": "roaring"
               },
               "dimensionCompression": "lz4",
               "stringDictionaryEncoding": {
                 "type": "utf8"
               },
               "metricCompression": "lz4",
               "longEncoding": "longs"
             },
             "indexSpecForIntermediatePersists": {
               "bitmap": {
                 "type": "roaring"
               },
               "dimensionCompression": "lz4",
               "stringDictionaryEncoding": {
                 "type": "utf8"
               },
               "metricCompression": "lz4",
               "longEncoding": "longs"
             },
             "reportParseExceptions": false,
             "handoffConditionTimeout": 900000,
             "resetOffsetAutomatically": false,
             "segmentWriteOutMediumFactory": null,
             "workerThreads": null,
             "chatRetries": 8,
             "httpTimeout": "PT10S",
             "shutdownTimeout": "PT80S",
             "offsetFetchPeriod": "PT30S",
             "intermediateHandoffPeriod": "P2147483647D",
             "logParseExceptions": false,
             "maxParseExceptions": 2147483647,
             "maxSavedParseExceptions": 0,
             "numPersistThreads": 1,
             "skipSequenceNumberAvailabilityCheck": false,
             "repartitionTransitionDuration": "PT120S"
           }
         },
         "context": null,
         "suspended": false
       }
       ```
       </p>
       </details> 
   
   4.  When checking the supervisor after the task started, the 
`currentOffsets` are 1 (last message offset 0 + 1 = 1)
        but `latestOffsets` is 2 instead of 1.
   
       <details><summary>[`Status`] Example status after single message with 
offset 0</summary>
       <p>
       This is the same as in section "Minimal example".
   
       ```json
       {
         "dataSource": "test-idle-mode",
         "stream": "test-idle-mode",
         "partitions": 1,
         "replicas": 1,
         "durationSeconds": 3600,
         "activeTasks": [
           {
             "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
             "startingOffsets": {
               "0": 0
             },
             "startTime": "2024-07-03T15:04:34.022Z",
             "remainingSeconds": 3568,
             "type": "ACTIVE",
             "currentOffsets": {
               "0": 1
             },
             "lag": {
               "0": 1
             }
           }
         ],
         "publishingTasks": [],
         "latestOffsets": {
           "0": 2
         },
         "minimumLag": {
           "0": 1
         },
         "aggregateLag": 1,
         "offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
         "suspended": false,
         "healthy": true,
         "state": "RUNNING",
         "detailedState": "RUNNING",
         "recentErrors": []
       }
       ```
       
       </p>
       </details> 
   
   
   5. Insert single message in transaction but abort it -> unavailable message 
will have offset 2
       
       <details><summary>[`Python`] Produce message and abort 
transaction</summary>
       <p>
   
       ```python
       # Produce one message in a transaction, then abort the transaction
       import json
       from confluent_kafka import Producer
       
       producer = Producer({
           "bootstrap.servers": "<redacted>",
           # ... auth redacted
           "transactional.id": "idletest"
       })
       
       producer.init_transactions()
       producer.begin_transaction()
       
       producer.produce(
           topic="test-idle-mode",
           key="",        
           value=json.dumps(
               {"__time": 1577833201000, "value": 1}
           ),
           on_delivery=print,
       )
       # Make sure message is sent to broker before aborting the transaction
       producer.flush()
       producer.poll(0)
   
       producer.abort_transaction()
       
       producer.flush()
       producer.poll(0)
       ```
       </p>
       </details> 
   
   6. The supervisor status still reports `currentOffsets == 1` but 
`latestOffsets` is now 4.
       <details><summary>[`Status`] Updated status after producing another 
message and aborting transaction</summary>
       <p>
       
       ```json
       {
         "dataSource": "test-idle-mode",
         "stream": "test-idle-mode",
         "partitions": 1,
         "replicas": 1,
         "durationSeconds": 3600,
         "activeTasks": [
           {
             "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
             "startingOffsets": {
               "0": 0
             },
             "startTime": "2024-07-03T15:04:34.022Z",
             "remainingSeconds": 1607,
             "type": "ACTIVE",
             "currentOffsets": {
               "0": 1
             },
             "lag": {
               "0": 3
             }
           }
         ],
         "publishingTasks": [],
         "latestOffsets": {
           "0": 4
         },
         "minimumLag": {
           "0": 3
         },
         "aggregateLag": 3,
         "offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
         "suspended": false,
         "healthy": true,
         "state": "RUNNING",
         "detailedState": "RUNNING",
         "recentErrors": []
       }
       ```
       
       </p>
       </details> 
   
   7. Produce a non-transactional message (will have offset 4)
   
       <details><summary>[`Python`] Produce message without 
transaction</summary>
       <p>
   
       ```python
       # Produce one message without transaction
       import json
       from confluent_kafka import Producer
       
       producer = Producer({
           "bootstrap.servers": "<redacted>",
           # ... auth redacted
       })
   
       producer.produce(
           topic="test-idle-mode",
           key="",        
           value=json.dumps(
               {"__time": 1577833202000, "value": 1}
           ),
           on_delivery=print,
       )    
       producer.flush()
       producer.poll(0)
       ```
       </p>
       </details> 
   
   8. New supervisor status with `currentOffsets == latestOffsets == 5` (now 
finally able to enter idle mode)
       <details><summary>[`Status`] Status after producing non-transactional 
message</summary>
       <p>
   
       ```json
       {
         "dataSource": "test-idle-mode",
         "stream": "test-idle-mode",
         "partitions": 1,
         "replicas": 1,
         "durationSeconds": 3600,
         "activeTasks": [
           {
             "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
             "startingOffsets": {
               "0": 0
             },
             "startTime": "2024-07-03T15:04:34.022Z",
             "remainingSeconds": 917,
             "type": "ACTIVE",
             "currentOffsets": {
               "0": 5
             },
             "lag": {
               "0": 0
             }
           }
         ],
         "publishingTasks": [],
         "latestOffsets": {
           "0": 5
         },
         "minimumLag": {
           "0": 0
         },
         "aggregateLag": 0,
         "offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
         "suspended": false,
         "healthy": true,
         "state": "RUNNING",
         "detailedState": "RUNNING",
         "recentErrors": []
       }
       ```
       </p>
       </details> 
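
The status checks in steps 4, 6 and 8 can be scripted. The following is a minimal sketch that assumes the Overlord supervisor status endpoint `/druid/indexer/v1/supervisor/{id}/status` and that the status object sits under a `payload` key of the response (it falls back to the top-level object otherwise). The base URL in the usage note is hypothetical and authentication is omitted.

```python
import json
import urllib.request
from urllib.parse import quote


def status_url(base_url: str, supervisor_id: str) -> str:
    """Build the supervisor status URL for an Overlord or Router base URL."""
    return f"{base_url.rstrip('/')}/druid/indexer/v1/supervisor/{quote(supervisor_id, safe='')}/status"


def extract_payload(response: dict) -> dict:
    """Assumption: the status is wrapped in "payload"; else use the object itself."""
    return response.get("payload", response)


def summarize(status: dict) -> dict:
    """Keep only the fields relevant for the idle-mode problem."""
    keys = ("latestOffsets", "minimumLag", "aggregateLag", "detailedState")
    return {key: status.get(key) for key in keys}


def fetch_status(base_url: str, supervisor_id: str) -> dict:
    """Fetch and unwrap the supervisor status (no authentication handling)."""
    with urllib.request.urlopen(status_url(base_url, supervisor_id)) as resp:
        return extract_payload(json.load(resp))
```

Usage, with a hypothetical address: `print(summarize(fetch_status("http://localhost:8888", "test-idle-mode")))`.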


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

