[
https://issues.apache.org/jira/browse/FLUME-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Umesh Chaudhary updated FLUME-3203:
-----------------------------------
Description:
Here are the steps to reproduce:
1) Use the below config in the Flume agent:
{code:java}
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.channel = channel1
tier1.sources.source1.channels = channel1
tier1.sources.source1.type = spooldir
tier1.sources.source1.spoolDir = /home/systest/spoolDir
tier1.sources.source1.fileHeader = true
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/spoolEvnts
tier1.sinks.sink1.hdfs.filePrefix = events-
{code}
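The agent can be started with the standard flume-ng launcher; the conf directory and config file name below are assumptions, while the agent name must match the property prefix above (tier1):
{code:java}
# Paths and file name are examples; --name must match the "tier1" prefix.
flume-ng agent --conf /etc/flume-ng/conf \
  --conf-file /etc/flume-ng/conf/tier1.conf \
  --name tier1 -Dflume.root.logger=INFO,console
{code}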
2) Once the agent is running with the above config, move a sample text file
into the spooling directory:
{code:java}
mv Sample-text-file-50kb.txt /home/systest/spoolDir
{code}
The agent will start processing the events, and the output can be seen in the HDFS directory:
{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
37
{code}
3) Move the same file into the spooling directory again:
{code:java}
mv /tmp/Sample-text-file-50kb.txt /home/systest/spoolDir
{code}
This time Flume raises the exception below, but it continues processing the
file again:
{noformat}
2017-12-21 00:00:27,581 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Preparing to move file /home/systest/spoolDir/Sample-text-file-50kb.txt to /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
2017-12-21 00:00:27,582 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /home/systest/spoolDir }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2017-12-21 00:00:31,265 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202836.tmp
2017-12-21 00:00:31,275 INFO org.apache.flume.sink.hdfs.BucketWriter: Renaming /tmp/spoolEvnts/events-.1513843202836.tmp to /tmp/spoolEvnts/events-.1513843202836
2017-12-21 00:00:31,293 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating /tmp/spoolEvnts/events-.1513843202837.tmp
2017-12-21 00:00:31,321 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202837.tmp
{noformat}
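The ordering in the stack trace explains the duplicates: readEvents() (ReliableSpoolingFileEventReader.java:326) hands the file's events to the channel before retireCurrentFile()/rollCurrentFile() runs the name-reuse check. A simplified, standalone sketch of that check follows; the names and structure are our reading of the log output, not the verbatim Flume source:
{code:java}
import java.io.File;
import java.io.IOException;

// Simplified sketch of the rollover check, assumed from the exception
// message and stack trace; not the verbatim Flume implementation.
class SpoolRolloverSketch {
    private static final String COMPLETED_SUFFIX = ".COMPLETED";

    static void rollCurrentFile(File fileToRoll) throws IOException {
        File dest = new File(fileToRoll.getPath() + COMPLETED_SUFFIX);
        System.out.println("Preparing to move file " + fileToRoll + " to " + dest);

        if (dest.exists()) {
            // The name-reuse check only happens here, AFTER readEvents() has
            // already delivered this file's events to the channel. By the time
            // this throws, the duplicate records are committed downstream.
            throw new IllegalStateException("File name has been re-used with"
                    + " different files. Spooling assumptions violated for " + dest);
        }
        if (!fileToRoll.renameTo(dest)) {
            throw new IOException("Unable to move " + fileToRoll + " to " + dest);
        }
    }
}
{code}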
Checking HDFS again shows that the file count has nearly doubled, confirming the file was fully re-processed:
{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
72
{code}
Based on [the
doc|https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source],
the source should not process a file whose name already exists with the
.COMPLETED suffix; re-processing it causes duplicate records at the sink.
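As a possible mitigation rather than a fix (an assumption on our part, though deletePolicy itself is a documented spooldir option), the source can delete ingested files instead of renaming them, so a re-sent file name can never collide with a stale .COMPLETED copy; the trade-off is that processed files are not retained:
{code:java}
# Assumed mitigation: delete ingested files instead of renaming them to
# *.COMPLETED, so a re-sent name cannot collide with a stale completed copy.
tier1.sources.source1.deletePolicy = immediate
{code}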
> Spooling dir source leaks records from a file when a corresponding .COMPLETED
> file is already present
> --------------------------------------------------------------------------------------------------
>
> Key: FLUME-3203
> URL: https://issues.apache.org/jira/browse/FLUME-3203
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.6.0
> Reporter: Umesh Chaudhary
>