Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?
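
For anyone following along, here is a minimal standalone sketch of the diagnosis quoted below. It uses no Kafka code: AwaitDemo and its inLock helper are hypothetical stand-ins (inLock only imitates the helper of the same name used in the proposed fix), and the 10 ms timeout is arbitrary. It is meant to show the difference between the two patterns, not to be the actual patch.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}

object AwaitDemo {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()

  // Hypothetical stand-in for the inLock helper: hold the Lock while running fun.
  def inLock[T](l: Lock)(fun: => T): T = {
    l.lock()
    try fun finally l.unlock()
  }

  // Failing pattern: synchronized takes the object's intrinsic monitor,
  // so the ReentrantLock itself is not held when await() runs.
  def awaitUnderMonitor(): Boolean =
    try {
      lock.synchronized { cond.await(10, TimeUnit.MILLISECONDS) }
      true
    } catch {
      case _: IllegalMonitorStateException => false
    }

  // Proposed pattern: the ReentrantLock is held, so await() is legal
  // and simply times out because nobody signals the condition.
  def awaitUnderLock(): Boolean =
    try {
      inLock(lock) { cond.await(10, TimeUnit.MILLISECONDS) }
      true
    } catch {
      case _: IllegalMonitorStateException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"synchronized: await completed = ${awaitUnderMonitor()}")
    println(s"inLock:       await completed = ${awaitUnderLock()}")
  }
}
```

The first call fails with IllegalMonitorStateException while the second returns after the timeout, which matches the stack trace in the original report (await -> fullyRelease -> tryRelease).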

On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Xiao,
>
> I think the fix for IllegalMonitorStateException is correct.
> Can you also create a ticket and submit a patch?
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >Hi community,
> >
> >I wanted to know if the solution I supplied can fix the
> >IllegalMonitorStateException issue. Our work is pending on this and we'd
> >like to proceed ASAP. Sorry for bothering you.
> >
> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> I think I worked out the answer to question 1.
> >> java.lang.IllegalMonitorStateException was thrown because the thread did
> >> not own the ReentrantLock when it called await() on the lock's condition.
> >>
> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
> >>
> >> partitionMapLock synchronized {
> >>             partitionsWithError ++= partitionMap.keys
> >>             // there is an error occurred while fetching partitions, sleep a while
> >>             partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >> }
> >>
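> >> As shown above, partitionMapLock is not acquired before calling
> >> partitionMapCond.await: synchronized only takes the object's intrinsic
> >> monitor, not the ReentrantLock itself.
> >>
> >> We can fix this by acquiring the lock explicitly, either by calling
> >> partitionMapLock.lock() or through the inLock helper. The code block
> >> below should work: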
> >>
> >> inLock(partitionMapLock) {
> >>             partitionsWithError ++= partitionMap.keys
> >>             // there is an error occurred while fetching partitions, sleep a while
> >>             partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >> }
> >>
> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I was running a mirror maker and got a
> >>> java.lang.IllegalMonitorStateException that caused the underlying
> >>> fetcher thread to stop completely. Here is the log from mirror maker.
> >>>
> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
> >>> java.io.EOFException: Received -1 when reading from channel, socket has
> >>> likely been closed. (kafka.consumer.SimpleConsumer)
> >>> [2015-03-21 02:11:53,081] WARN
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
> >>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
> >>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
> >>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
> >>> Possible cause: java.nio.channels.ClosedChannelException
> >>> (kafka.consumer.ConsumerFetcherThread)
> >>> [2015-03-21 02:11:53,083] ERROR
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
> >>>  (kafka.consumer.ConsumerFetcherThread)
> >>> java.lang.IllegalMonitorStateException
> >>>         at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
> >>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
> >>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
> >>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >>> [2015-03-21 02:11:53,083] INFO
> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
> >>>  (kafka.consumer.ConsumerFetcherThread)
> >>>
> >>> I am still investigating what caused the connection error on the server
> >>> side, but I have a couple of questions related to mirror maker itself:
> >>>
> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException? As
> >>> shown in the AbstractFetcherThread source, the fetcher thread should
> >>> catch the java.io.EOFException thrown from the underlying SimpleConsumer
> >>> and sleep a while before the next run.
> >>> 2. Mirror maker is unaware of the termination of the fetcher thread, so
> >>> it cannot detect the failure and trigger rebalancing. I have 3 mirror
> >>> maker instances running on 3 different machines listening to the same
> >>> topic. I would expect mirror maker to release the partition ownership
> >>> when the underlying fetcher thread terminates so that rebalancing can be
> >>> triggered, but in fact this is not the case. Is this expected behavior,
> >>> or did I misconfigure something?
> >>>
> >>> I am running the trunk version as of commit
> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
> >>>
> >>> --
> >>> Regards,
> >>> Tao
> >>>
> >>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>


-- 
Regards,
Tao
