[
https://issues.apache.org/jira/browse/FLUME-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13096887#comment-13096887
]
Jonathan Hsieh commented on FLUME-757:
--------------------------------------
It looks like there is a race condition in CustomDfsSink where pathManager is
null when close() runs. The only way this could happen is if the open call and
the close call are being made concurrently.
{code}
@Override
public void close() throws IOException {
  LOG.info("Closing HDFS file: " + pathManager.getOpenPath()); // NPE here.
  writer.flush();
  LOG.info("done writing raw file to hdfs");
  writer.close();
  pathManager.close();
  writer = null;
}
{code}
My first thought is that we need to take a local reference to pathManager and
null-check it before use, or possibly synchronize the open and close calls.
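One way the two ideas above (a local reference with null checks, plus
synchronization on open and close) could be combined is sketched below. This is
a self-contained stand-in, not the real CustomDfsSink: GuardedSink, PathHolder,
and getCloseCount are hypothetical names, and a StringWriter replaces the HDFS
writer. Whether this fits the actual sink lifecycle is an assumption that would
need checking against the Flume source.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Hypothetical stand-in for CustomDfsSink. PathHolder replaces the real
// PathManager, and StringWriter replaces the HDFS writer.
public class GuardedSink {
  static final class PathHolder {
    private final String openPath;
    PathHolder(String openPath) { this.openPath = openPath; }
    String getOpenPath() { return openPath; }
  }

  private PathHolder pathManager; // null until open() runs
  private Writer writer;          // null until open() runs
  private int closeCount = 0;     // closes that did real work (demo only)

  // synchronized: open() and close() can no longer interleave.
  public synchronized void open(String path) {
    pathManager = new PathHolder(path);
    writer = new StringWriter();
  }

  public synchronized void close() throws IOException {
    // Local copies, so the null checks and the later uses see the same objects.
    PathHolder pm = pathManager;
    Writer w = writer;
    if (pm == null || w == null) {
      return; // never opened, or already closed: nothing to do
    }
    System.out.println("Closing file: " + pm.getOpenPath());
    w.flush();
    w.close();
    pathManager = null;
    writer = null;
    closeCount++;
  }

  public synchronized int getCloseCount() { return closeCount; }

  public static void main(String[] args) throws IOException {
    GuardedSink sink = new GuardedSink();
    sink.close();              // close before open: returns quietly, no NPE
    sink.open("/tmp/example");
    sink.close();              // real close
    sink.close();              // second close is a no-op
    System.out.println("closes that did work: " + sink.getCloseCount());
  }
}
```

Synchronizing both methods removes the interleaving, and the null check makes a
close on an unopened or already closed sink harmless instead of fatal.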
Want to give it a try?
> Flume Collector Sink Fails Due to Driver Exception
> --------------------------------------------------
>
> Key: FLUME-757
> URL: https://issues.apache.org/jira/browse/FLUME-757
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v0.9.4
> Reporter: Cameron Gandevia
>
> I am experiencing a weird issue with my Flume configuration. Occasionally,
> when I start my agents, they begin delivering messages to the collectors, and
> after a while I receive the following exceptions and stop receiving messages.
> The collectors are configured as follows.
> Source: collectorSource(36892)
> Sink: {regex("^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)", 1,
> "date") => {exDate("date", "yyyy-MM-dd HH:mm") =>
> collectorSink("hdfs://hadoop-namenode1:8020/logs/%{dateyear}-%{datemonth}-%{dateday}/%{datehr}00/%{host}",
> "log-")}}
> The agents are configured as follows.
> Source: tailDir("/data1/logs", delim="\\n\\d\\d\\d\\d", delimMode="next")
> Sink: agentDFOChain("flume-collector1:36892", "flume-collector2:36892")
> The following exceptions are from my collectors.
>
> INFO com.cloudera.flume.handlers.debug.InsistentAppendDecorator - Failed due to unexpected runtime exception during append attempt
> java.lang.NullPointerException
>     at com.cloudera.flume.handlers.hdfs.CustomDfsSink.close(CustomDfsSink.java:88)
>     at com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink.close(EscapedCustomDfsSink.java:132)
>     at com.cloudera.flume.core.CompositeSink.close(CompositeSink.java:56)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.collector.CollectorSink$RollDetectDeco.close(CollectorSink.java:171)
>     at com.cloudera.flume.handlers.rolling.RollSink.close(RollSink.java:331)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.handlers.debug.InsistentOpenDecorator.close(InsistentOpenDecorator.java:175)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.handlers.debug.StubbornAppendSink.append(StubbornAppendSink.java:96)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.debug.InsistentAppendDecorator.append(InsistentAppendDecorator.java:110)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.endtoend.AckChecksumChecker.append(AckChecksumChecker.java:113)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:62)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.batch.GunzipDecorator.append(GunzipDecorator.java:81)
>     at com.cloudera.flume.collector.CollectorSink.append(CollectorSink.java:222)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.core.extractors.DateExtractor.append(DateExtractor.java:129)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.core.extractors.RegexExtractor.append(RegexExtractor.java:88)
>     at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:110)
> ERROR com.cloudera.flume.core.connector.DirectDriver - Closing down due to exception during append calls
> java.lang.NullPointerException
>     at com.cloudera.flume.handlers.hdfs.CustomDfsSink.close(CustomDfsSink.java:88)
>     at com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink.close(EscapedCustomDfsSink.java:132)
>     at com.cloudera.flume.core.CompositeSink.close(CompositeSink.java:56)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.collector.CollectorSink$RollDetectDeco.close(CollectorSink.java:171)
>     at com.cloudera.flume.handlers.rolling.RollSink.close(RollSink.java:331)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.handlers.debug.InsistentOpenDecorator.close(InsistentOpenDecorator.java:175)
>     at com.cloudera.flume.core.EventSinkDecorator.close(EventSinkDecorator.java:67)
>     at com.cloudera.flume.handlers.debug.StubbornAppendSink.append(StubbornAppendSink.java:96)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.debug.InsistentAppendDecorator.append(InsistentAppendDecorator.java:110)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.endtoend.AckChecksumChecker.append(AckChecksumChecker.java:113)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.batch.UnbatchingDecorator.append(UnbatchingDecorator.java:62)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.handlers.batch.GunzipDecorator.append(GunzipDecorator.java:81)
>     at com.cloudera.flume.collector.CollectorSink.append(CollectorSink.java:222)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.core.extractors.DateExtractor.append(DateExtractor.java:129)
>     at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
>     at com.cloudera.flume.core.extractors.RegexExtractor.append(RegexExtractor.java:88)
>     at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:110)
> The following are from my agents
> ERROR com.cloudera.flume.core.connector.DirectDriver - Expected IDLE but timed out in state ACTIVE
> INFO com.cloudera.flume.agent.diskfailover.DiskFailoverDeco - Closing disk failover log, subsink still making progre
> ERROR com.cloudera.flume.agent.LogicalNode - Forcing driver to exit uncleanly
> INFO com.cloudera.flume.agent.LogicalNode - Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@19a0feb
> WARN com.cloudera.flume.handlers.debug.LazyOpenDecorator - Closing a lazy sink that was not logically opened
> INFO com.cloudera.flume.handlers.text.TailDirSource - added file /data1/mutator/logs/one.log
> INFO com.cloudera.flume.handlers.rolling.RollSink - Created RollSink: trigger=[TimeTrigger: maxAge=10000 tagger=com.cloudera.flume.handlers.rolling.ProcessTagger@6279d] checkPeriodMs = 250 spec='NaiveFileFailover'
> INFO com.cloudera.flume.handlers.rolling.RollSink - opening RollSink 'NaiveFileFailover'
> ERROR com.cloudera.flume.agent.diskfailover.DiskFailoverDeco - WAL drain thread interrupted
> java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at com.cloudera.flume.core.connector.DirectDriver.waitForAtLeastState(DirectDriver.java:308)
>     at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.ensureClosedDrainDriver(DiskFailoverDeco.java:129)
>     at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.close(DiskFailoverDeco.java:177)
>     at com.cloudera.flume.core.BackOffFailOverSink.close(BackOffFailOverSink.java:165)
>     at com.cloudera.flume.core.CompositeSink.close(CompositeSink.java:56)
>     at com.cloudera.flume.agent.AgentFailChainSink.close(AgentFailChainSink.java:98)
>     at com.cloudera.flume.core.CompositeSink.close(CompositeSink.java:56)
>     at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:126)