zhangyue19921010 edited a comment on pull request #10551: URL: https://github.com/apache/druid/pull/10551#issuecomment-736260071
@himanshug Thanks for your review.

> does the problem happen if `isolation.level=read_uncommitted` ?

No, the problem does not happen. When `isolation.level` is set to `read_uncommitted`, Druid consumes from newer Kafka versions non-transactionally, and it can also consume from older Kafka versions.

I only changed how `isolation.level` is set in `KafkaConsumerConfigs.getConsumerProperties()`:

`props.put("isolation.level", customerConsumerProperties.getOrDefault("isolation.level", "read_committed"));`

If users omit this parameter, Druid sets `isolation.level` to `read_committed` by default. If users set `isolation.level` to `read_committed` or `read_uncommitted` in `ConsumerProperties`, Druid uses the value they set. In other words, the user-supplied property takes priority.

**This solution keeps the code change small and safe, and needs no new parameters. It also leaves the current default behavior of Kafka consumption unchanged while giving Druid a way to consume from older versions of Kafka.**

All the changes have been tested.

<img width="1911" alt="Screenshot 2020-12-01 2:56:07 PM" src="https://user-images.githubusercontent.com/69956021/100707471-b14be300-33e5-11eb-849a-d828ea1fdc40.png">

```
010 means Kafka version 0.10.2.1
241 means Kafka version 2.4.1
committed means isolation.level is not set in ConsumerProperties (defaults to read_committed)
uncommitted means isolation.level=read_uncommitted is set in ConsumerProperties
```

The xxx__dev__010__committed supervisor got errors as expected, while the others work fine. Here is the error message:

```
2020-12-01T06:56:38,634 WARN [KafkaSupervisor-xxxx__dev__010__committed] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception in supervisor run loop for dataSource [xxxx__dev__010__committed]
org.apache.druid.indexing.seekablestream.common.StreamException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
    at org.apache.druid.indexing.kafka.KafkaRecordSupplier.wrapExceptions(KafkaRecordSupplier.java:261) ~[?:?]
    at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getPosition(KafkaRecordSupplier.java:158) ~[?:?]
    at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:138) ~[?:?]
    at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getLatestSequenceNumber(KafkaRecordSupplier.java:53) ~[?:?]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStreamForPartition(SeekableStreamSupervisor.java:3081) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.getOffsetFromStorageForPartition(SeekableStreamSupervisor.java:3031) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.generateStartingSequencesForPartitionGroup(SeekableStreamSupervisor.java:2973) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.createNewTasks(SeekableStreamSupervisor.java:2844) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1066) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$RunNotice.handle(SeekableStreamSupervisor.java:316) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:746) ~[druid-indexing-service-0.21.0-SNAPSHOT.jar:0.21.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_221]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,5]. The supported range is [0,1].
```
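
For reference, the defaulting strategy described in this comment (a user-supplied `isolation.level` takes priority, otherwise `read_committed` is used) can be illustrated with a minimal, self-contained sketch. The class name `IsolationLevelDefaults` and the method `withIsolationLevel` are hypothetical stand-ins, not Druid's actual code; only the `getOrDefault` call mirrors the line quoted from `KafkaConsumerConfigs.getConsumerProperties()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: this class is not part of Druid.
class IsolationLevelDefaults {
    static final String ISOLATION_LEVEL = "isolation.level";

    // Builds consumer properties in which a user-supplied isolation.level wins,
    // and read_committed is used only when the user did not set one.
    static Map<String, Object> withIsolationLevel(Map<String, Object> userConsumerProperties) {
        Map<String, Object> props = new HashMap<>();
        props.put(ISOLATION_LEVEL,
                  userConsumerProperties.getOrDefault(ISOLATION_LEVEL, "read_committed"));
        return props;
    }

    public static void main(String[] args) {
        // Case 1: the user does not set isolation.level.
        Map<String, Object> unset = new HashMap<>();

        // Case 2: the user explicitly asks for read_uncommitted.
        Map<String, Object> uncommitted = new HashMap<>();
        uncommitted.put(ISOLATION_LEVEL, "read_uncommitted");

        System.out.println(withIsolationLevel(unset).get(ISOLATION_LEVEL));       // prints read_committed
        System.out.println(withIsolationLevel(uncommitted).get(ISOLATION_LEVEL)); // prints read_uncommitted
    }
}
```

The first case corresponds to the `committed` supervisors in the test above, and the second to the `uncommitted` ones.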