[ https://issues.apache.org/jira/browse/KAFKA-2995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-2995.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported. Please upgrade to the Java consumer whenever possible.

> in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2995
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2995
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0
>            Reporter: Pengwei
>            Assignee: Neha Narkhede
>
> In version 0.9.0.0, the old consumer's commit interface is as follows:
>   def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
>     trace("OffsetMap: %s".format(offsetsToCommit))
>     var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
>     var done = false
>     while (!done) {
>       val committed = offsetsChannelLock synchronized {
>         // committed when we receive either no error codes or only MetadataTooLarge errors
>         if (offsetsToCommit.size > 0) {
>           if (config.offsetsStorage == "zookeeper") {
>             offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
>               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
>             }
>   ........
> This interface does not validate the offsetsToCommit parameter. If offsetsToCommit contains a topic or partition that does not exist in Kafka, the commit creates an entry under /consumers/[group]/offsets/[nonexistent topic] in ZooKeeper.
> Should we check that each topic and partition in offsetsToCommit actually exists, or just check that it is contained in topicRegistry or checkpointedZkOffsets?
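
The check the reporter asks about could look roughly like the sketch below. It is not the Kafka implementation and not a committed fix: the two case classes are minimal stand-ins for Kafka's TopicAndPartition and OffsetAndMetadata so the snippet runs on its own, and splitByKnownPartitions and knownPartitions are hypothetical names. knownPartitions stands for whatever set of partitions the consumer already tracks (the report suggests topicRegistry or checkpointedZkOffsets as possible sources).

```scala
// Stand-ins for the Kafka classes named in the report (assumption: simplified shapes).
case class TopicAndPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String = "")

object OffsetCommitValidation {
  // Hypothetical helper: splits the requested commits into
  // (entries whose partition is known, entries that should be rejected).
  def splitByKnownPartitions(
      offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata],
      knownPartitions: Set[TopicAndPartition]
  ): (Map[TopicAndPartition, OffsetAndMetadata], Map[TopicAndPartition, OffsetAndMetadata]) =
    offsetsToCommit.partition { case (tp, _) => knownPartitions.contains(tp) }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val known = Set(TopicAndPartition("orders", 0), TopicAndPartition("orders", 1))
    val requested = Map(
      TopicAndPartition("orders", 0)        -> OffsetAndMetadata(42L),
      TopicAndPartition("no-such-topic", 0) -> OffsetAndMetadata(7L)
    )
    val (valid, rejected) = OffsetCommitValidation.splitByKnownPartitions(requested, known)
    // Only `valid` would be passed on to the ZooKeeper commit; `rejected` would be logged or reported.
    println(s"valid=${valid.keySet} rejected=${rejected.keySet}")
  }
}
```

With a filter like this in front of the ZooKeeper write, only the entries in the first map would reach commitOffsetToZooKeeper, so no offsets node would be created for a topic the consumer does not know about. Whether unknown entries should be dropped silently, logged, or raised as an error is a design decision the report leaves open.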