Thank you for the explanation.

Patch submitted https://issues.apache.org/jira/browse/KAFKA-2048

On Wed, Mar 25, 2015 at 8:29 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> It should be a separate ticket. This is an AbstractFetcherThread issue rather
> than a mirror maker issue.
>
> I think the case you saw is a special case, as it's not actually a runtime
> error but a coding bug. By design, fetcher threads should not die, so I
> don't think there is a way to restart fetchers without a code change if
> they die accidentally. One way to handle this would be to add a liveness
> check in LeaderFinderThread, but I'm not sure that change is necessary
> just because of the case you saw.
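A liveness check of the kind suggested here could look roughly like the sketch below. It is a hypothetical, generic watchdog in plain Scala, not Kafka code: `LivenessChecker`, `checkOnce` and the `onDead` callback are illustrative names, and what recovery should actually do (restart the fetcher, or release partition ownership so a rebalance can reassign the partitions) is left as an assumption.

```scala
import java.util.concurrent.CountDownLatch

object FetcherLivenessSketch {
  // Hypothetical checker (not a Kafka API): report worker threads that have
  // terminated and hand each one to a recovery callback.
  final class LivenessChecker(workers: () => Seq[Thread], onDead: Thread => Unit) {
    def checkOnce(): Seq[Thread] = {
      val dead = workers().filterNot(_.isAlive)
      dead.foreach(onDead)
      dead
    }
  }

  def main(args: Array[String]): Unit = {
    val release = new CountDownLatch(1)
    // Simulates the failed fetcher: its loop catches an error and then exits.
    val crashed = new Thread(() => {
      try throw new IllegalMonitorStateException()
      catch { case _: IllegalMonitorStateException => () }
    }, "fetcher-0")
    // Simulates a healthy fetcher that keeps running until released.
    val healthy = new Thread(() => release.await(), "fetcher-1")
    crashed.start(); healthy.start()
    crashed.join()

    val checker = new LivenessChecker(() => Seq(crashed, healthy),
      t => println(s"${t.getName} is dead; recovery would go here"))
    println(checker.checkOnce().map(_.getName)) // List(fetcher-0)

    release.countDown(); healthy.join()
  }
}
```

In a real consumer, something like `checkOnce` would have to run periodically from a thread that outlives the fetchers, which is presumably why LeaderFinderThread is mentioned as the place for it.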
>
> Jiangjie (Becket) Qin
>
> On 3/24/15, 5:05 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >The other question I have concerns the fact that the consumer client is
> >unaware of the health status of the underlying fetcher thread. If the
> >fetcher thread dies, as in the case I encountered, is there a way for the
> >consumer to restart the fetcher thread or release ownership of its
> >partitions so that other consumers can pick them up while the fetcher
> >thread is down?
> >
> >On Wed, Mar 25, 2015 at 8:00 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?
> >>
> >> On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >>
> >>> Hi Xiao,
> >>>
> >>> I think the fix for the IllegalMonitorStateException is correct.
> >>> Can you also create a ticket and submit a patch?
> >>>
> >>> Thanks.
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On 3/24/15, 4:31 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
> >>>
> >>> >Hi community,
> >>> >
> >>> >I'd like to know whether the solution I supplied fixes the
> >>> >IllegalMonitorStateException issue. Our work is blocked on this and
> >>> >we'd like to proceed ASAP. Sorry for the bother.
> >>> >
> >>> >On Mon, Mar 23, 2015 at 4:32 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>> >
> >>> >> I think I worked out the answer to question 1.
> >>> >> java.lang.IllegalMonitorStateException was thrown because the thread
> >>> >> did not own the ReentrantLock when it called await() on the lock's
> >>> >> condition.
> >>> >>
> >>> >> Here is the code snippet from AbstractFetcherThread.scala in trunk:
> >>> >>
> >>> >> partitionMapLock synchronized {
> >>> >>   partitionsWithError ++= partitionMap.keys
> >>> >>   // there is an error occurred while fetching partitions, sleep a while
> >>> >>   partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>> >> }
> >>> >>
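> >>> >> As shown above, partitionMapLock is not acquired before calling
> >>> >> partitionMapCond.await. Scala's synchronized takes the JVM monitor of
> >>> >> the partitionMapLock object, not the ReentrantLock itself, so await()
> >>> >> finds the lock not held by the current thread.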
> >>> >>
> >>> >> We can fix this by explicitly acquiring partitionMapLock (i.e. calling
> >>> >> partitionMapLock.lock()). The code block below should work:
> >>> >>
> >>> >> inLock(partitionMapLock) {
> >>> >>   // inLock holds the ReentrantLock itself, so await() can release it
> >>> >>   // while waiting and reacquire it before returning
> >>> >>   partitionsWithError ++= partitionMap.keys
> >>> >>   // there is an error occurred while fetching partitions, sleep a while
> >>> >>   partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
> >>> >> }
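The failure mode can be reproduced outside Kafka with plain java.util.concurrent. In this sketch, `partitionMapLock` and `partitionMapCond` only mirror the names in the snippet above; `inLock` is a local stand-in assumed to behave like the helper used in the fix (lock, run the body, unlock in a finally block), and `buggyBackoff`/`fixedBackoff` are hypothetical names.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}

object AwaitLockDemo {
  // Local stand-in for an inLock-style helper (assumption): acquire the lock,
  // evaluate the body, and always release the lock afterwards.
  def inLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  val partitionMapLock = new ReentrantLock()
  val partitionMapCond = partitionMapLock.newCondition()

  // Buggy pattern: `synchronized` takes the JVM monitor of the lock object;
  // the ReentrantLock itself stays unowned, so await() throws.
  def buggyBackoff(backOffMs: Long): Either[Throwable, Boolean] =
    try {
      partitionMapLock synchronized {
        Right(partitionMapCond.await(backOffMs, TimeUnit.MILLISECONDS))
      }
    } catch {
      case e: IllegalMonitorStateException => Left(e)
    }

  // Fixed pattern: the ReentrantLock is held, so await() can release it while
  // waiting and reacquire it before returning (false = timed out, no signal).
  def fixedBackoff(backOffMs: Long): Either[Throwable, Boolean] =
    try {
      inLock(partitionMapLock) {
        Right(partitionMapCond.await(backOffMs, TimeUnit.MILLISECONDS))
      }
    } catch {
      case e: IllegalMonitorStateException => Left(e)
    }

  def main(args: Array[String]): Unit = {
    println(buggyBackoff(10)) // a Left holding an IllegalMonitorStateException
    println(fixedBackoff(10)) // Right(false)
  }
}
```

The point of the design is that a Condition is bound to its Lock: await() has to fully release that specific lock while it waits, which is impossible if only the object's intrinsic monitor is held.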
> >>> >>
> >>> >> On Mon, Mar 23, 2015 at 1:50 PM, tao xiao <xiaotao...@gmail.com> wrote:
> >>> >>
> >>> >>> Hi,
> >>> >>>
> >>> >>> I was running a mirror maker and got a
> >>> >>> java.lang.IllegalMonitorStateException that caused the underlying
> >>> >>> fetcher thread to stop completely. Here is the log from the mirror maker:
> >>> >>>
> >>> >>> [2015-03-21 02:11:53,069] INFO Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. (kafka.consumer.SimpleConsumer)
> >>> >>> [2015-03-21 02:11:53,081] WARN [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId: phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test.topic,0] -> PartitionFetchInfo(3766065,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.consumer.ConsumerFetcherThread)
> >>> >>> [2015-03-21 02:11:53,083] ERROR [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to  (kafka.consumer.ConsumerFetcherThread)
> >>> >>> java.lang.IllegalMonitorStateException
> >>> >>>         at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
> >>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
> >>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
> >>> >>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
> >>> >>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
> >>> >>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
> >>> >>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >>> >>> [2015-03-21 02:11:53,083] INFO [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped  (kafka.consumer.ConsumerFetcherThread)
> >>> >>>
> >>> >>> I am still investigating what caused the connection error on the server
> >>> >>> side, but I have a couple of questions about mirror maker itself:
> >>> >>>
> >>> >>> 1. What is the root cause of the java.lang.IllegalMonitorStateException?
> >>> >>> According to the AbstractFetcherThread source, the fetcher thread should
> >>> >>> catch the java.io.EOFException thrown from the underlying SimpleConsumer
> >>> >>> and sleep a while before the next run.
> >>> >>> 2. Mirror maker is unaware of the termination of the fetcher thread,
> >>> >>> which makes it unable to detect the failure and trigger a rebalance. I
> >>> >>> have 3 mirror maker instances running on 3 different machines, consuming
> >>> >>> the same topic. I would expect a mirror maker to release its partition
> >>> >>> ownership when the underlying fetcher thread terminates so that a
> >>> >>> rebalance can be triggered, but in fact this is not the case. Is this
> >>> >>> expected behavior, or have I misconfigured something?
> >>> >>>
> >>> >>> I am running the trunk version as of commit
> >>> >>> 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d.
> >>> >>>
> >>> >>> --
> >>> >>> Regards,
> >>> >>> Tao
> >>> >>>
> >>> >>
> >>> >>
> >>> >>
> >>> >> --
> >>> >> Regards,
> >>> >> Tao
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> >--
> >>> >Regards,
> >>> >Tao
> >>>
> >>>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>


-- 
Regards,
Tao
