I’m using a new Flume HDFS sink that writes to S3, and it never seems to successfully close out the .tmp files it creates in the bucket. So I’m accumulating a whole lot of unclosed .tmp files.
The IAM role being used has full S3 permissions on this bucket.
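For reference, “full S3 permissions” here means a policy along these lines (the bucket name and exact statement shape are illustrative, not a paste of the real policy):
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::my-bucket-name",
        "arn:aws:s3:::my-bucket-name/*"
      ]
    }
  ]
}
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~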
Here’s the Flume error when it tries to rename and close the file (on s3a, rename() is a copy plus a delete):
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
25 Apr 2019 21:20:01,522 ERROR [hdfs-S3Sink-call-runner-7] (org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects:1151) - button/qa1-event1/: "AccessDenied" - Access Denied
25 Apr 2019 21:20:01,675 WARN  [hdfs-S3Sink-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:427) - failed to rename() file (s3a://my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp). Exception follows.
java.nio.file.AccessDeniedException: s3a://my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp: getFileStatus on s3a://my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 68D5110FD4C0C1DA), S3 Extended Request ID: xk9gb+hY0NUrqAQS9NQW6dDZL35p0I4SpO57b/o9YZucaVtuk1igtPfYaQZTgEfPrHepyxm6+q8=
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:120)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1886)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1855)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1799)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1418)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:2529)
    at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:654)
    at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:651)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
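Since a rename on s3a is a HEAD on the .tmp object followed by a copy and a delete, the same calls can be checked by hand under the same role with the AWS CLI. A sketch of that check, using the object key from the log above (I haven’t pasted real output):
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# HEAD the .tmp object (roughly what getFileStatus() does first)
aws s3api head-object --bucket my-bucket-name \
    --key button/qa1-event1/FlumeData.1556226600899.tmp

# copy to the final name, then delete the .tmp (what rename() does on s3a)
aws s3api copy-object --bucket my-bucket-name \
    --copy-source my-bucket-name/button/qa1-event1/FlumeData.1556226600899.tmp \
    --key button/qa1-event1/FlumeData.1556226600899

aws s3api delete-object --bucket my-bucket-name \
    --key button/qa1-event1/FlumeData.1556226600899.tmp
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If any of these return 403 outside Flume, the problem would be on the IAM/bucket-policy side rather than in the sink.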
Here’s my S3 sink configuration:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
agent.sinks.S3Sink.type = hdfs
agent.sinks.S3Sink.hdfs.path = s3a://my-bucket-name/
agent.sinks.S3Sink.channel = S3Channel
agent.sinks.S3Sink.hdfs.fileType = DataStream
agent.sinks.S3Sink.hdfs.writeFormat = Text
# size- and count-based rolling disabled; roll on the timer only
agent.sinks.S3Sink.hdfs.rollCount = 0
agent.sinks.S3Sink.hdfs.rollSize = 0
agent.sinks.S3Sink.hdfs.batchSize = 10000
# close and rename the .tmp file every 600 s
agent.sinks.S3Sink.hdfs.rollInterval = 600
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
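Credentials come from the IAM role mentioned above; nothing else should be needed, but for completeness, pinning s3a to the instance-profile provider would look like this in the agent’s core-site.xml (illustrative; property name from the Hadoop s3a documentation):
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <value>com.amazonaws.auth.InstanceProfileCredentialsProvider</value>
</property>
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~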