It should be another ticket. This is an AbstractFetcherThread issue rather
than a mirror maker issue.

I kind of think the case you saw is a special case, as it's not actually
a runtime error but a coding bug. The fetcher thread should not die by
design, so I don't think we have a way to restart fetchers without a code
change if they die accidentally. One way to do this would be to add a
liveness check in LeaderFinderThread (a rough sketch of the idea is
below), but I don't know whether such a change is warranted just because
of the case you saw.
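
To make the idea concrete, here is a minimal, hypothetical sketch of a
watchdog that restarts a dead worker thread. It is not existing Kafka
code; the isAlive() and restart() hooks are assumptions about what the
fetcher manager would have to expose:

    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical watchdog, not Kafka code. isAlive() and restart()
    // are assumed hooks into the fetcher manager.
    object FetcherWatchdog {
      def monitor(isAlive: () => Boolean,
                  restart: () => Unit,
                  intervalMs: Long): Unit = {
        val scheduler = Executors.newSingleThreadScheduledExecutor()
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = {
            // If the fetcher thread has died, bring up a replacement.
            if (!isAlive()) restart()
          }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      }
    }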

Jiangjie (Becket) Qin

On 3/24/15, 5:05 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>The other question I have is that the consumer client is unaware of the
>health status of the underlying fetcher thread. If the fetcher thread
>dies, as in the case I encountered, is there a way for the consumer to
>restart the fetcher thread or release ownership of its partitions so that
>other consumers can pick them up while the fetcher thread is down?
>
>On Wed, Mar 25, 2015 at 8:00 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
>> Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?
>>
>> On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>
>>> Hi Xiao,
>>>
>>> I think the fix for the IllegalMonitorStateException is correct.
>>> Can you also create a ticket and submit a patch?
>>>
>>> Thanks.
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>>
>>> >Hi community,
>>> >
>>> >I wanted to know if the solution I supplied can fix the
>>> >IllegalMonitorStateException issue. Our work is blocked on this and
>>> >we'd like to proceed ASAP. Sorry for bothering you.
>>> >
>>> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> >
>>> >> I think I worked out the answer to question 1.
>>> >> java.lang.IllegalMonitorStateException was thrown because the thread
>>> >> did not own the ReentrantLock when it called await() on the lock's
>>> >> condition.
>>> >>
>>> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
>>> >>
>>> >>     partitionMapLock synchronized {
>>> >>       partitionsWithError ++= partitionMap.keys
>>> >>       // there is an error occurred while fetching partitions, sleep a while
>>> >>       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>> >>     }
>>> >>
>>> >> As shown above, partitionMapLock is not acquired before calling
>>> >> partitionMapCond.await().
>>> >>
>>> >> We can fix this by explicitly acquiring the lock, e.g. via
>>> >> inLock(partitionMapLock), which calls partitionMapLock.lock() before
>>> >> the block and unlock() afterwards. The code block below should work:
>>> >>
>>> >>     inLock(partitionMapLock) {
>>> >>       partitionsWithError ++= partitionMap.keys
>>> >>       // there is an error occurred while fetching partitions, sleep a while
>>> >>       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>>> >>     }
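>>> >>
>>> >> To see why the original fails: synchronized on a ReentrantLock object
>>> >> only acquires the object's intrinsic monitor; it does not make the
>>> >> thread the owner of the ReentrantLock, so Condition.await() throws
>>> >> IllegalMonitorStateException. A minimal standalone sketch
>>> >> (illustration only, not Kafka code):
>>> >>
>>> >>     import java.util.concurrent.TimeUnit
>>> >>     import java.util.concurrent.locks.ReentrantLock
>>> >>
>>> >>     val lock = new ReentrantLock()
>>> >>     val cond = lock.newCondition()
>>> >>
>>> >>     // Broken: takes the monitor of the lock object, not the
>>> >>     // ReentrantLock itself, so await() throws
>>> >>     // IllegalMonitorStateException.
>>> >>     lock synchronized {
>>> >>       cond.await(100, TimeUnit.MILLISECONDS)
>>> >>     }
>>> >>
>>> >>     // Correct: the thread must hold the ReentrantLock before await().
>>> >>     lock.lock()
>>> >>     try {
>>> >>       cond.await(100, TimeUnit.MILLISECONDS)
>>> >>     } finally {
>>> >>       lock.unlock()
>>> >>     }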
>>> >>
>>> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>> I was running a mirror maker and got a
>>> >>> java.lang.IllegalMonitorStateException that caused the underlying
>>> >>> fetcher thread to stop completely. Here is the log from the mirror
>>> >>> maker:
>>> >>>
>>> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
>>> >>> java.io.EOFException: Received -1 when reading from channel, socket
>>> >>> has likely been closed. (kafka.consumer.SimpleConsumer)
>>> >>> [2015-03-21 02:11:53,081] WARN
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
>>> >>> in fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588;
>>> >>> ClientId: phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms;
>>> >>> MinBytes: 1 bytes; RequestInfo: [test.topic,0] ->
>>> >>> PartitionFetchInfo(3766065,1048576). Possible cause:
>>> >>> java.nio.channels.ClosedChannelException
>>> >>> (kafka.consumer.ConsumerFetcherThread)
>>> >>> [2015-03-21 02:11:53,083] ERROR
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
>>> >>> due to (kafka.consumer.ConsumerFetcherThread)
>>> >>> java.lang.IllegalMonitorStateException
>>> >>>         at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
>>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
>>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
>>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
>>> >>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>>> >>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
>>> >>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>> >>> [2015-03-21 02:11:53,083] INFO
>>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>>> >>> (kafka.consumer.ConsumerFetcherThread)
>>> >>>
>>> >>> I am still investigating what caused the connection error on the
>>> >>> server side, but I have a couple of questions related to the mirror
>>> >>> maker itself:
>>> >>>
>>> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException?
>>> >>> As shown in the AbstractFetcherThread source, the fetcher thread
>>> >>> should catch the java.io.EOFException thrown from the underlying
>>> >>> SimpleConsumer and sleep for a while before the next run.
>>> >>> 2. The mirror maker is unaware of the termination of the fetcher
>>> >>> thread, which makes it unable to detect the failure and trigger a
>>> >>> rebalance. I have 3 mirror maker instances running on 3 different
>>> >>> machines listening to the same topic. I would expect the mirror
>>> >>> maker to release partition ownership when the underlying fetcher
>>> >>> thread terminates so that a rebalance can be triggered, but in fact
>>> >>> this is not the case. Is this expected behavior, or did I
>>> >>> misconfigure anything?
>>> >>>
>>> >>> I am running the trunk version as of commit
>>> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>>> >>>
>>> >>> --
>>> >>> Regards,
>>> >>> Tao
>>> >>>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Regards,
>>> >> Tao
>>> >>
>>> >
>>> >
>>> >
>>> >--
>>> >Regards,
>>> >Tao
>>>
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
>
>-- 
>Regards,
>Tao
