I think I worked out the answer to question 1.
java.lang.IllegalMonitorStateException
was thrown due to no ownership of ReentrantLock when trying to call await()
on the lock condition.

Here is the code snippet from the AbstractFetcherThread.scala in trunk

partitionMapLock synchronized {
            partitionsWithError ++= partitionMap.keys
            // there is an error occurred while fetching partitions, sleep
a while
            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}

as shown above partitionMapLock is not acquired before calling
partitionMapCond.await

we can fix this by explicitly calling partitionMapLock.lock(). below code
block should work

inLock(partitionMapLock) {
            partitionsWithError ++= partitionMap.keys
            // there is an error occurred while fetching partitions, sleep
a while
            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}

On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:

> Hi,
>
> I was running a mirror maker and got
>  java.lang.IllegalMonitorStateException that caused the underlying fetcher
> thread completely stopped. Here is the log from mirror maker.
>
> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed. (kafka.consumer.SimpleConsumer)
> [2015-03-21 02:11:53,081] WARN
> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
> Possible cause: java.nio.channels.ClosedChannelException
> (kafka.consumer.ConsumerFetcherThread)
> [2015-03-21 02:11:53,083] ERROR
> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
>  (kafka.consumer.ConsumerFetcherThread)
> java.lang.IllegalMonitorStateException
>         at
> java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2015-03-21 02:11:53,083] INFO
> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>  (kafka.consumer.ConsumerFetcherThread)
>
> I am still investigating what caused the connection error on server side
> but I have a couple of questions related to mirror maker itself
>
> 1. What is root cause of java.lang.IllegalMonitorStateException? As shown
> in the AbstractFetcherThread source the fetcher thread should catch the
> java.io.EOFException thrown from underlying simplyConsumer and sleep a
> while before next run.
> 2. Mirror maker is unaware of the termination of fetcher thread. That
> makes it unable to detect the failure and trigger rebalancing. I have 3
> mirror maker instances running in 3 different machines listening to the
> same topic. I would expect the mirror maker will release the partition
> ownership when underlying fetcher thread terminates so that rebalancing can
> be triggered.but in fact this is not the case. is this expected behavior or
> do I miss configure anything?
>
> I am running the trunk version as of commit
> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>
> --
> Regards,
> Tao
>



-- 
Regards,
Tao

Reply via email to