Re: Which flink version is compatible with beam

2023-01-27 Thread P Singh
Hi,

It’s not working with Flink 1.14 and Beam 2.44 or 2.43 with Python 3.10.

Please suggest.


From: Yaroslav Tkachenko 
Sent: Friday, January 27, 2023 10:53:49 PM
To: P Singh 
Cc: user@flink.apache.org 
Subject: Re: Which flink version is compatible with beam

Hi! According to this 
https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility,
 1.14 is the latest supported version.

On Fri, Jan 27, 2023 at 9:19 AM P Singh  wrote:
Hi Team,

I am trying to run an Apache Beam pipeline on a Flink cluster. I have set up
Kubernetes locally with Flink 1.16 and apache/beam_python3.10_sdk:2.44.0. When I
submit the job like this:

python file.py

the job just hangs, and I am not able to see it on the Flink UI or in the logs.

Can you please suggest compatible versions?


Looking forward to hearing from you.



Rescale Flink - Adjust Parallelism based on TPS

2023-01-27 Thread Madan D via user
Hello Team,
I would like to understand whether there's a way Flink can rescale automatically by
adjusting parallelism based on incoming traffic.
For example, when a job starts, incoming TPS is around 100, which can be processed
with parallelism 4; when TPS increases to 1000, I might need to rescale Flink to use
parallelism 10 or 20.
Is there a way we can set min and max parallelism for a Flink application so that it
adjusts automatically to the incoming load by stopping and restarting the job with a
new config?
I assume Kubernetes does this automatically by changing replicas, but all our
applications are running on YARN as of today.
Regards,
Madan

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Gyula Fóra
We have never encountered this problem before, but we also don't configure those
settings.
Can you simply try:

high-availability: kubernetes

And remove the other configs? I think that can only cause problems and
should not achieve anything :)
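
For illustration, a minimal sketch of what that would leave in flink-conf.yaml
(keeping whatever HA storage / cluster-id settings you already have):

high-availability: kubernetes
# no high-availability.jobmanager.port, jobmanager.rpc.address or
# jobmanager.rpc.port overrides -- let Flink derive these itself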

Gyula

On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
user@flink.apache.org> wrote:

> Hi everyone,
>
> I've been experimenting with Kubernetes HA and the Kubernetes Operator and
> ran into the following issue which is happening regularly on TaskManagers
> with Flink 1.16:
>
> Error while retrieving the leader gateway. Retrying to connect to 
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>
> (The whole stacktrace is quite long, I put it in a Github Gist here
> . Note
> that I put placeholder values for the Kubernetes Service name and the
> Namespace name)
>
> The job configuration has the following values which should be relevant:
> high-availability: kubernetes
> high-availability.jobmanager.port: 6123
> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
> jobmanager.rpc.port: 6123
>
> Looking a bit more into the logs, I can see that the Akka Actor System is
> started with an external address pointing to the Kubernetes Service defined
> by jobmanager.rpc.address:
> Trying to start actor system, external
> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
> Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
> :6123
>
> (I believe the external address for the Akka Actor System is set to
> jobmanager.rpc.address from this place
> 
> in the code but I might be wrong)
>
> I can also see these logs for the Dispatcher RPC endpoint:
> Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_1 .
> Successfully wrote leader information
> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
> for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.
>
> I confirmed that the HA ConfigMap contains an address which also uses the
> Kubernetes Service defined by jobmanager.rpc.address:
> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
> '.data["org.apache.flink.k8s.leader.dispatcher"]'
>
> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
> :6123/user/rpc/dispatcher_1
>
> When looking at the code of the Operator and Flink itself, I can see
> that jobmanager.rpc.address is set automatically by the
> InternalServiceDecorator
> 
>  and
> it points to the Kubernetes Service.
> However, the comment
> 
> above clearly says that "only the non-HA scenario relies on this Service
> for internal communication, since in the HA mode, the TaskManager(s)
> directly connects to the JobManager via IP address." According to the docs
> ,
> jobmanager.rpc.address "is ignored on setups with high-availability where
> the leader election mechanism is used to discover this automatically."
>
> This is not what I'm observing as it seems that despite enabling HA, the
> TaskManagers don't use IP addresses but still use this Kubernetes Service
> for JM communication.
>
> Moreover, I've used the Lyft Kubernetes Operator before and it has these
> interesting lines in the code:
> https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
> It explicitly sets jobmanager.rpc.address to the host IPs.
>
> Am I misconfiguring or misunderstanding something? Is there any way to fix
> these errors?
>
> Thanks!
> Anton
>


Re: Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-27 Thread Leon Xu
Thank you Gabor. I will test with a downgraded version to see how it goes.


On Fri, Jan 27, 2023 at 11:51 AM Gabor Somogyi 
wrote:

> The min supported version was 2.8.5 but in 1.17 it's gonna be 2.10.1 so
> one can downgrade.
>
> G
>
> On Fri, Jan 27, 2023, 20:42 Leon Xu  wrote:
>
>> Thank you Mate.
>> Yeah, this looks like the root cause. A follow-up question: do you know if
>> Flink 1.16 will have a hard dependency on Hadoop 3.3.x, or can we
>> actually use an older version of Hadoop to work around this issue?
>>
>> Thanks
>> Leon
>>
>> On Fri, Jan 27, 2023 at 10:28 AM Máté Czagány  wrote:
>>
>>> Hi Leon,
>>>
>>> It seems to me like S3 cannot be used as YARN resource storage starting
>>> with Hadoop 3.3.2.
>>>
>>> In HADOOP-17139 S3AFileSystem#copyFromLocalFile was refactored and
>>> expects the local source Hadoop Path object to have a scheme specified
>>> which the YarnClusterDescriptor uploading the local files won't have.
>>>
>>> https://github.com/apache/flink/blob/release-1.16.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1049
>>>
>>> CopyFromLocalOperation#uploadSourceFromFS will collect all the files in
>>> the source path passed to S3AFileSystem#copyFromLocalFile. If a single file
>>> was specified, it will collect that single file, but with file:// scheme.
>>> Then in CopyFromLocalOperation#getFinalPath it will call
>>> source.toURI().relativize(currentSrcUri) with the path we supplied and the
>>> single file it found, but it will return with currentSrcUri and throw
>>> PathIOException because the file schemes don't match.
>>>
>>> I think this could be fixed by using new
>>> Path(tmpConfigurationFile.toURI()) instead of new
>>> Path(tmpConfigurationFile.getAbsolutePath()) in YarnClusterDescriptor and
>>> doing the same for all the other file uploads as well. Then the scheme
>>> would be present in the Path object.
>>> But it might also be considered a bug in hadoop-aws for failing when
>>> used with a URI without a scheme specified.
>>>
>>> You can also reproduce this issue easily:
>>>
>>> File file = File.createTempFile("flink-conf.yaml", null);
>>> org.apache.hadoop.fs.Path source = new
>>> org.apache.hadoop.fs.Path(file.getAbsolutePath());
>>> org.apache.hadoop.fs.Path source2 = new
>>> org.apache.hadoop.fs.Path(file.toURI());
>>>
>>> s3FileSystem.copyFromLocalFile(source, s3FileSystem.getHomeDirectory());
>>>  // will fail
>>> s3FileSystem.copyFromLocalFile(source2,
>>> s3FileSystem.getHomeDirectory());  // works fine
>>>
>>> I don't have a JIRA account yet, but once I do and no one has any
>>> objections I'll create a bug ticket and try to resolve this.
>>>
>>> Best regards,
>>> Mate
>>>
>>> Leon Xu  wrote on Fri, Jan 27, 2023, at 8:21:
>>>
 Hi Flink Users,

 We are trying to upgrade Flink from 1.12.7 to 1.16.0. But we run into
 the following issue:
 We are running the Flink job in application mode. After the upgrade,
 when we submit the job, it gets this exception:

 *org.apache.flink.client.deployment.ClusterDeploymentException:
 Couldn't deploy Yarn Application Cluster at
 org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
 ~[flink-yarn-1.16.0.jar!/:1.16.0]*
 *..* Caused by: org.apache.hadoop.fs.PathIOException:
 `Cannot get relative path for
 URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
 Input/output error at
 org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
 ~[hadoop-common-3.3.3.jar!/:?] at
 org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
 ~[hadoop-common-3.3.3.jar!/:?] at
 org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
 ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
 org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
 

Re: Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-27 Thread Gabor Somogyi
The min supported version was 2.8.5 but in 1.17 it's gonna be 2.10.1 so one
can downgrade.

G

On Fri, Jan 27, 2023, 20:42 Leon Xu  wrote:

> Thank you Mate.
> Yeah, this looks like the root cause. A follow-up question: do you know if
> Flink 1.16 will have a hard dependency on Hadoop 3.3.x, or can we
> actually use an older version of Hadoop to work around this issue?
>
> Thanks
> Leon
>
> On Fri, Jan 27, 2023 at 10:28 AM Máté Czagány  wrote:
>
>> Hi Leon,
>>
>> It seems to me like S3 cannot be used as YARN resource storage starting
>> with Hadoop 3.3.2.
>>
>> In HADOOP-17139 S3AFileSystem#copyFromLocalFile was refactored and
>> expects the local source Hadoop Path object to have a scheme specified
>> which the YarnClusterDescriptor uploading the local files won't have.
>>
>> https://github.com/apache/flink/blob/release-1.16.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1049
>>
>> CopyFromLocalOperation#uploadSourceFromFS will collect all the files in
>> the source path passed to S3AFileSystem#copyFromLocalFile. If a single file
>> was specified, it will collect that single file, but with file:// scheme.
>> Then in CopyFromLocalOperation#getFinalPath it will call
>> source.toURI().relativize(currentSrcUri) with the path we supplied and the
>> single file it found, but it will return with currentSrcUri and throw
>> PathIOException because the file schemes don't match.
>>
>> I think this could be fixed by using new
>> Path(tmpConfigurationFile.toURI()) instead of new
>> Path(tmpConfigurationFile.getAbsolutePath()) in YarnClusterDescriptor and
>> doing the same for all the other file uploads as well. Then the scheme
>> would be present in the Path object.
>> But it might also be considered a bug in hadoop-aws for failing when
>> used with a URI without a scheme specified.
>>
>> You can also reproduce this issue easily:
>>
>> File file = File.createTempFile("flink-conf.yaml", null);
>> org.apache.hadoop.fs.Path source = new
>> org.apache.hadoop.fs.Path(file.getAbsolutePath());
>> org.apache.hadoop.fs.Path source2 = new
>> org.apache.hadoop.fs.Path(file.toURI());
>>
>> s3FileSystem.copyFromLocalFile(source, s3FileSystem.getHomeDirectory());
>>  // will fail
>> s3FileSystem.copyFromLocalFile(source2, s3FileSystem.getHomeDirectory());
>>  // works fine
>>
>> I don't have a JIRA account yet, but once I do and no one has any
>> objections I'll create a bug ticket and try to resolve this.
>>
>> Best regards,
>> Mate
>>
>> Leon Xu  wrote on Fri, Jan 27, 2023, at 8:21:
>>
>>> Hi Flink Users,
>>>
>>> We are trying to upgrade Flink from 1.12.7 to 1.16.0. But we run into
>>> the following issue:
>>> We are running the Flink job in application mode. After the upgrade,
>>> when we submit the job, it gets this exception:
>>>
>>> *org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
>>> deploy Yarn Application Cluster at
>>> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
>>> ~[flink-yarn-1.16.0.jar!/:1.16.0]*
>>> *..* Caused by: org.apache.hadoop.fs.PathIOException:
>>> `Cannot get relative path for
>>> URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
>>> Input/output error at
>>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>>> ~[hadoop-common-3.3.3.jar!/:?] at
>>> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>>> ~[hadoop-common-3.3.3.jar!/:?] at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
>>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>>> org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
>>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>>> org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
>>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>>> 

Re: Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-27 Thread Leon Xu
Thank you Mate.
Yeah, this looks like the root cause. A follow-up question: do you know if
Flink 1.16 will have a hard dependency on Hadoop 3.3.x, or can we
actually use an older version of Hadoop to work around this issue?

Thanks
Leon

On Fri, Jan 27, 2023 at 10:28 AM Máté Czagány  wrote:

> Hi Leon,
>
> It seems to me like S3 cannot be used as YARN resource storage starting
> with Hadoop 3.3.2.
>
> In HADOOP-17139 S3AFileSystem#copyFromLocalFile was refactored and expects
> the local source Hadoop Path object to have a scheme specified which the
> YarnClusterDescriptor uploading the local files won't have.
>
> https://github.com/apache/flink/blob/release-1.16.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1049
>
> CopyFromLocalOperation#uploadSourceFromFS will collect all the files in
> the source path passed to S3AFileSystem#copyFromLocalFile. If a single file
> was specified, it will collect that single file, but with file:// scheme.
> Then in CopyFromLocalOperation#getFinalPath it will call
> source.toURI().relativize(currentSrcUri) with the path we supplied and the
> single file it found, but it will return with currentSrcUri and throw
> PathIOException because the file schemes don't match.
>
> I think this could be fixed by using new
> Path(tmpConfigurationFile.toURI()) instead of new
> Path(tmpConfigurationFile.getAbsolutePath()) in YarnClusterDescriptor and
> doing the same for all the other file uploads as well. Then the scheme
> would be present in the Path object.
> But it might also be considered a bug in hadoop-aws for failing when
> used with a URI without a scheme specified.
>
> You can also reproduce this issue easily:
>
> File file = File.createTempFile("flink-conf.yaml", null);
> org.apache.hadoop.fs.Path source = new
> org.apache.hadoop.fs.Path(file.getAbsolutePath());
> org.apache.hadoop.fs.Path source2 = new
> org.apache.hadoop.fs.Path(file.toURI());
>
> s3FileSystem.copyFromLocalFile(source, s3FileSystem.getHomeDirectory());
>  // will fail
> s3FileSystem.copyFromLocalFile(source2, s3FileSystem.getHomeDirectory());
>  // works fine
>
> I don't have a JIRA account yet, but once I do and no one has any
> objections I'll create a bug ticket and try to resolve this.
>
> Best regards,
> Mate
>
> Leon Xu  wrote on Fri, Jan 27, 2023, at 8:21:
>
>> Hi Flink Users,
>>
>> We are trying to upgrade Flink from 1.12.7 to 1.16.0. But we run into the
>> following issue:
>> We are running the Flink job in application mode. After the upgrade,
>> when we submit the job, it gets this exception:
>>
>> *org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
>> deploy Yarn Application Cluster at
>> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
>> ~[flink-yarn-1.16.0.jar!/:1.16.0]*
>> *..* Caused by: org.apache.hadoop.fs.PathIOException:
>> `Cannot get relative path for
>> URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
>> Input/output error at
>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>> ~[hadoop-common-3.3.3.jar!/:?] at
>> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>> ~[hadoop-common-3.3.3.jar!/:?] at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
>> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
>> org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>> org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>> org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
>> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
>> 

Re: Flink application mode/S3A Exception after upgrading to Flink 1.16.0

2023-01-27 Thread Máté Czagány
Hi Leon,

It seems to me like S3 cannot be used as YARN resource storage starting
with Hadoop 3.3.2.

In HADOOP-17139 S3AFileSystem#copyFromLocalFile was refactored and expects
the local source Hadoop Path object to have a scheme specified which the
YarnClusterDescriptor uploading the local files won't have.
https://github.com/apache/flink/blob/release-1.16.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L1049

CopyFromLocalOperation#uploadSourceFromFS will collect all the files in the
source path passed to S3AFileSystem#copyFromLocalFile. If a single file was
specified, it will collect that single file, but with file:// scheme.
Then in CopyFromLocalOperation#getFinalPath it will call
source.toURI().relativize(currentSrcUri) with the path we supplied and the
single file it found, but it will return with currentSrcUri and throw
PathIOException because the file schemes don't match.
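
For context, the relevant java.net.URI behaviour can be seen on its own (a
minimal sketch with plain JDK code; the paths are just examples):

java.net.URI noScheme = java.net.URI.create("/tmp/flink-conf.yaml");
java.net.URI withScheme = new java.io.File("/tmp/flink-conf.yaml").toURI();  // file:/tmp/...
// The schemes differ (none vs "file"), so relativize() returns its argument
// unchanged, which getFinalPath() then reports as "Cannot get relative path".
System.out.println(noScheme.relativize(withScheme));  // prints file:/tmp/flink-conf.yaml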

I think this could be fixed by using new Path(tmpConfigurationFile.toURI())
instead of new Path(tmpConfigurationFile.getAbsolutePath()) in
YarnClusterDescriptor and doing the same for all the other file uploads as
well. Then the scheme would be present in the Path object.
But it might also be considered a bug in hadoop-aws for failing when
used with a URI without a scheme specified.
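
Roughly, the change would be along these lines (only a sketch of the idea, not
the actual patch; variable names are illustrative):

Path withoutScheme = new Path(tmpConfigurationFile.getAbsolutePath());  // current: no scheme
Path withScheme = new Path(tmpConfigurationFile.toURI());               // proposed: carries file:// scheme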

You can also reproduce this issue easily:

File file = File.createTempFile("flink-conf.yaml", null);
org.apache.hadoop.fs.Path source = new
org.apache.hadoop.fs.Path(file.getAbsolutePath());
org.apache.hadoop.fs.Path source2 = new
org.apache.hadoop.fs.Path(file.toURI());

s3FileSystem.copyFromLocalFile(source, s3FileSystem.getHomeDirectory());
 // will fail
s3FileSystem.copyFromLocalFile(source2, s3FileSystem.getHomeDirectory());
 // works fine

I don't have a JIRA account yet, but once I do and no one has any
objections I'll create a bug ticket and try to resolve this.

Best regards,
Mate

Leon Xu  wrote on Fri, Jan 27, 2023, at 8:21:

> Hi Flink Users,
>
> We are trying to upgrade Flink from 1.12.7 to 1.16.0. But we run into the
> following issue:
> We are running the Flink job in application mode. After the upgrade, when
> we submit the job, it gets this exception:
>
> *org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478)
> ~[flink-yarn-1.16.0.jar!/:1.16.0]*
> *..* Caused by: org.apache.hadoop.fs.PathIOException: `Cannot
> get relative path for
> URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp':
> Input/output error at
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
> ~[hadoop-common-3.3.3.jar!/:?] at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
> ~[hadoop-common-3.3.3.jar!/:?] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913)
> ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471)
> ~[flink-yarn-1.16.0.jar!/:1.16.0] ... 35 more
>
> It looks like it failed to upload the temp Flink conf file to S3. In Flink
> 1.12.7 we didn't have this issue. I am wondering if we can get some help
> here.

Multiple Window Streams to same Kinesis Sink

2023-01-27 Thread Curtis Jensen
I'm trying to sink two Window Streams to the same Kinesis Sink. When
I do this, no results are making it to the sink (code below). If I
remove one of the windows from the job, results do get published.
Adding a second stream to the sink seems to stop both from producing results.

How can I have results from both Window Streams go to the same sink?

Thanks


public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    ObjectMapper jsonParser = new ObjectMapper();

    DataStream<String> inputStream = createKinesisSource(env);
    FlinkKinesisProducer<String> kinesisSink = createKinesisSink();

    WindowedStream<JsonNode, JsonNode, TimeWindow> oneMinStream = inputStream
            .map(value -> jsonParser.readValue(value, JsonNode.class))
            .keyBy(node -> node.get("accountId"))
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)));

    oneMinStream
            .aggregate(new LoginAggregator("k1m"))
            .addSink(kinesisSink);

    WindowedStream<JsonNode, JsonNode, TimeWindow> twoMinStream = inputStream
            .map(value -> jsonParser.readValue(value, JsonNode.class))
            .keyBy(node -> node.get("accountId"))
            .window(TumblingProcessingTimeWindows.of(Time.minutes(2)));

    twoMinStream
            .aggregate(new LoginAggregator("k2m"))
            .addSink(kinesisSink);

    try {
        env.execute("Flink Kinesis Streaming Sink Job");
    } catch (Exception e) {
        LOG.error("failed");
        LOG.error(e.getLocalizedMessage());
        LOG.error(e.getStackTrace().toString());

        throw e;
    }
}

private static DataStream<String> createKinesisSource(StreamExecutionEnvironment env) {
    Properties inputProperties = new Properties();
    inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
            new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createKinesisSink() {
    Properties outputProperties = new Properties();
    outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    outputProperties.setProperty("AggregationEnabled", "false");

    FlinkKinesisProducer<String> sink =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
    sink.setDefaultStream(outputStreamName);
    sink.setDefaultPartition(UUID.randomUUID().toString());

    return sink;
}


"Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Anton Ippolitov via user
Hi everyone,

I've been experimenting with Kubernetes HA and the Kubernetes Operator and
ran into the following issue which is happening regularly on TaskManagers
with Flink 1.16:

Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.

(The whole stacktrace is quite long, I put it in a Github Gist here
. Note
that I put placeholder values for the Kubernetes Service name and the
Namespace name)

The job configuration has the following values which should be relevant:
high-availability: kubernetes
high-availability.jobmanager.port: 6123
jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
jobmanager.rpc.port: 6123

Looking a bit more into the logs, I can see that the Akka Actor System is
started with an external address pointing to the Kubernetes Service defined
by jobmanager.rpc.address:
Trying to start actor system, external
address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
:6123

(I believe the external address for the Akka Actor System is set to
jobmanager.rpc.address from this place

in the code but I might be wrong)

I can also see these logs for the Dispatcher RPC endpoint:
Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
Successfully wrote leader information
LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

I confirmed that the HA ConfigMap contains an address which also uses the
Kubernetes Service defined by jobmanager.rpc.address:
$ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
'.data["org.apache.flink.k8s.leader.dispatcher"]'
ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
:6123/user/rpc/dispatcher_1

When looking at the code of the Operator and Flink itself, I can see
that jobmanager.rpc.address is set automatically by the
InternalServiceDecorator

and
it points to the Kubernetes Service.
However, the comment

above clearly says that "only the non-HA scenario relies on this Service
for internal communication, since in the HA mode, the TaskManager(s)
directly connects to the JobManager via IP address." According to the docs
,
jobmanager.rpc.address "is ignored on setups with high-availability where
the leader election mechanism is used to discover this automatically."

This is not what I'm observing as it seems that despite enabling HA, the
TaskManagers don't use IP addresses but still use this Kubernetes Service
for JM communication.

Moreover, I've used the Lyft Kubernetes Operator before and it has these
interesting lines in the code:
https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
It explicitly sets jobmanager.rpc.address to the host IPs.

Am I misconfiguring or misunderstanding something? Is there any way to fix
these errors?

Thanks!
Anton


Re: Which flink version is compatible with beam

2023-01-27 Thread Yaroslav Tkachenko
Hi! According to this
https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility,
1.14 is the latest supported version.

On Fri, Jan 27, 2023 at 9:19 AM P Singh  wrote:

> Hi Team,
>
> I am trying to run an Apache Beam pipeline on a Flink cluster. I have set up
> Kubernetes locally with Flink 1.16 and apache/beam_python3.10_sdk:2.44.0.
> When I submit the job like this:
>
> python file.py
>
> the job just hangs, and I am not able to see it on the Flink UI or in the logs.
>
> Can you please suggest compatible versions?
>
>
> Looking forward to hearing from you.
>
>


Which flink version is compatible with beam

2023-01-27 Thread P Singh
Hi Team,

I am trying to run an Apache Beam pipeline on a Flink cluster. I have set up
Kubernetes locally with Flink 1.16 and apache/beam_python3.10_sdk:2.44.0. When I
submit the job like this:

python file.py

the job just hangs, and I am not able to see it on the Flink UI or in the logs.

Can you please suggest compatible versions?


Looking forward to hearing from you.