Pengwei created KAFKA-2995:
------------------------------
Summary: in 0.9.0.0 Old Consumer's commitOffsets with specify
partition can submit not exists topic and partition to zk
Key: KAFKA-2995
URL: https://issues.apache.org/jira/browse/KAFKA-2995
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.9.0.0
Reporter: Pengwei
Assignee: Neha Narkhede
Fix For: 0.9.1.0
in 0.9.0.0 Version, the Old Consumer's commit interface is below:
def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition,
OffsetAndMetadata], isAutoCommit: Boolean) {
trace("OffsetMap: %s".format(offsetsToCommit))
var retriesRemaining = 1 + (if (isAutoCommit) 0 else
config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
var done = false
while (!done) {
val committed = offsetsChannelLock synchronized {
// committed when we receive either no error codes or only
MetadataTooLarge errors
if (offsetsToCommit.size > 0) {
if (config.offsetsStorage == "zookeeper") {
offsetsToCommit.foreach { case (topicAndPartition,
offsetAndMetadata) =>
commitOffsetToZooKeeper(topicAndPartition,
offsetAndMetadata.offset)
}
........
this interface does not check the parameter offsetsToCommit, if offsetsToCommit
has some topic or partition which is not exist in the kafka. Then will create
an entry in the /consumers/[group]/offsets/[Not exists topic] directory.
We should check the offsetsToCommit's topic and partition is exists or just
check it is contain in the topicRegistry or checkpointedZkOffsets ?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)