[jira] [Created] (FLINK-35640) Drop Flink 1.15 support for the operator
Mate Czagany created FLINK-35640:

Summary: Drop Flink 1.15 support for the operator
Key: FLINK-35640
URL: https://issues.apache.org/jira/browse/FLINK-35640
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: Mate Czagany
Fix For: kubernetes-operator-1.10.0

As the operator only supports the latest 4 stable minor Flink versions, 1.15 support should be dropped.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35493) Make max history age and count configurable for FlinkStateSnapshot resources
Mate Czagany created FLINK-35493:

Summary: Make max history age and count configurable for FlinkStateSnapshot resources
Key: FLINK-35493
URL: https://issues.apache.org/jira/browse/FLINK-35493
Project: Flink
Issue Type: Sub-task
Components: Kubernetes Operator
Reporter: Mate Czagany
[jira] [Created] (FLINK-35492) Add metrics for FlinkStateSnapshot resources
Mate Czagany created FLINK-35492:

Summary: Add metrics for FlinkStateSnapshot resources
Key: FLINK-35492
URL: https://issues.apache.org/jira/browse/FLINK-35492
Project: Flink
Issue Type: Sub-task
Components: Kubernetes Operator
Reporter: Mate Czagany
[jira] [Created] (FLINK-35267) Create documentation for FlinkStateSnapshot CR
Mate Czagany created FLINK-35267:

Summary: Create documentation for FlinkStateSnapshot CR
Key: FLINK-35267
URL: https://issues.apache.org/jira/browse/FLINK-35267
Project: Flink
Issue Type: Sub-task
Components: Kubernetes Operator
Reporter: Mate Czagany
Fix For: kubernetes-operator-1.9.0

This should cover the new features and migration from the now-deprecated methods of taking snapshots.
[jira] [Created] (FLINK-35266) Add e2e tests for FlinkStateSnapshot CRs
Mate Czagany created FLINK-35266:

Summary: Add e2e tests for FlinkStateSnapshot CRs
Key: FLINK-35266
URL: https://issues.apache.org/jira/browse/FLINK-35266
Project: Flink
Issue Type: Sub-task
Components: Kubernetes Operator
Reporter: Mate Czagany
Fix For: kubernetes-operator-1.9.0
[jira] [Created] (FLINK-35265) Implement FlinkStateSnapshot custom resource
Mate Czagany created FLINK-35265:

Summary: Implement FlinkStateSnapshot custom resource
Key: FLINK-35265
URL: https://issues.apache.org/jira/browse/FLINK-35265
Project: Flink
Issue Type: Sub-task
Components: Kubernetes Operator
Reporter: Mate Czagany
Fix For: kubernetes-operator-1.9.0
[jira] [Created] (FLINK-35263) FLIP-446: Kubernetes Operator State Snapshot CRD
Mate Czagany created FLINK-35263:

Summary: FLIP-446: Kubernetes Operator State Snapshot CRD
Key: FLINK-35263
URL: https://issues.apache.org/jira/browse/FLINK-35263
Project: Flink
Issue Type: New Feature
Components: Kubernetes Operator
Reporter: Mate Czagany
Fix For: kubernetes-operator-1.9.0

Described in [https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD]
[jira] [Created] (FLINK-35106) Kubernetes Operator ignores checkpoint type configuration
Mate Czagany created FLINK-35106:

Summary: Kubernetes Operator ignores checkpoint type configuration
Key: FLINK-35106
URL: https://issues.apache.org/jira/browse/FLINK-35106
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0
Reporter: Mate Czagany

There is a configuration option for the type of checkpoint to take when periodic checkpointing is enabled or a manual checkpoint is triggered. However, the configuration value `kubernetes.operator.checkpoint.type` is completely ignored whenever any checkpoint is triggered.
[jira] [Created] (FLINK-34439) Move chown operations to COPY commands in Dockerfile
Mate Czagany created FLINK-34439:

Summary: Move chown operations to COPY commands in Dockerfile
Key: FLINK-34439
URL: https://issues.apache.org/jira/browse/FLINK-34439
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: Mate Czagany

We can lower the size of the operator's output container image if we don't run 'chown' in separate RUN commands inside the Dockerfile, but instead use the '--chown' argument of the COPY command. Using 'RUN chown ...' copies every affected file, with its whole size, into a new layer, duplicating the files already written by the previous COPY command.

Example:
{code:java}
$ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8
...
3 months ago   RUN /bin/sh -c chown -R flink:flink $FLINK...   116MB   buildkit.dockerfile.v0
...
{code}
This would mean a 20% reduction in image size.
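The change can be sketched as follows; this is a hypothetical fragment (the jar name, destination path, and user/group are illustrative, not taken from the operator's actual Dockerfile):

```dockerfile
# Before: COPY writes the files in one layer, then a separate RUN chown
# rewrites every affected file into a second layer, duplicating their size.
# COPY flink-kubernetes-operator.jar $FLINK_HOME/
# RUN chown -R flink:flink $FLINK_HOME

# After: ownership is applied while the layer is written,
# so each file is stored only once.
COPY --chown=flink:flink flink-kubernetes-operator.jar $FLINK_HOME/
```

One caveat: `--chown` only covers the copied files, so any files created by later RUN steps still need their ownership handled separately.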
[jira] [Created] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode
Mate Czagany created FLINK-34438:

Summary: Kubernetes Operator doesn't wait for TaskManager deletion in native mode
Key: FLINK-34438
URL: https://issues.apache.org/jira/browse/FLINK-34438
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.1, kubernetes-operator-1.7.0, kubernetes-operator-1.8.0
Reporter: Mate Czagany

This issue was partly fixed in FLINK-32334, but native mode was not included in the fix. I don't see any downsides to adding the same check to the native deployment mode, which would make sure that all TaskManagers have been deleted when we shut down a Flink cluster.

There should also be some logs indicating that the timeout was exceeded while waiting for the cluster to shut down, instead of silently returning.

An issue was also mentioned on the mailing list which seems to be related to this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb]
[jira] [Created] (FLINK-32652) Operator cannot scale standalone deployments in reactive mode
Mate Czagany created FLINK-32652:

Summary: Operator cannot scale standalone deployments in reactive mode
Key: FLINK-32652
URL: https://issues.apache.org/jira/browse/FLINK-32652
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: Mate Czagany

After we upgraded the Fabric8 Kubernetes Client to 6.7.0, the operator can no longer scale standalone deployments in reactive mode, because since this commit the client uses the "deployments/scale" API instead of patching the deployment: [https://github.com/fabric8io/kubernetes-client/commit/c4d3dd14c6ba7261fe4646636d277cba1c2122a2]

We will get the following error:
{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1:443/apis/apps/v1/namespaces/flink/deployments/basic-reactive-example-taskmanager/scale. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. deployments.apps "basic-reactive-example-taskmanager" is forbidden: User "system:serviceaccount:flink:flink-operator" cannot get resource "deployments/scale" in API group "apps" in the namespace "flink".
{code}
The fix is easy: we just need to add "deployments/scale" to the ClusterRole we create. I'll create a PR soon.
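A minimal sketch of the ClusterRole rule that would be needed (the role name and verb list here are illustrative, not the operator's actual Helm-generated manifest):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flink-operator    # hypothetical name
rules:
  - apiGroups: ["apps"]
    resources:
      - deployments
      - deployments/scale  # needed because fabric8 6.7.0 goes through the scale subresource
    verbs: ["get", "patch", "update"]
```

The `deployments/scale` subresource has its own RBAC entry, which is why a role that can already get and patch `deployments` is still denied access.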
[jira] [Created] (FLINK-31613) Some default operator config values are overwritten by values.yaml
Mate Czagany created FLINK-31613:

Summary: Some default operator config values are overwritten by values.yaml
Key: FLINK-31613
URL: https://issues.apache.org/jira/browse/FLINK-31613
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Mate Czagany

It's a bit confusing that the documentation states that 'kubernetes.operator.reconcile.interval' is 1 min by default and 'kubernetes.operator.observer.progress-check.interval' is 10 sec, when they are overwritten to 15 sec and 5 sec respectively in the default values.yaml.

A possible solution might be to change the default values to 15 and 5 sec in the configuration values and remove/comment them in values.yaml; however, this will introduce a change in configuration for users that have set a custom 'defaultConfiguration.flink-conf.yaml' value.

Please let me know what you think, and if the solution sounds good feel free to assign me this ticket and I'll create a PR.
[jira] [Created] (FLINK-31326) Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0
Mate Czagany created FLINK-31326:

Summary: Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0
Key: FLINK-31326
URL: https://issues.apache.org/jira/browse/FLINK-31326
Project: Flink
Issue Type: Bug
Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Mate Czagany

In case of 'scaling.sources.enabled'='false', the 'TARGET_DATA_RATE' of the source vertex will be calculated as '(1000 / busyTimeMsPerSecond) * numRecordsOutPerSecond', which currently on the main branch results in an infinite value if 'busyTimeMsPerSecond' is 0. This also affects downstream operators. I'm not that familiar with the autoscaler code, but in my opinion it's quite unexpected to have such behavioral changes from setting 'scaling.sources.enabled' to false.

With PR #543 for FLINK-30575 (https://github.com/apache/flink-kubernetes-operator/pull/543), scaling will happen even with 'busyTimeMsPerSecond' being 0, but it will result in unreasonably high parallelism numbers for downstream operators, because 'TARGET_DATA_RATE' will be very high when a 'busyTimeMsPerSecond' of 0 is replaced with 1e-10.
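The arithmetic above can be reproduced in isolation; this is a minimal sketch (not the autoscaler's actual code) showing that the formula silently produces Infinity rather than failing:

```java
public class TargetDataRateSketch {
    public static void main(String[] args) {
        double busyTimeMsPerSecond = 0.0;      // an idle source vertex
        double numRecordsOutPerSecond = 100.0; // arbitrary output rate

        // Java floating-point division by zero yields Infinity, not an exception,
        // so the bad value propagates into every downstream calculation.
        double targetDataRate = (1000.0 / busyTimeMsPerSecond) * numRecordsOutPerSecond;

        System.out.println(Double.isInfinite(targetDataRate)); // prints "true"
    }
}
```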
Metrics from the operator logs (source=e5a72f353fc1e6bbf3bd96a41384998c, sink=51312116a3e504bccb3874fc80d5055e)

'scaling.sources.enabled'='true':
{code:java}
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: 5.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: 10.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: 3.8667
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: 3.8833
jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 4.827299209321681
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.848351269098938
jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: 10.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: 21.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: 7.733
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: 7.767
{code}
'scaling.sources.enabled'='false':
{code:java}
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: Infinity
jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: NaN
jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 5.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.9805556
jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: NaN
jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: Infinity
{code}
[jira] [Created] (FLINK-31187) Standalone HA mode does not work if dynamic properties are supplied
Mate Czagany created FLINK-31187:

Summary: Standalone HA mode does not work if dynamic properties are supplied
Key: FLINK-31187
URL: https://issues.apache.org/jira/browse/FLINK-31187
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0
Reporter: Mate Czagany
Attachments: standalone-ha.yaml

With FLINK-30518, '--host $(POD_IP)' has been added to the arguments of the JMs, which fixes the issue with HA in standalone mode, but it always gets appended to the end of the final JM arguments: https://github.com/usamj/flink-kubernetes-operator/blob/72ec9d384def3091ce50c2a3e2a06cded3b572e6/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java#L107

However, this will not be parsed properly in case any dynamic properties were set in the arguments, e.g.:
{code:java}
Program Arguments:
  --configDir /opt/flink/conf
  -D jobmanager.memory.off-heap.size=134217728b
  -D jobmanager.memory.jvm-overhead.min=201326592b
  -D jobmanager.memory.jvm-metaspace.size=268435456b
  -D jobmanager.memory.heap.size=469762048b
  -D jobmanager.memory.jvm-overhead.max=201326592b
  --job-classname org.apache.flink.streaming.examples.statemachine.StateMachineExample
  --test test
  --host 172.17.0.11
{code}
You can verify this bug by using the YAML I've attached; in the JM logs you can see this line:
{code:java}
Remoting started; listening on addresses :[akka.tcp://flink@flink-example-statemachine.flink:6123]
{code}
Without any program arguments supplied, this would correctly be:
{code:java}
Remoting started; listening on addresses :[akka.tcp://flink@172.17.0.8:6123]
{code}
I believe this could easily be fixed by appending the --host parameter before JobSpec.args. If a committer can assign this ticket to me, I can create a PR for this.
[jira] [Created] (FLINK-30899) FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition
Mate Czagany created FLINK-30899:

Summary: FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition
Key: FLINK-30899
URL: https://issues.apache.org/jira/browse/FLINK-30899
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.17.0
Reporter: Mate Czagany

In my testing it only affected the csv and testcsv formats. I think it's caused by `FileSystemTableSource` calling `DeserializationFormatFactory#createRuntimeDecoder` with the wrong `physicalDataType`. The files won't contain the partitioned field values, but in case of a projection pushdown (which can happen during the planning phase if we filter the partition field by a constant value), the final `physicalDataType` passed to the deserializer by `FileSystemTableSource` will contain the partitioned fields as well. As described in `DecodingFormat`, every field in the `physicalDataType` parameter has to be present in the serialized record.

Example:
{code:java}
CREATE TABLE test_table (
  f0 INT,
  f1 INT,
  f2 INT,
  f3 INT
) PARTITIONED BY (f0, f1) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'csv'
)

SELECT * FROM test_table WHERE f0 = 1;
-- should be 1,4,7,10
+----+----+----+----+
| f0 | f1 | f2 | f3 |
+----+----+----+----+
|  1 |  4 | 10 |  0 |
+----+----+----+----+

SELECT * FROM test_table;
+----+----+----+----+
| f0 | f1 | f2 | f3 |
+----+----+----+----+
|  2 |  5 |  8 | 11 |
|  1 |  4 |  7 | 10 |
|  3 |  6 |  9 | 12 |
+----+----+----+----+

SELECT * FROM test_table WHERE f0>0;
+----+----+----+----+
| f0 | f1 | f2 | f3 |
+----+----+----+----+
|  1 |  4 |  7 | 10 |
|  3 |  6 |  9 | 12 |
|  2 |  5 |  8 | 11 |
+----+----+----+----+

SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
    at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
    at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    ...
{code}
At [https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147] the `physicalRowDataType` will contain the partition fields as well, but `partitionKeysToExtract` will not contain them, since `producedDataType` has been modified in the `applyProjection` method, so it will result in an empty projection. Then on line 154 we construct the final `physicalDataType`, but since `partitionKeysProjections` is empty, it will end up with the same value as `physicalRowDataType`, which contains the partition fields too.

By changing
{code:java}
final Projection partitionKeysProjections = Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);
{code}
to
{code:java}
final Projection partitionKeysProjections = Projection.fromFieldNames(physicalDataType, partitionKeys);
{code}
the issue can be solved. I have verified this solution with 1 and 2 partition keys, with and without metadata columns, and with and without virtual columns, but I still need to test this change with other formats.

If this solution seems correct and a committer could assign me to the JIRA, I can start working on it.
[jira] [Created] (FLINK-30812) YARN with S3 resource storage fails for Hadoop 3.3.2
Mate Czagany created FLINK-30812:

Summary: YARN with S3 resource storage fails for Hadoop 3.3.2
Key: FLINK-30812
URL: https://issues.apache.org/jira/browse/FLINK-30812
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Mate Czagany

In HADOOP-17139, S3AFileSystem#copyFromLocalFile was refactored and now expects the local source Hadoop Path object to have a scheme specified, which the YarnClusterDescriptor uploading the local files won't have. When uploading files to S3, CopyFromLocalOperation#getFinalPath compares the passed source Hadoop Path with the file it found (which will have the file:// scheme) using URI.relativize, but this fails because of the scheme difference and throws a PathIOException, as can be seen in this exception:
{code:java}
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478) ~[flink-yarn-1.16.0.jar!/:1.16.0]
..
Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp': Input/output error
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[hadoop-common-3.3.3.jar!/:?]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[hadoop-common-3.3.3.jar!/:?]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    ... 35 more
{code}
The easiest solution would possibly be to somehow add the file:// scheme in YarnApplicationFileUploader#copyToRemoteApplicationDir. The other solution would be to change all calls uploading local files to use "new Path(file.toURI())" instead of "new Path(file.getAbsolutePath())", but it might not be as future-proof as the first solution.

Email thread: [https://lists.apache.org/thread/oo5rlyo3jr7kds2y6wwnfo1yhnk0fx4c]

If a committer can assign this ticket to me, I can start working on this.
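The scheme-mismatch behaviour can be demonstrated with plain `java.net.URI`, which is what the Hadoop comparison ultimately relies on; this is a minimal sketch with made-up paths, not Hadoop's actual code:

```java
import java.net.URI;

public class RelativizeSketch {
    public static void main(String[] args) {
        URI noScheme   = URI.create("/tmp/");                 // like a Path built from getAbsolutePath()
        URI withScheme = URI.create("file:///tmp/conf.yaml"); // like the locally discovered file

        // The schemes differ (none vs "file"), so relativize() gives up
        // and returns its argument unchanged -- no relative path is produced.
        System.out.println(noScheme.relativize(withScheme)); // prints "file:///tmp/conf.yaml"

        // With matching schemes the relative path is computed as expected.
        URI base = URI.create("file:///tmp/");
        System.out.println(base.relativize(withScheme)); // prints "conf.yaml"
    }
}
```

This is why building the source Path from `file.toURI()` (which carries the file:// scheme) avoids the failure.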