jeremy-degroot commented on code in PR #20: URL: https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1160774759
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ########## @@ -80,11 +80,20 @@ public KafkaPartitionSplitReader( Properties props, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) { + this(props, context, kafkaSourceReaderMetrics, () -> null); + } + + public KafkaPartitionSplitReader( + Properties props, + SourceReaderContext context, + KafkaSourceReaderMetrics kafkaSourceReaderMetrics, + Supplier<String> rackIdSupplier) { this.subtaskId = context.getIndexOfSubtask(); this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics; Properties consumerProps = new Properties(); consumerProps.putAll(props); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props)); + setConsumerClientRack(consumerProps, rackIdSupplier); Review Comment: I looked into this, and while it's certainly possible to do it that way the testing path is much more complex. Since the Supplier would have to be resolved in another Supplier<KafkaPartitionSplitReader> that's passed to the Reader, testing that the behavior is as expected in the actual execution path is difficult. In KafkaPartitionSplitReader, we can call the constructor directly and verify that the rackIdSupplier is called, and then also verify it does what we need it to by verifying the behavior of the helper method you noted further down. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org