Exception when trying to change StreamingFileSink S3 bucket

2019-08-22 Thread sidhartha saurav
Hi,

We are trying to change our StreamingFileSink S3 bucket, say from
s3://eu1/output_old to s3://eu2/output_new. When we do so, we get an
exception and the task manager goes into a restart loop.

We suspect that Flink tries to restore state and reads the bucket id from
the saved state [ final BucketID bucketId = recoveredState.getBucketId() ].
Flink then tries to read output_old from eu2 and gets an AccessDeniedError.
Rightly so, as it has permission for s3://eu2/output_new but not
s3://eu2/output_old. We are not sure why Flink is trying to access the old
bucket, or how to avoid this exception.
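The behavior described above can be illustrated with a small, self-contained sketch (plain Java, not Flink's actual classes; all names below are hypothetical): the bucket id recovered from checkpoint state is a path recorded at checkpoint time, so on restore it is resolved as-is, independent of the base path the sink is now configured with.

```java
import java.util.List;

// Illustrative model of why a restored StreamingFileSink still touches the
// old bucket: bucket ids are recorded in state at checkpoint time and are
// replayed verbatim on restore. These names are hypothetical, not Flink API.
public class BucketRestoreSketch {

    // State captured by a checkpoint: pending part files keyed by bucket id.
    record RecoveredBucketState(String bucketId, List<String> pendingParts) {}

    // On restore, the committer resolves the *recovered* bucket id, not the
    // sink's newly configured base path.
    static String pathToCommit(RecoveredBucketState recovered) {
        String bucketId = recovered.bucketId(); // e.g. "output_old/2019-08-22/18"
        return bucketId + "/" + recovered.pendingParts().get(0);
    }

    public static void main(String[] args) {
        RecoveredBucketState recovered =
                new RecoveredBucketState("output_old/2019-08-22/18",
                                         List.of("part-3-40134"));
        // The commit targets the old bucket even though the sink now writes
        // elsewhere; this models the getObjectMetadata call that fails with
        // 403 when the job's credentials only cover the new bucket.
        System.out.println(pathToCommit(recovered)); // prints "output_old/2019-08-22/18/part-3-40134"
    }
}
```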

Logs:

> "S3Committer.java","line":"87","message":"Failed to commit after recovery
output_old/2019-08-22/18/part-3-40134 with MPU ID
7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
Checking if file was committed before...",

> "Task.java","line":"910","message":"... switched from RUNNING to FAILED."

> java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134:
getObjectMetadata on output_old/2019-08-22/18/part-3-40134:
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
Request ID: 79F1AEE53131FB66; S3 Extended Request ID:
8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
S3 Extended Request ID:
8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
Forbidden

flink-taskmanager at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
flink-taskmanager at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
flink-taskmanager at
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.&lt;init&gt;(Bucket.java:128)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
flink-taskmanager at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
flink-taskmanager at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
flink-taskmanager at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
flink-taskmanager at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
flink-taskmanager at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
flink-taskmanager at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
flink-taskmanager at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
flink-taskmanager at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
flink-taskmanager at java.lang.Thread.run(Thread.java:748)


We are using Flink 1.8 and externalized checkpoints. The S3 bucket for
externalized checkpoints has not been modified.

Thanks
Sidhartha


Re: Exception when trying to change StreamingFileSink S3 bucket

2019-09-04 Thread sidhartha saurav
Hi,

Can someone suggest a workaround so that we do not hit this issue when
changing the S3 bucket?

On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav 
wrote:

> [quoted message elided]


Re: Exception when trying to change StreamingFileSink S3 bucket

2019-09-05 Thread Fabian Hueske
Hi,

Kostas (in CC) might be able to help.

Best, Fabian

On Wed, Sep 4, 2019 at 22:59, sidhartha saurav <
sidsau...@gmail.com> wrote:

> [quoted messages elided]


Re: Exception when trying to change StreamingFileSink S3 bucket

2019-09-05 Thread Kostas Kloudas
Hi Sidhartha,

Your explanation is correct.
If you stopped the job with a savepoint and then restore from that
savepoint, Flink will try to restore its state, and that state of
course still references the old bucket.

New data, however, will go to the new bucket.

One workaround is to restart your job from scratch, without restoring
from the savepoint, if you do not care about your "old" state.

Cheers,
Kostas

On Thu, Sep 5, 2019 at 1:27 PM Fabian Hueske  wrote:
> [quoted messages elided]