Thank you for the explanation. Patch submitted: https://issues.apache.org/jira/browse/KAFKA-2048

On Wed, Mar 25, 2015 at 8:29 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> It should be another ticket. This is an AbstractFetcherThread issue rather
> than a mirror maker issue.
>
> I kind of think the case you saw was a special case, as it's not actually
> a runtime error but a coding bug. The fetcher thread should not die by
> design, so I don't think we have a way to restart fetchers without a code
> change if they die accidentally. One way to do this is to add a liveness
> check in LeaderFinderThread, but I don't know if that is a necessary change
> just because of the case you saw.
>
> Jiangjie (Becket) Qin
>
> On 3/24/15, 5:05 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >The other question I have is that the consumer client is unaware of the
> >health status of the underlying fetcher thread. If the fetcher thread
> >dies, as in the case I encountered, is there a way the consumer can
> >restart the fetcher thread or release ownership of its partitions so that
> >other consumers can pick them up while the fetcher thread is down?
> >
> >On Wed, Mar 25, 2015 at 8:00 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?
> >>
> >> On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> wrote:
> >>
> >>> Hi Xiao,
> >>>
> >>> I think the fix for the IllegalMonitorStateException is correct.
> >>> Can you also create a ticket and submit a patch?
> >>>
> >>> Thanks.
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
> >>>
> >>> >Hi community,
> >>> >
> >>> >I wanted to know whether the solution I supplied can fix the
> >>> >IllegalMonitorStateException issue. Our work is pending on this and
> >>> >we'd like to proceed ASAP. Sorry for bothering.
> >>> >
> >>> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>> >
> >>> >> I think I worked out the answer to question 1.
> >>> >> java.lang.IllegalMonitorStateException was thrown because the thread
> >>> >> did not own the ReentrantLock when it called await() on the lock's
> >>> >> condition.
> >>> >>
> >>> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
> >>> >>
> >>> >>   partitionMapLock synchronized {
> >>> >>     partitionsWithError ++= partitionMap.keys
> >>> >>     // an error occurred while fetching partitions; sleep a while
> >>> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>> >>   }
> >>> >>
> >>> >> As shown above, partitionMapLock is not acquired before calling
> >>> >> partitionMapCond.await: the synchronized block acquires the lock
> >>> >> object's monitor, not the ReentrantLock itself.
> >>> >>
> >>> >> We can fix this by explicitly acquiring partitionMapLock. The code
> >>> >> block below should work:
> >>> >>
> >>> >>   inLock(partitionMapLock) {
> >>> >>     partitionsWithError ++= partitionMap.keys
> >>> >>     // an error occurred while fetching partitions; sleep a while
> >>> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>> >>   }
> >>> >>
> >>> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>> >>
> >>> >>> Hi,
> >>> >>>
> >>> >>> I was running a mirror maker and got a
> >>> >>> java.lang.IllegalMonitorStateException that caused the underlying
> >>> >>> fetcher thread to stop completely. Here is the log from the mirror
> >>> >>> maker:
> >>> >>>
> >>> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
> >>> >>> java.io.EOFException: Received -1 when reading from channel, socket
> >>> >>> has likely been closed. (kafka.consumer.SimpleConsumer)
> >>> >>> [2015-03-21 02:11:53,081] WARN
> >>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
> >>> >>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588;
> >>> >>> ClientId: phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms;
> >>> >>> MinBytes: 1 bytes; RequestInfo: [test.topic,0] ->
> >>> >>> PartitionFetchInfo(3766065,1048576).
> >>> >>> Possible cause: java.nio.channels.ClosedChannelException
> >>> >>> (kafka.consumer.ConsumerFetcherThread)
> >>> >>> [2015-03-21 02:11:53,083] ERROR
> >>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
> >>> >>> due to (kafka.consumer.ConsumerFetcherThread)
> >>> >>> java.lang.IllegalMonitorStateException
> >>> >>>   at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
> >>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
> >>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
> >>> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
> >>> >>>   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
> >>> >>>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
> >>> >>>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >>> >>> [2015-03-21 02:11:53,083] INFO
> >>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
> >>> >>> (kafka.consumer.ConsumerFetcherThread)
> >>> >>>
> >>> >>> I am still investigating what caused the connection error on the
> >>> >>> server side, but I have a couple of questions about the mirror maker
> >>> >>> itself:
> >>> >>>
> >>> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException?
> >>> >>> As shown in the AbstractFetcherThread source, the fetcher thread
> >>> >>> should catch the java.io.EOFException thrown from the underlying
> >>> >>> SimpleConsumer and sleep a while before the next run.
> >>> >>> 2. The mirror maker is unaware of the termination of the fetcher
> >>> >>> thread, which makes it unable to detect the failure and trigger a
> >>> >>> rebalance. I have 3 mirror maker instances running on 3 different
> >>> >>> machines listening to the same topic. I would expect the mirror
> >>> >>> maker to release partition ownership when the underlying fetcher
> >>> >>> thread terminates so that a rebalance can be triggered, but in fact
> >>> >>> this is not the case. Is this expected behavior, or did I
> >>> >>> misconfigure something?
> >>> >>>
> >>> >>> I am running the trunk version as of commit
> >>> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
> >>> >>>
> >>> >>> --
> >>> >>> Regards,
> >>> >>> Tao

--
Regards,
Tao
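The root cause discussed in the thread — calling `Condition.await()` while holding the lock object's intrinsic monitor (via `synchronized`) instead of the `ReentrantLock` itself — can be reproduced in a few lines. This is a minimal standalone sketch, not Kafka code; the object and method names (`AwaitBugDemo`, `buggyBackoff`, `fixedBackoff`) are illustrative, and the fixed variant spells out what Kafka's `inLock` helper does:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object AwaitBugDemo {
  val lock = new ReentrantLock()
  val cond = lock.newCondition()

  // Buggy variant: `synchronized` acquires the lock *object's* monitor,
  // not the ReentrantLock, so await() sees an unowned lock and throws
  // IllegalMonitorStateException. Returns true if that exception was thrown.
  def buggyBackoff(): Boolean =
    try {
      lock.synchronized {
        cond.await(1, TimeUnit.MILLISECONDS)
      }
      false
    } catch {
      case _: IllegalMonitorStateException => true
    }

  // Fixed variant: acquire the ReentrantLock explicitly before await(),
  // mirroring the inLock(partitionMapLock) { ... } fix. await() then
  // simply times out after 1 ms with no exception.
  def fixedBackoff(): Boolean = {
    lock.lock()
    try {
      cond.await(1, TimeUnit.MILLISECONDS)
      true
    } finally lock.unlock()
  }

  def main(args: Array[String]): Unit = {
    println(s"buggy variant threw IllegalMonitorStateException: ${buggyBackoff()}")
    println(s"fixed variant awaited without error: ${fixedBackoff()}")
  }
}
```

The key point is that `ReentrantLock` and the intrinsic monitor of the same object are two unrelated locks; a condition created from a `ReentrantLock` can only be awaited while that `ReentrantLock` is held.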