It should be another ticket. This is an AbstractFetcherThread issue rather than a mirror maker issue.
I kind of think this case you saw was a special case, as it's not actually a runtime error but a coding bug. Fetcher threads should not die by design, so I don't think we have a way to restart fetchers without a code change if they die accidentally. One way to do this would be to add a liveness check in LeaderFinderThread, but I don't know if this is a necessary change just because of the case you saw.

Jiangjie (Becket) Qin
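For concreteness, the liveness check mentioned above might look roughly like the sketch below. All names here (FetcherLivenessChecker, fetchers, restartFetcher) are hypothetical stand-ins, not existing Kafka APIs; a real implementation would have to live inside LeaderFinderThread and know how to rebuild a ConsumerFetcherThread.

    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical sketch: periodically scan the fetcher threads and
    // rebuild any that have died. `fetchers` supplies the current set of
    // threads; `restartFetcher` stands in for whatever re-creation logic
    // the real LeaderFinderThread would need.
    class FetcherLivenessChecker(fetchers: () => Iterable[Thread],
                                 restartFetcher: Thread => Unit) {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def start(intervalMs: Long): Unit =
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit =
            fetchers().filterNot(_.isAlive).foreach(restartFetcher)
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

      def shutdown(): Unit = scheduler.shutdownNow()
    }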
On 3/24/15, 5:05 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>The other question I have is that the consumer client is unaware of the
>health status of the underlying fetcher thread. If the fetcher thread dies,
>as in the case I encountered, is there a way for the consumer to restart
>the fetcher thread or release ownership of its partitions so that other
>consumers can pick them up while the fetcher thread is down?
>
>On Wed, Mar 25, 2015 at 8:00 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
>> Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?
>>
>> On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>> wrote:
>>
>>> Hi Xiao,
>>>
>>> I think the fix for the IllegalMonitorStateException is correct.
>>> Can you also create a ticket and submit a patch?
>>>
>>> Thanks.
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>>
>>> >Hi community,
>>> >
>>> >I wanted to know whether the solution I supplied can fix the
>>> >IllegalMonitorStateException issue. Our work is pending on this and
>>> >we'd like to proceed ASAP. Sorry for bothering.
>>> >
>>> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> >
>>> >> I think I worked out the answer to question 1.
>>> >> java.lang.IllegalMonitorStateException was thrown because the thread
>>> >> did not own the ReentrantLock when calling await() on the lock's
>>> >> condition.
>>> >>
>>> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
>>> >>
>>> >>   partitionMapLock synchronized {
>>> >>     partitionsWithError ++= partitionMap.keys
>>> >>     // an error occurred while fetching partitions; sleep a while
>>> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>> >>   }
>>> >>
>>> >> As shown above, partitionMapLock is never acquired before calling
>>> >> partitionMapCond.await: synchronized only takes the lock object's
>>> >> JVM monitor, not the ReentrantLock itself. We can fix this by
>>> >> explicitly acquiring partitionMapLock. The code block below should
>>> >> work:
>>> >>
>>> >>   inLock(partitionMapLock) {
>>> >>     partitionsWithError ++= partitionMap.keys
>>> >>     // an error occurred while fetching partitions; sleep a while
>>> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>> >>   }
>>> >>
>>> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>> I was running a mirror maker and got a
>>> >>> java.lang.IllegalMonitorStateException that caused the underlying
>>> >>> fetcher thread to stop completely. Here is the log from the mirror
>>> >>> maker:
>>> >>>
>>> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
>>> >>> java.io.EOFException: Received -1 when reading from channel, socket
>>> >>> has likely been closed. (kafka.consumer.SimpleConsumer)
>>> >>> [2015-03-21 02:11:53,081] WARN
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
>>> >>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
>>> >>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
>>> >>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
>>> >>> Possible cause: java.nio.channels.ClosedChannelException
>>> >>> (kafka.consumer.ConsumerFetcherThread)
>>> >>> [2015-03-21 02:11:53,083] ERROR
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
>>> >>> due to (kafka.consumer.ConsumerFetcherThread)
>>> >>> java.lang.IllegalMonitorStateException
>>> >>>   at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
>>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
>>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
>>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
>>> >>>   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>>> >>>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
>>> >>>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>> >>> [2015-03-21 02:11:53,083] INFO
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>>> >>> (kafka.consumer.ConsumerFetcherThread)
>>> >>>
>>> >>> I am still investigating what caused the connection error on the
>>> >>> server side, but I have a couple of questions related to the mirror
>>> >>> maker itself:
>>> >>>
>>> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException?
>>> >>> As shown in the AbstractFetcherThread source, the fetcher thread
>>> >>> should catch the java.io.EOFException thrown from the underlying
>>> >>> SimpleConsumer and sleep for a while before the next run.
>>> >>> 2. The mirror maker is unaware of the termination of the fetcher
>>> >>> thread, which makes it unable to detect the failure and trigger a
>>> >>> rebalance. I have 3 mirror maker instances running on 3 different
>>> >>> machines listening to the same topic. I would expect the mirror maker
>>> >>> to release partition ownership when the underlying fetcher thread
>>> >>> terminates so that a rebalance can be triggered, but in fact this is
>>> >>> not the case. Is this expected behavior, or did I misconfigure
>>> >>> anything?
>>> >>>
>>> >>> I am running the trunk version as of commit
>>> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>>> >>>
>>> >>> --
>>> >>> Regards,
>>> >>> Tao
>>> >>
>>> >> --
>>> >> Regards,
>>> >> Tao
>>> >
>>> >--
>>> >Regards,
>>> >Tao
>>
>> --
>> Regards,
>> Tao
>
>--
>Regards,
>Tao
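tao xiao's root-cause analysis can be reproduced outside Kafka with a few lines of standalone Scala (the object name AwaitDemo is made up for illustration). The point is that synchronized acquires the JVM monitor of the lock object, not the ReentrantLock itself, so Condition.await() throws IllegalMonitorStateException when it tries to release a lock the calling thread never held, exactly as in the fullyRelease frame of the stack trace above:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    object AwaitDemo extends App {
      val lock = new ReentrantLock()
      val cond = lock.newCondition()

      // Broken: synchronized takes the lock object's monitor, not the
      // ReentrantLock, so await() fails in fullyRelease() with
      // IllegalMonitorStateException.
      try {
        lock synchronized {
          cond.await(10, TimeUnit.MILLISECONDS)
        }
      } catch {
        case e: IllegalMonitorStateException => println(s"broken pattern: $e")
      }

      // Correct: hold the ReentrantLock itself around await(), as the
      // proposed inLock fix does.
      lock.lock()
      try {
        cond.await(10, TimeUnit.MILLISECONDS)
      } finally {
        lock.unlock()
      }
      println("correct pattern: await returned normally")
    }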