I’m using a new flume sink to S3 that doesn’t seem to successfully close out 
.tmp files created in S3 buckets. So I’m essentially getting a whole lot of 
unclosed .tmp files.

The IAM role being used has full S3 permissions to this bucket.

Here’s the flume error when trying to rename and close the file (cp & delete)

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
25 Apr 2019 21:20:01,522 ERROR [hdfs-S3Sink-call-runner-7] 
(org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects:1151)  - 
button/qa1-event1/: "AccessDenied" - Access Denied
25 Apr 2019 21:20:01,675 WARN  [hdfs-S3Sink-roll-timer-0] 
(org.apache.flume.sink.hdfs.BucketWriter.close:427)  - failed to rename() file 
(s3a://my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp). Exception 
follows.
java.nio.file.AccessDeniedException: 
s3a://my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp: 
getFileStatus on 
s3a://my-bucket-name/button/qa1-event1./FlumeData.1556226600899.tmp: 
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon 
S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 68D5110FD4C0C1DA), 
S3 Extended Request ID: 
xk9gb+hY0NUrqAQS9NQW6dDZL35p0I4SpO57b/o9YZucaVtuk1igtPfYaQZTgEfPrHepyxm6+q8=
        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
        at 
org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:120)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1886)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1855)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1799)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1418)
        at 
org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:2529)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:654)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:651)
        at 
org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
        at 
org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Here’s my S3 sink.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
agent.sinks.S3Sink.type = hdfs
agent.sinks.S3Sink.hdfs.path = s3a://my-bucket-name/
agent.sinks.S3Sink.channel = S3Channel
agent.sinks.S3Sink.hdfs.fileType = DataStream
agent.sinks.S3Sink.hdfs.writeFormat = Text
agent.sinks.S3Sink.hdfs.rollCount = 0
agent.sinks.S3Sink.hdfs.rollSize = 0
agent.sinks.S3Sink.hdfs.batchSize = 10000
agent.sinks.S3Sink.hdfs.rollInterval = 600
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Reply via email to