> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120>
> >
> >     We may not be able to remove the read lock here. The issue is that
> >     this method accesses not only leaderReplicaIdOpt, but also other
> >     internal data structures such as assignedReplicaMap. Without the
> >     lock, a read from the map could fail if the map is being concurrently
> >     modified.
> >
> >     In general, we can get away without the lock only when reading a
> >     single internal value. Perhaps we can introduce another function,
> >     isLeaderLocal(), that returns a boolean and only needs to access
> >     leaderReplicaIdOpt. All callers would then first call
> >     leaderReplicaIfLocal and hold onto the leader replica, and later use
> >     isLeaderLocal to see whether the leader has changed.
> 
> Guozhang Wang wrote:
>     Would this be the same as what we do now? In getReplica(localBrokerId),
>     if the replica map has changed and the id is no longer in the map, it
>     will return None; if the replica map has changed and the id is no
>     longer the leader, it is just the same as if the leader changed
>     immediately after we called leaderReplicaIfLocal() to get the leader.
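To make the proposal above concrete, here is a minimal, hypothetical sketch of the isLeaderLocal() idea. The class, method, and field names (PartitionSketch, makeLeader, the simplified map) are illustrative only, not the actual Partition.scala code: the multi-field read keeps the read lock, while isLeaderLocal() reads a single volatile field and so needs no lock.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

class Replica(val brokerId: Int)

class PartitionSketch(localBrokerId: Int) {
  private val leaderIsrLock = new ReentrantReadWriteLock()
  @volatile private var leaderReplicaIdOpt: Option[Int] = None
  private val assignedReplicaMap = mutable.Map.empty[Int, Replica]

  // Writers take the write lock before touching both structures.
  def makeLeader(): Unit = {
    leaderIsrLock.writeLock().lock()
    try {
      assignedReplicaMap(localBrokerId) = new Replica(localBrokerId)
      leaderReplicaIdOpt = Some(localBrokerId)
    } finally leaderIsrLock.writeLock().unlock()
  }

  // Reads two structures (the leader id and the map), so it keeps the
  // read lock, per the comment above.
  def leaderReplicaIfLocal(): Option[Replica] = {
    leaderIsrLock.readLock().lock()
    try {
      leaderReplicaIdOpt match {
        case Some(id) if id == localBrokerId => assignedReplicaMap.get(id)
        case _                               => None
      }
    } finally leaderIsrLock.readLock().unlock()
  }

  // Reads only the single volatile field: callers that already hold a
  // Replica can use this cheap check to see whether leadership moved.
  def isLeaderLocal(): Boolean = leaderReplicaIdOpt.contains(localBrokerId)
}
```

A caller would fetch the leader once via leaderReplicaIfLocal() and then poll isLeaderLocal() on later checks, avoiding repeated lock acquisition on the hot path.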
After thinking about it a third time, I feel this approach may still not work. The reason is that keeping the replica objects in the delayed produce request is not safe: the replica object may already have been removed from the partition's assigned replica map, and hence would be garbage collected were it not referenced by the delayed produce request; later on, a new replica object could be created with the same replica id. When the delayed produce request is checked, it only checks whether the replica id matches, which is not sufficient.

On the other hand, it now seems harmless to me to go without the read lock. The read consists of two steps: 1. check the leader replica id; 2. get the replica object from the assigned replica map. First case: a write passes step 1 before the read but has not yet reached the map, so the function returns None => the same as if the write happened entirely after the read. Second case: a write lands between steps 1 and 2, so the map read returns None => the same as if the write happened entirely before the read.

So I think simply making the map concurrent is good enough, but I strongly agree that we may need a separate JIRA to clean up the lock hierarchies after the kafka api refactoring.

- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


On July 28, 2014, 6:30 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 28, 2014, 6:30 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments: 1. Kept the first comment about removing the read
> lock in leaderReplicaIfLocal for further discussion; 2. Kept the comment
> on whether to satisfy a delayed fetch immediately if one partition has an
> error for further discussion; 3. Rebased on the KAFKA-1542 follow-up.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a
>   core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
>   core/src/main/scala/kafka/tools/MetadataRequestProducer.scala PRE-CREATION
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
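The lock-free two-step read Guozhang argues for above (check the leader replica id, then look up the concurrent replica map) can be sketched as below. This is a hypothetical, simplified model with illustrative names (LockFreePartition, makeLeader, removeReplica), not the actual Kafka code: with the map made concurrent, both races collapse to an ordering that a fully serialized write would also produce.

```scala
import java.util.concurrent.ConcurrentHashMap

class Replica(val brokerId: Int)

class LockFreePartition(localBrokerId: Int) {
  @volatile private var leaderReplicaIdOpt: Option[Int] = None
  private val assignedReplicaMap = new ConcurrentHashMap[Int, Replica]()

  // Writers populate the map before publishing the leader id.
  def makeLeader(): Unit = {
    assignedReplicaMap.put(localBrokerId, new Replica(localBrokerId))
    leaderReplicaIdOpt = Some(localBrokerId)
  }

  def removeReplica(): Unit = {
    leaderReplicaIdOpt = None
    assignedReplicaMap.remove(localBrokerId)
  }

  // Step 1: check the leader replica id. Step 2: read the concurrent map.
  // A removal landing before step 1 or between steps 1 and 2 makes this
  // return None, indistinguishable from the write having been serialized
  // entirely before or after the read, per the two cases above.
  def leaderReplicaIfLocal(): Option[Replica] =
    leaderReplicaIdOpt match {
      case Some(id) if id == localBrokerId =>
        Option(assignedReplicaMap.get(id))
      case _ => None
    }
}
```

Note this only addresses the read path; it does not resolve the separate concern that a delayed produce request comparing replica ids could match a newly created replica object with a recycled id.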