[
https://issues.apache.org/jira/browse/KAFKA-640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Swapnil Ghike updated KAFKA-640:
--------------------------------
Attachment: kafka-640.patch
This happened because ClientId.validate(clientId) in SimpleConsumer did not
validate "." in the clientId passed from ReplicaFetcherThread.
This patch fixes another bug - AbstractFetcherThread would create
SimpleConsumer and pass "%s-host_%s-port_%s" as the clientId to the
SimpleConsumer. SimpleConsumer would append the host-port string again to the
clientId while instantiating FetchRequestAndResponseStats.
Changes:
- The fix is to not include the host-port string while initializing
AbstractFetcherThread.clientId in ReplicaFetcherThread, the host and port are
already passed through the sourceBroker argument. This new clientId is passed
to SimpleConsumer in AbstractFetcherThread and it should validate successfully.
- Pass clientId + host-port string while instantiating *Stats in
AbstractFetcherThread and SimpleConsumer. Since the host-port string gives
information about the client, I think it should be ok to append it to clientId
and not create a separate case class like ClientIdAndTopic.
- Pass clientId + host-port string while instantiating FetchRequestBuilder in
AbstractFetcherThread.
- Pass clientId + host-port string to the constructors of ProducerRequestStats,
FetchRequestAndResponseStats, FetcherStats and FetcherLagStats to maintain
uniformity with passing clientId + host-port to FetchRequestBuilder in
AbstractFetcherThread.doWork().
- Removed a line in ClientIdTest.scala, it was redundant.
The validation criteria for clientId string that comes from the client is
unchanged. Ideally I would like to validate the clientId that *includes* the
host-port string, but that would require an introduction of '.' in the legal
characters set which would be inconsistent with legal chars for Topic. Instead,
we can maintain the same legal chars set and take care that the host-port
string doesn't change format within the code.
> System Test Failures : kafka.common.InvalidClientIdException in broker log4j
> messages
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-640
> URL: https://issues.apache.org/jira/browse/KAFKA-640
> Project: Kafka
> Issue Type: Bug
> Reporter: John Fung
> Labels: replication-testing
> Attachments: kafka-640.patch
>
>
> * To reproduce the issue, download and build the latest Kafka 0.8 branch and
> execute this command: "<kafka_home>/system_test $ python -B
> system_test_runner.py"
> * The following exception is found in the broker log4j messages in most
> System Test cases:
> [2012-11-29 09:06:21,322] WARN No previously checkpointed highwatermark value
> found for topic test_1 partition 1. Returning 0 as the highwatermark
> (kafka.server.HighwaterMarkCheckpoint)
> [2012-11-29 09:06:21,326] INFO [Kafka Log on Broker 1], Truncated log segment
> /tmp/kafka_server_1_logs/test_1-1/00000000000000000000.log to target offset 0
> (kafka.log.Log)
> [2012-11-29 09:06:21,333] ERROR Replica Manager on Broker 1: Error processing
> leaderAndISR request LeaderAndIsrRequest(1,,1000,Map((test_1,1) ->
> PartitionStateInfo(LeaderIsrAndControllerEpoch({
> "ISR":"2,3,1","leader":"2","leaderEpoch":"0" },1),3), (test_1,0) ->
> PartitionStateInfo(LeaderIsrAndControllerEpoch({
> "ISR":"1,2,3","leader":"1","leaderEpoch":"0"
> },1),3)),Set(id:2,creatorId:127.0.0.1-1354208764997,host:127.0.0.1,port:9092,
> id:1,creatorId:127.0.0.1-1354208760105,host:127.0.0.1,port:9091),1)
> (kafka.server.ReplicaManager)
> kafka.common.InvalidClientIdException: ClientId
> replica-fetcher-host_127.0.0.1-port_9092 is illegal, contains a character
> other than ASCII alphanumerics, _ and -
> at kafka.utils.ClientId$.validate(ClientIdAndTopic.scala:36)
> at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:81)
> at
> kafka.server.AbstractFetcherThread.<init>(AbstractFetcherThread.scala:44)
> at
> kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:26)
> at
> kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:26)
> at
> kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:44)
> at kafka.cluster.Partition.makeFollower(Partition.scala:190)
> at
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeFollower(ReplicaManager.scala:236)
> at
> kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:201)
> at
> kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:191)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
> at
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:191)
> at
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:129)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
> at java.lang.Thread.run(Thread.java:662)
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira