[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104766#comment-16104766 ]
ASF GitHub Bot commented on FLINK-6998:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058480

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -346,15 +346,21 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
         // ------------------------------------------------------------------------

         @Override
    -    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
    +    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
             ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
             if (zkHandler != null) {
                 try {
                     // the ZK handler takes care of incrementing the offsets by 1 before committing
                     zkHandler.prepareAndCommitOffsets(offsets);
    +                if (commitCallback != null) {
    +                    commitCallback.onSuccess();
    +                }
                 }
                 catch (Exception e) {
                     if (running) {
    +                    if (commitCallback != null) {
    --- End diff --

    See my comment above. Would like to remove these null checks.

> Kafka connector needs to expose metrics for failed/successful offset commits
> in the Kafka Consumer callback
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6998
>                 URL: https://issues.apache.org/jira/browse/FLINK-6998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Zhenzhong Xu
>            Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in
> KafkaConsumerThread class.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)