[ https://issues.apache.org/jira/browse/FLUME-1232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13397367#comment-13397367 ]

Juhani Connolly commented on FLUME-1232:
----------------------------------------

Sorry for hopping onto this with Arvind assigned, but we need to have it working 
soon...

It looks to me like

    int pendingTakesSize = pendingTakes.size();
    if (pendingTakesSize > 0) {
      String msg = "Pending takes " + pendingTakesSize
          + " exist after the end of replay";
      if (LOG.isDebugEnabled()) {
        for (Long pointer : pendingTakes) {
          LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
        }
        Preconditions.checkState(false, msg);    // <------------------- THIS
      } else {
        LOG.error(msg + ". Duplicate messages will exist in destination.");
      }
    }
    LOG.info("Replayed " + total);

is killing it. I'm not sure why we're checking preconditions inside an 
isDebugEnabled conditional clause (I guess the intention is to kill things 
during debug?). Regardless, that aborts the replay, leaving the Log state as 
open=false while the files are still locked. From there, 
FileChannel.getDepth throws an exception inside log.getFlumeEventQueue because 
the log is closed, cancelling the start(), which exits without switching to 
LifecycleState.STARTED. From there on, further start attempts simply cannot 
succeed because the file remains locked.

So, in summary, the problem occurs under the following circumstances:
- Debug-level logging is enabled
- There are uncompleted takes in the queue

Everything else appears to be coincidence (or there may be a bug that causes 
takes to not be properly processed?).

Some general problems:
- Success of the replay isn't checked.
- Locks depend on VM shutdown to be released, unless the replay actually 
succeeded the first time around (in the Log constructor); see the sketch after 
this list.
- Surely having pending takes still hanging around is sometimes unavoidable... I 
don't think we need to be busting out in that situation, as I'm sure some users 
will still be running with logs on debug? I guess the 
Preconditions.checkState(false) may be a remnant from debugging?
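
For the lock and replay-failure points, here's roughly what I have in mind for 
FileChannel.start() (just a sketch on my part; the log.close() call and exact 
structure are assumptions, not necessarily the current code):

    @Override
    public synchronized void start() {
      try {
        // The Log constructor has already taken the checkpoint/data directory
        // locks by the time we get here.
        log.replay();
      } catch (Throwable t) {
        // Assumption: closing the log here releases the directory locks, so a
        // later start() attempt isn't blocked by a stale lock until VM shutdown.
        log.close();
        throw Throwables.propagate(t);
      }
      // Only transition to STARTED once the replay has actually succeeded.
      super.start();
    }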

I haven't been able to look over everything in detail, but merely commenting out 
the Preconditions line was enough to make everything work, and it should be 
enough for a quick fix (of course, one can also confirm this is the problem by 
running with a log level above debug).
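
To be concrete, the quick fix I mean is just this (my sketch; it logs the error 
in the debug branch as well, rather than literally commenting the line out):

    if (LOG.isDebugEnabled()) {
      for (Long pointer : pendingTakes) {
        LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
      }
      // Was: Preconditions.checkState(false, msg);
      // Failing here aborts the replay and leaves the Log closed but locked.
      LOG.error(msg + ". Duplicate messages will exist in destination.");
    } else {
      LOG.error(msg + ". Duplicate messages will exist in destination.");
    }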
                
> Cannot start agent a 3rd time when using FileChannel
> ----------------------------------------------------
>
>                 Key: FLUME-1232
>                 URL: https://issues.apache.org/jira/browse/FLUME-1232
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.2.0
>         Environment: RHEL 5.6 64-bit
>            Reporter: Will McQueen
>            Assignee: Arvind Prabhakar
>            Priority: Blocker
>             Fix For: v1.2.0
>
>
> Steps:
> 1) Start clean by wiping-out FileChannel's existing checkpoint dir and data 
> dir
> 2) Configure the agent to use FileChannel (type = FILE). The config file I 
> used is at the end of this text.
> 3) Start the agent, confirm lock files exist in data and checkpoint dirs, 
> stop agent, confirm lock files are removed from data and checkpoint dirs.
> 4) Repeat step 3
> 5) Start the agent. The following exceptions are shown in the logs:
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286275813, lastCheckpoint = 1338286279596, fileId = 1, offset = 1924, 
> type = Commit, transaction 1338285619250
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286279783, lastCheckpoint = 1338286279596, fileId = 1, offset = 1949, 
> type = Take, transaction 1338285619251
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286279784, lastCheckpoint = 1338286279596, fileId = 1, offset = 1980, 
> type = Commit, transaction 1338285619251
> 2012-05-29 03:15:36,453 DEBUG file.ReplayHandler: Processing commit of Take
> 2012-05-29 03:15:36,453 INFO file.ReplayHandler: Replayed 1 from 
> /var/run/flume-ng/.flume/file-channel/data/log-1
> 2012-05-29 03:15:36,453 INFO file.ReplayHandler: Replaying 
> /var/run/flume-ng/.flume/file-channel/data/log-2
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286370280, lastCheckpoint = 1338286279596, fileId = 2, offset = 8, type = 
> Take, transaction 1338286369982
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: record.getTimestamp() = 
> 1338286370287, lastCheckpoint = 1338286279596, fileId = 2, offset = 39, type 
> = Commit, transaction 1338286369982
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: Processing commit of Take
> 2012-05-29 03:15:36,454 INFO file.ReplayHandler: Unable to remove 
> FlumeEventPointer [fileID=1, offset=1853] added to pending list
> 2012-05-29 03:15:36,454 INFO file.ReplayHandler: Replayed 1 from 
> /var/run/flume-ng/.flume/file-channel/data/log-2
> 2012-05-29 03:15:36,454 DEBUG file.ReplayHandler: Pending take 
> FlumeEventPointer [fileID=1, offset=1853]
> 2012-05-29 03:15:36,455 ERROR file.Log: Failed to initialize Log
> java.lang.IllegalStateException: Pending takes 1 exist after the end of replay
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at 
> org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:137)
>         at org.apache.flume.channel.file.Log.replay(Log.java:205)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:180)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:36,457 ERROR lifecycle.LifecycleSupervisor: Unable to start 
> org.apache.flume.channel.file.FileChannel@1ac88440 - Exception follows.
> java.lang.IllegalStateException: Log is closed
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at org.apache.flume.channel.file.Log.getFlumeEventQueue(Log.java:226)
>         at 
> org.apache.flume.channel.file.FileChannel.getDepth(FileChannel.java:253)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:187)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:36,458 ERROR flume.SinkRunner: Unhandled exception, logging 
> and sleeping for 5000ms
> java.lang.IllegalStateException: Channel closed
>         at 
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>         at 
> org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:237)
>         at 
> org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:118)
>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:61)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:662)
> 2012-05-29 03:15:39,460 INFO file.FileChannel: Starting FileChannel with 
> dataDir [/var/run/flume-ng/.flume/file-channel/data]
> 2012-05-29 03:15:39,460 INFO file.Log: Cannot lock 
> /var/run/flume-ng/.flume/file-channel/checkpoint. The directory is already 
> locked.
> 2012-05-29 03:15:39,461 ERROR lifecycle.LifecycleSupervisor: Unable to start 
> org.apache.flume.channel.file.FileChannel@1ac88440 - Exception follows.
> java.lang.RuntimeException: java.io.IOException: Cannot lock 
> /var/run/flume-ng/.flume/file-channel/checkpoint. The directory is already 
> locked.
>         at com.google.common.base.Throwables.propagate(Throwables.java:156)
>         at 
> org.apache.flume.channel.file.FileChannel.start(FileChannel.java:182)
>         at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:228)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> Config file I used (flume.conf):
> agent.channels = c1
> agent.sources = r1
> agent.sinks = k1
> #
> agent.channels.c1.type = FILE
> #
> agent.sources.r1.channels = c1
> agent.sources.r1.type = NETCAT
> agent.sources.r1.bind = 0.0.0.0
> agent.sources.r1.port = 41414
> #
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = LOGGER
