zhangyue19921010 edited a comment on pull request #10551:
URL: https://github.com/apache/druid/pull/10551#issuecomment-736260071


   @himanshug Thanks for your review. 
   > does the problem happen if `isolation.level=read_uncommitted` ?
   
   No, the problem does not occur. With `isolation.level` set to 
`read_uncommitted`, Druid can consume from newer Kafka versions 
non-transactionally, and it can also consume from older Kafka versions.
   
   I only changed how `isolation.level` is set in 
`KafkaConsumerConfigs.getConsumerProperties()`:
   
   `props.put("isolation.level", 
customerConsumerProperties.getOrDefault("isolation.level", "read_committed"));`
   
   If users do not set this parameter, Druid sets `isolation.level` to 
`read_committed` by default.
   If users set `isolation.level` to `read_committed` or `read_uncommitted` 
in `ConsumerProperties`, Druid uses the value they provided. In other words, 
the user-supplied property takes priority.
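   
   A minimal, self-contained sketch of that precedence rule follows. The class 
`IsolationLevelDefaultSketch` and the helper `resolve` are hypothetical names 
used only for illustration; this is not the actual Druid method, just the same 
`getOrDefault` pattern applied to a plain `Map`.

```java
import java.util.HashMap;
import java.util.Map;

public class IsolationLevelDefaultSketch {
    // Hypothetical helper: mirrors the getOrDefault pattern from the PR,
    // not the real KafkaConsumerConfigs.getConsumerProperties().
    static Map<String, Object> resolve(Map<String, Object> userConsumerProperties) {
        Map<String, Object> props = new HashMap<>();
        // A user-supplied value wins; otherwise fall back to read_committed.
        props.put(
            "isolation.level",
            userConsumerProperties.getOrDefault("isolation.level", "read_committed")
        );
        return props;
    }

    public static void main(String[] args) {
        // Case 1: the user does not set isolation.level.
        Map<String, Object> unset = new HashMap<>();
        // Case 2: the user explicitly asks for read_uncommitted.
        Map<String, Object> explicit = new HashMap<>();
        explicit.put("isolation.level", "read_uncommitted");

        System.out.println(resolve(unset).get("isolation.level"));    // read_committed
        System.out.println(resolve(explicit).get("isolation.level")); // read_uncommitted
    }
}
```

   Because the default is applied only when the key is absent, existing 
supervisors that never set `isolation.level` keep the current behavior.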
   
   **This solution keeps the code change small and safe, and does not require 
any new parameters. It also leaves the current Kafka consumption behavior 
unchanged while giving Druid a way to consume from older versions of Kafka.**
   
   All the changes have been tested.
   <img width="1911" alt="Screenshot 2020-12-01 2.56.07 PM" 
src="https://user-images.githubusercontent.com/69956021/100707471-b14be300-33e5-11eb-849a-d828ea1fdc40.png">
   ```
   010 means Kafka version 0.10.2.1
   241 means Kafka version 2.4.1
   committed means isolation.level is left unset (defaults to read_committed)
   uncommitted means isolation.level=read_uncommitted is set in ConsumerProperties
   ```
   The xxx__dev__010__committed supervisor fails as expected, while the others 
work fine. Here is the error message:
   ```
   2020-12-01T06:56:38,634 WARN [KafkaSupervisor-xxxx__dev__010__committed] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxx__dev__010__committed]
   org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStreamForPartition(SeekableStreamSupervisor.java:3081) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStorageForPartition(SeekableStreamSupervisor.java:3031) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.generateStartingSequencesForPartitionGroup(SeekableStreamSupervisor.java:2973) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.createNewTasks(SeekableStreamSupervisor.java:2844) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1066) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$RunNotice.handle(SeekableStreamSupervisor.java:316) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:746) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
   ```
    
   
   
   
   
   

