Hi Sidhartha,

Your explanation is correct.
If you stopped the job with a savepoint and then restore from that
savepoint, Flink will try to restore its state, which of course still
references the old bucket.

But new data will go to the new bucket.
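The behavior you are seeing can be sketched roughly like this (a simplified,
illustrative Python model, not Flink's actual code; the function and field
names are hypothetical): on recovery, pending part files are committed under
the object key saved in state, while only the S3 bucket comes from the
newly configured sink, which is why you see eu2 combined with output_old.

```python
# Simplified, illustrative model of StreamingFileSink recovery -- NOT
# Flink's real implementation. Names and structure are hypothetical.

def recovered_commit_targets(recovered_states, configured_s3_bucket):
    """On restore, pending part files are committed under the object key
    saved in state; only the S3 bucket comes from the (new) sink config."""
    return [
        f"s3://{configured_s3_bucket}/{state['key']}"
        for state in recovered_states
    ]

# State was written while the sink pointed at s3://eu1/output_old:
recovered = [{"key": "output_old/2019-08-22/18/part-3-40134"}]

# After switching the sink to s3://eu2/output_new, recovery still targets
# the old key -- hence the AccessDenied on s3://eu2/output_old/...:
targets = recovered_commit_targets(recovered, "eu2")
print(targets[0])  # s3://eu2/output_old/2019-08-22/18/part-3-40134
```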

One solution is to restart your job from scratch, if you do not
care about your "old" state.

Cheers,
Kostas

On Thu, Sep 5, 2019 at 1:27 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Kostas (in CC) might be able to help.
>
> Best, Fabian
>
> Am Mi., 4. Sept. 2019 um 22:59 Uhr schrieb sidhartha saurav 
> <sidsau...@gmail.com>:
>>
>> Hi,
>>
>> Can someone suggest a workaround so that we do not get this issue while 
>> changing the S3 bucket ?
>>
>> On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav <sidsau...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> We are trying to change our StreamingFileSink S3 bucket, say from 
>>> s3://eu1/output_old to s3://eu2/output_new. When we do so, we get an 
>>> exception and the taskmanager goes into a restart loop.
>>>
>>> We suspect that it tries to restore state and gets the bucket id from saved 
>>> state [<Buckets.java> final BucketID bucketId = 
>>> recoveredState.getBucketId()]. Flink then tries to read output_old from eu2 
>>> and gets an AccessDeniedError. Rightly so, as it has permission for 
>>> s3://eu2/output_new and not s3://eu2/output_old. We are not sure why Flink 
>>> is trying to access the old bucket, or how to avoid this exception.
>>>
>>> Logs:
>>>
>>> > "S3Committer.java","line":"87","message":"Failed to commit after recovery 
>>> > output_old/2019-08-22/18/part-3-40134 with MPU ID 
>>> > 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
>>> >  Checking if file was committed before...",
>>>
>>> > "Task.java","line":"910","message":"... switched from RUNNING to FAILED."
>>>
>>> > java.nio.file.AccessDeniedException: 
>>> > output_old/2019-08-22/18/part-3-40134: getObjectMetadata on 
>>> > output_old/2019-08-22/18/part-3-40134: 
>>> > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> >  Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 
>>> > Forbidden; Request ID: 79F1AEE53131FB66; S3 Extended Request ID: 
>>> > 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
>>> >  S3 Extended Request ID: 
>>> > 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
>>> >  Forbidden
>>>
>>> flink-taskmanager at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>>> flink-taskmanager at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
>>> flink-taskmanager at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>>> flink-taskmanager at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>>> flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>> flink-taskmanager at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>> flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> flink-taskmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>> flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>> flink-taskmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>> flink-taskmanager at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> flink-taskmanager at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> We are using Flink 1.8 and externalized checkpoints. The S3 bucket for 
>>> externalized checkpoints has not been modified.
>>>
>>> Thanks
>>> Sidhartha
