The other question I have is that the consumer client is unaware of the
health status of the underlying fetcher thread. If the fetcher thread dies,
as in the case I encountered, is there a way for the consumer to restart the
fetcher thread, or to release ownership of its partitions so that other
consumers can pick them up while the fetcher thread is down?
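
The only workaround I can think of right now is an external watchdog that
shuts the consumer connector down once the fetcher thread is gone, so that
the ZooKeeper-based rebalance hands the partitions to the other instances.
Below is a rough sketch of what I mean; the FetcherWatchdog class and the way
a handle on the fetcher thread is obtained are my own invention, since the
consumer API does not expose the fetcher thread:

import kafka.consumer.ConsumerConnector

// Hypothetical watchdog, not part of the consumer API: if the fetcher
// thread dies, shut the connector down so its ownership nodes in ZooKeeper
// are removed and the other mirror maker instances can rebalance.
class FetcherWatchdog(connector: ConsumerConnector,
                      fetcherThread: Thread,
                      checkIntervalMs: Long = 10000L) extends Runnable {
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      if (!fetcherThread.isAlive) {
        connector.shutdown()
        return
      }
      Thread.sleep(checkIntervalMs)
    }
  }
}

Obviously that tears down the whole consumer instead of just restarting the
fetcher, so it is a stopgap rather than a real fix.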

On Wed, Mar 25, 2015 at 8:00 AM, tao xiao <xiaotao...@gmail.com> wrote:

> Thanks Jianjie. Can I reuse KAFKA-1997 or should I create a new ticket?
>
> On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
>> Hi Xiao,
>>
>> I think the fix for the IllegalMonitorStateException is correct.
>> Can you also create a ticket and submit a patch?
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>
>> >Hi community,
>> >
>> >I wanted to know whether the solution I proposed fixes the
>> >IllegalMonitorStateException issue. Our work is blocked on this and we'd
>> >like to proceed ASAP. Sorry to bother you.
>> >
>> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
>> >
>> >> I think I worked out the answer to question 1.
>> >> java.lang.IllegalMonitorStateException was thrown because the thread
>> >> did not own the ReentrantLock when it called await() on the lock's
>> >> condition.
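>> >>
>> >> A quick way to see this: Condition.await() requires the calling thread
>> >> to hold the ReentrantLock the condition was created from, and
>> >> `synchronized` only enters the object's monitor. This standalone sketch
>> >> (my own toy example, not Kafka code) reproduces the same exception:
>> >>
>> >> import java.util.concurrent.locks.ReentrantLock
>> >>
>> >> object AwaitWithoutLock extends App {
>> >>   val lock = new ReentrantLock()
>> >>   val cond = lock.newCondition()
>> >>   // `synchronized` takes the object's monitor, not the ReentrantLock,
>> >>   // so await() throws IllegalMonitorStateException when it tries to
>> >>   // release a lock the thread does not own.
>> >>   lock.synchronized {
>> >>     cond.await()
>> >>   }
>> >> }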
>> >>
>> >> Here is the code snippet from the AbstractFetcherThread.scala in trunk
>> >>
>> >> partitionMapLock synchronized {
>> >>   partitionsWithError ++= partitionMap.keys
>> >>   // there is an error occurred while fetching partitions, sleep a while
>> >>   partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>> >> }
>> >>
>> >> As shown above, partitionMapLock is never actually acquired before
>> >> partitionMapCond.await is called: `synchronized` only takes the
>> >> object's monitor, not the ReentrantLock itself.
>> >>
>> >> We can fix this by acquiring partitionMapLock explicitly, e.g. via the
>> >> inLock helper. The code block below should work:
>> >>
>> >> inLock(partitionMapLock) {
>> >>   partitionsWithError ++= partitionMap.keys
>> >>   // there is an error occurred while fetching partitions, sleep a while
>> >>   partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>> >> }
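>> >>
>> >> For reference, inLock is the helper in kafka.utils; as far as I can
>> >> tell it behaves roughly like the following (my paraphrase, not the
>> >> exact source), so the lock is acquired before the body runs and always
>> >> released in a finally block:
>> >>
>> >> import java.util.concurrent.locks.Lock
>> >>
>> >> // Rough paraphrase of the kafka.utils inLock helper.
>> >> def inLock[T](lock: Lock)(fun: => T): T = {
>> >>   lock.lock()
>> >>   try {
>> >>     fun
>> >>   } finally {
>> >>     lock.unlock()
>> >>   }
>> >> }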
>> >>
>> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com>
>> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> I was running a mirror maker and got a
>> >>> java.lang.IllegalMonitorStateException that caused the underlying
>> >>> fetcher thread to stop completely. Here is the log from the mirror maker:
>> >>>
>> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
>> >>> java.io.EOFException: Received -1 when reading from channel, socket has
>> >>> likely been closed. (kafka.consumer.SimpleConsumer)
>> >>> [2015-03-21 02:11:53,081] WARN
>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
>> >>> fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
>> >>> phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
>> >>> RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576).
>> >>> Possible cause: java.nio.channels.ClosedChannelException
>> >>> (kafka.consumer.ConsumerFetcherThread)
>> >>> [2015-03-21 02:11:53,083] ERROR
>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
>> >>> (kafka.consumer.ConsumerFetcherThread)
>> >>> java.lang.IllegalMonitorStateException
>> >>>         at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
>> >>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>> >>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
>> >>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >>> [2015-03-21 02:11:53,083] INFO
>> >>> [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
>> >>> (kafka.consumer.ConsumerFetcherThread)
>> >>>
>> >>> I am still investigating what caused the connection error on the server
>> >>> side, but I have a couple of questions about the mirror maker itself:
>> >>>
>> >>> 1. What is the root cause of java.lang.IllegalMonitorStateException? As
>> >>> shown in the AbstractFetcherThread source, the fetcher thread should
>> >>> catch the java.io.EOFException thrown by the underlying SimpleConsumer
>> >>> and sleep for a while before the next run.
>> >>> 2. The mirror maker is unaware of the termination of the fetcher
>> >>> thread, which makes it unable to detect the failure and trigger a
>> >>> rebalance. I have 3 mirror maker instances running on 3 different
>> >>> machines listening to the same topic. I would expect the mirror maker
>> >>> to release partition ownership when the underlying fetcher thread
>> >>> terminates so that a rebalance can be triggered, but in fact this is
>> >>> not the case. Is this expected behavior, or am I missing some
>> >>> configuration? (See the rough sketch below.)
>> >>>
>> >>> I am running the trunk version as of commit
>> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
>> >>>
>> >>> --
>> >>> Regards,
>> >>> Tao
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Regards,
>> >> Tao
>> >>
>> >
>> >
>> >
>> >--
>> >Regards,
>> >Tao
>>
>>
>
>
> --
> Regards,
> Tao
>



-- 
Regards,
Tao
