Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?

On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Hi Xiao,
>
> I think the fix for the IllegalMonitorStateException is correct.
> Can you also create a ticket and submit a patch?
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >Hi community,
> >
> >I wanted to know if the solution I supplied can fix the
> >IllegalMonitorStateException issue. Our work is pending on this and
> >we'd like to proceed ASAP. Sorry for bothering.
> >
> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> I think I worked out the answer to question 1.
> >> java.lang.IllegalMonitorStateException was thrown because the thread
> >> did not own the ReentrantLock when it called await() on the lock's
> >> condition.
> >>
> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
> >>
> >>   partitionMapLock synchronized {
> >>     partitionsWithError ++= partitionMap.keys
> >>     // there is an error occurred while fetching partitions, sleep a while
> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>   }
> >>
> >> As shown above, partitionMapLock is never acquired before
> >> partitionMapCond.await is called: synchronized only takes the built-in
> >> JVM monitor of the lock object, not the ReentrantLock itself.
> >>
> >> We can fix this by explicitly calling partitionMapLock.lock(). The code
> >> block below should work:
> >>
> >>   inLock(partitionMapLock) {
> >>     partitionsWithError ++= partitionMap.keys
> >>     // there is an error occurred while fetching partitions, sleep a while
> >>     partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>   }
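> >>
> >> To sanity check the diagnosis, here is a minimal, self-contained
> >> sketch (plain Scala, not Kafka code; the local inLock below is only my
> >> stand-in for the kafka.utils helper of the same name) that reproduces
> >> the exception and shows that the explicit-lock pattern works:
> >>
> >>   import java.util.concurrent.TimeUnit
> >>   import java.util.concurrent.locks.ReentrantLock
> >>
> >>   object LockDemo extends App {
> >>     val lock = new ReentrantLock()
> >>     val cond = lock.newCondition()
> >>
> >>     // Stand-in for the kafka.utils inLock helper: acquire the
> >>     // ReentrantLock, run the block, always release in a finally.
> >>     def inLock[T](l: ReentrantLock)(fun: => T): T = {
> >>       l.lock()
> >>       try fun finally l.unlock()
> >>     }
> >>
> >>     // Broken pattern: synchronized takes the object's JVM monitor,
> >>     // not the ReentrantLock, so await() runs without lock ownership
> >>     // and fullyRelease/tryRelease throws, matching the stack trace
> >>     // in my original mail below.
> >>     try {
> >>       lock synchronized {
> >>         cond.await(10, TimeUnit.MILLISECONDS)
> >>       }
> >>     } catch {
> >>       case e: IllegalMonitorStateException => println("broken: " + e)
> >>     }
> >>
> >>     // Fixed pattern: the ReentrantLock is actually held, so await()
> >>     // simply times out and returns normally.
> >>     inLock(lock) {
> >>       cond.await(10, TimeUnit.MILLISECONDS)
> >>     }
> >>     println("fixed: await returned normally")
> >>   }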
> >>
> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I was running a mirror maker and got java.lang.IllegalMonitorStateException,
> >>> which caused the underlying fetcher thread to stop completely. Here is
> >>> the log from the mirror maker:
> >>>
> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
> >>> java.io.EOFException: Received -1 when reading from channel, socket has
> >>> likely been closed. (kafka.consumer.SimpleConsumer)
> >>> [2015-03-21 02:11:53,081] WARN
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
> >>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
> >>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
> >>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
> >>> Possible cause: java.nio.channels.ClosedChannelException
> >>> (kafka.consumer.ConsumerFetcherThread)
> >>> [2015-03-21 02:11:53,083] ERROR
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
> >>> (kafka.consumer.ConsumerFetcherThread)
> >>> java.lang.IllegalMonitorStateException
> >>>   at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
> >>>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
> >>>   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
> >>>   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
> >>>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >>> [2015-03-21 02:11:53,083] INFO
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
> >>> (kafka.consumer.ConsumerFetcherThread)
> >>>
> >>> I am still investigating what caused the connection error on the server
> >>> side, but I have a couple of questions about the mirror maker itself:
> >>>
> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException? As
> >>> shown in the AbstractFetcherThread source, the fetcher thread should
> >>> catch the java.io.EOFException thrown from the underlying SimpleConsumer
> >>> and sleep a while before the next run.
> >>> 2. The mirror maker is unaware of the termination of the fetcher thread,
> >>> which makes it unable to detect the failure and trigger a rebalance. I
> >>> have 3 mirror maker instances running on 3 different machines listening
> >>> to the same topic. I would expect the mirror maker to release partition
> >>> ownership when the underlying fetcher thread terminates so that a
> >>> rebalance can be triggered, but in fact this is not the case. Is this
> >>> expected behavior, or did I misconfigure anything?
> >>>
> >>> I am running the trunk version as of commit
> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
> >>>
> >>> --
> >>> Regards,
> >>> Tao

--
Regards,
Tao