Re: Kubernetes Operator resource limits and requests

2023-02-08 Thread Máté Czagány
Hi,

The values you referenced are for the request fields; for the limit fields
you can use these configuration options:
- kubernetes.jobmanager.cpu.limit-factor
- kubernetes.jobmanager.memory.limit-factor
- kubernetes.taskmanager.cpu.limit-factor
- kubernetes.taskmanager.memory.limit-factor

Please see this page for more info:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#kubernetes
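
To make the relationship concrete, here is a flink-conf.yaml sketch (the
numeric values are purely illustrative, not recommendations): the resource
options define the pod requests, and each limit is derived as
request * limit-factor.

```yaml
# Requests: what the JobManager/TaskManager pods ask the scheduler for.
kubernetes.jobmanager.cpu: 1.0
kubernetes.taskmanager.cpu: 2.0

# Limit factors: limit = request * factor, so with these values the
# JobManager pod gets a CPU limit of 2.0 and the TaskManager pod 4.0.
kubernetes.jobmanager.cpu.limit-factor: 2.0
kubernetes.taskmanager.cpu.limit-factor: 2.0

# Memory limits are derived the same way from the configured process memory.
kubernetes.jobmanager.memory.limit-factor: 1.5
kubernetes.taskmanager.memory.limit-factor: 1.5
```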


Regards,
Mate

Lars Skjærven  wrote (on Wed, 8 Feb 2023 at
10:31):

> Hello,
>
> How can we define *limit* and *request* for the kubernetes pods as
> described here:
>
> https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits
>
> Looks like we can only set one value for CPU and memory:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#taskmanagerspec
>
> limits and request seems to be supported in the Spotify version of the
> operator:
>
> https://github.com/spotify/flink-on-k8s-operator/blob/master/docs/crd.md#taskmanagerspec
>
> Kind regards,
> Lars
>


Re: Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-27 Thread Máté Czagány
Hi Leon,

It seems to me that S3 cannot be used as YARN resource storage starting
with Hadoop 3.3.2.

In HADOOP-17139, S3AFileSystem#copyFromLocalFile was refactored and now
expects the local source Hadoop Path object to have a scheme specified,
which the local files uploaded by YarnClusterDescriptor won't have.
https://github.com/apache/flink/blob/release-1.16.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1049

CopyFromLocalOperation#uploadSourceFromFS collects all the files under the
source path passed to S3AFileSystem#copyFromLocalFile. If a single file was
specified, it collects that single file, but with a file:// scheme.
CopyFromLocalOperation#getFinalPath then calls
source.toURI().relativize(currentSrcUri) with the path we supplied and the
single file it found; because the schemes don't match, relativize() returns
currentSrcUri unchanged, and a PathIOException is thrown.
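
This relativize() behavior can be seen with plain java.net.URI objects (a
standalone sketch, not the Hadoop code path itself): when the schemes of
the two URIs differ, relativize() gives up and returns its argument
unchanged, which is exactly the condition getFinalPath treats as an error.

```java
import java.net.URI;

public class RelativizeDemo {
    public static void main(String[] args) {
        // The source path as supplied, with no scheme...
        URI source = URI.create("/tmp/flink-conf.yaml");
        // ...versus the file the operation discovered, carrying file://.
        URI found = URI.create("file:///tmp/flink-conf.yaml");

        // Schemes differ (null vs. "file"), so relativize() cannot
        // compute a relative path and returns the argument unchanged.
        URI result = source.relativize(found);
        System.out.println(result);               // file:///tmp/flink-conf.yaml
        System.out.println(result.equals(found)); // true
    }
}
```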

I think this could be fixed by using new Path(tmpConfigurationFile.toURI())
instead of new Path(tmpConfigurationFile.getAbsolutePath()) in
YarnClusterDescriptor, and doing the same for all the other file uploads as
well; then the scheme would be present in the Path object.
But it might also be considered a bug in hadoop-aws that it fails when
given a URI without a scheme.
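
The difference the proposed fix relies on can be shown without Hadoop at
all (a minimal sketch; the class name is made up for illustration):
File#getAbsolutePath() yields a bare path, while File#toURI() yields a
file: URI, so a Hadoop Path built from the latter carries the scheme the
S3A code expects.

```java
import java.io.File;
import java.io.IOException;

public class SchemeDemo {
    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("flink-conf", ".yaml");
        tmp.deleteOnExit();

        // A bare filesystem path with no scheme, e.g. /tmp/flink-conf....yaml
        String abs = tmp.getAbsolutePath();
        // A file: URI, e.g. file:/tmp/flink-conf....yaml
        String uri = tmp.toURI().toString();

        System.out.println(abs);
        System.out.println(uri);
    }
}
```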

You can also reproduce this issue easily:

// s3FileSystem is an initialized S3AFileSystem instance
File file = File.createTempFile("flink-conf.yaml", null);
org.apache.hadoop.fs.Path source = new org.apache.hadoop.fs.Path(file.getAbsolutePath());
org.apache.hadoop.fs.Path source2 = new org.apache.hadoop.fs.Path(file.toURI());

s3FileSystem.copyFromLocalFile(source, s3FileSystem.getHomeDirectory());  // will fail
s3FileSystem.copyFromLocalFile(source2, s3FileSystem.getHomeDirectory()); // works fine

I don't have a JIRA account yet, but once I do, and if no one has any
objections, I'll create a bug ticket and try to resolve this.

Best regards,
Mate

Leon Xu  wrote (on Fri, 27 Jan 2023 at
8:21):

> Hi Flink Users,
>
> We are trying to upgrade Flink from 1.12.7 to 1.16.0, but we have run into
> the following issue: we run Flink jobs in application mode, and after the
> upgrade, submitting a job fails with this exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
>   at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   ...
> Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp': Input/output error
>   at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[hadoop-common-3.3.3.jar!/:?]
>   at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[hadoop-common-3.3.3.jar!/:?]
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471) ~[flink-yarn-1.16.0.jar!/:1.16.0]
>   ... 35 more
>
> Looks like it failed to upload the temporary flink-conf file to S3. In
> Flink 1.12.7 we didn't have this issue. I am wondering if we can get some
> help here.