Hello,

Thanks for the research! Good to know the cause.

Greetings,
Frank

On 03.02.22 17:18, Dawid Wysakowicz wrote:
I looked into the code again and unfortunately I have bad news :( Indeed, we treat S3 as if it always injects entropy, even when no entropy key is specified (which effectively means entropy injection is disabled). I created a JIRA ticket[1] to fix it.
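To illustrate the idea (this is only a sketch, not the actual FLINK-25952 patch): when entropy injection is disabled, the state file paths recorded in _metadata could be resolved relative to wherever the savepoint directory lives now, rather than at their original absolute location.

```java
// Hypothetical sketch of path relativization for a relocatable savepoint.
// Names and logic are illustrative, not Flink's actual implementation.
public class SavepointPaths {

    // Strip the original savepoint directory prefix so the remaining
    // file name can be resolved against the savepoint's new location.
    public static String relativize(String uri, String savepointDir) {
        if (uri.startsWith(savepointDir)) {
            String rest = uri.substring(savepointDir.length());
            return rest.startsWith("/") ? rest.substring(1) : rest;
        }
        // A path outside the savepoint directory stays absolute.
        return uri;
    }
}
```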

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-25952

On 03/02/2022 17:02, Frank Dekervel wrote:
Hello,

I didn't know about entropy injection. I have checked, and there is no entropy injection configured in my flink-conf.yaml. This is the relevant section:

s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???

I see that there are still absolute S3 paths defined in the _metadata file:

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata | strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e
...

Not all of those paths exist locally:

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c  _metadata
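The `strings | grep s3` check above can also be approximated in code. A small sketch (the regex is a simplification, and real _metadata is binary, so this only mimics what `strings` extracts) that pulls the referenced s3:// URIs out of the metadata text:

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MetadataScan {
    // Simplified URI pattern; matches the savepoint paths seen above.
    private static final Pattern S3_URI = Pattern.compile("s3://[\\w./-]+");

    // Collect the distinct s3:// URIs referenced in the metadata text,
    // in order of first appearance.
    public static Set<String> referencedUris(String metadataText) {
        Set<String> uris = new LinkedHashSet<>();
        Matcher m = S3_URI.matcher(metadataText);
        while (m.find()) {
            uris.add(m.group());
        }
        return uris;
    }
}
```

Comparing this set against a listing of the local savepoint directory would show exactly which referenced paths are missing.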

Thanks!

Greetings,
Frank

On 03.02.22 16:38, Dawid Wysakowicz wrote:

Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are not relocatable in combination with entropy injection as described here[1].
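For reference, entropy injection would look roughly like this in flink-conf.yaml; the key names are the documented Flink options, the values here are just examples. If no `s3.entropy.key` entry is present, entropy injection should be disabled.

```yaml
# Entropy injection is only active when s3.entropy.key is set.
s3.entropy.key: _entropy_
s3.entropy.length: 4
```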

Best,

Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints

On 03/02/2022 14:44, Frank Dekervel wrote:
Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on a MinIO server); I downloaded it to my laptop for inspection. I have two KeyedProcessFunctions (with state in the same savepoint), and strangely enough one works perfectly and the other one doesn't.

The code is fairly simple:

val savepoint = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())
import org.apache.flink.api.scala._

// first one

val ti = createTypeInformation[AlarmMessageKey]
val tia = createTypeInformation[AlmState]
val ds = savepoint.readKeyedState("alm-1", new AlmStateReader(), ti, tia)
val valss = ds.collect().asScala

// now the second one:
// (here we ser/deser with Kryo, not the Scala case class serializer;
// no idea why, but that's how it is in the savepoint)
val savepoint2 = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())
val ds_sup = savepoint.readKeyedState("ags-1", new SupStateReader())
val vals_sup = ds_sup.collect().asScala

The second one fails because it tries to access the savepoint at its original path on S3 (which my laptop doesn't have access to). I thought savepoints were supposed to be relocatable. Weirdly enough, the first one works just fine.

This is the exception i get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; Proxy: null), S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error]         at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error]         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error]         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error]         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error]         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error]         at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error]         at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1015)
[error]         at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error]         at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1014)
[error]         at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[error]         at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
[error]         at java.io.FilterInputStream.read(FilterInputStream.java:83)
[error]         at org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
[error]         at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
[error]         at java.io.DataInputStream.readInt(DataInputStream.java:387)
[error]         at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:46)
[error]         at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:139)
[error]         at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.readMetaData(FullSnapshotRestoreOperation.java:194)
[error]         at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.restoreKeyGroupsInStateHandle(FullSnapshotRestoreOperation.java:171)
[error]         at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.access$100(FullSnapshotRestoreOperation.java:113)
[error]         at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:158)
[error]         at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:140)
[error]         at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:115)
[error]         at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
[error]         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174)
[error]         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
[error]         at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
[error]         at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
[error]         at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
[error]         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
[error]         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
[error]         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
[error]         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
[error]         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
[error]         at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:187)
[error]         at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:155)
[error]         at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
[error]         at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
[error]         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
[error]         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
[error]         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
[error]         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
[error]         at java.lang.Thread.run(Thread.java:748)

Does anybody know what I'm doing wrong?

Thanks!
Frank




