[jira] [Created] (FLINK-32592) Mixed-up job execution on concurrent job submission

2023-07-14 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32592:


 Summary: Mixed-up job execution on concurrent job submission
 Key: FLINK-32592
 URL: https://issues.apache.org/jira/browse/FLINK-32592
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.17.1, 1.15.4, 1.18.0
Reporter: Fabio Wanner


*Context*

We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a 
single session cluster. The job submissions done by the operator happen 
concurrently, basically at the same time.

Operator version: 1.5.0

Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)

*Problem*

Rarely (~once every 50 deployments), one of the jobs is not executed. In the 
following incident, 4 jobs were deployed at the same time:
 * gorner-task-staging-e5730831
 * gorner-facility-staging-e5730831
 * gorner-aepp-staging-e5730831
 * gorner-session-staging-e5730831

 
The operator submits the jobs; they all get a reasonable jobID:
{code:java}
2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-task-staging-e5730831] Submitting job: 
4968b186061e44390002 to session cluster.
2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 
91a5260d916c4dff0002 to session cluster.
2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 
103c0446e14749a10002 to session cluster.
2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO 
][aelps-staging/gorner-session-staging-e5730831] Submitting job: 
de59304d370b4b8e0002 to session cluster.
{code}
In the cluster, the JarRunHandler's handleRequest() method receives the requests; 
all 4 jobIDs are present (and all args etc. are correct):
{code:java}
2023-07-14 10:25:35,320 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 4968b186061e44390002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: de59304d370b4b8e0002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 91a5260d916c4dff0002
2023-07-14 10:25:35,321 WARN  
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest 
- requestBody.jobId: 103c0446e14749a10002
{code}
But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, 
there are still 4 calls instead of 1 call per jobID, yet one of the jobIDs appears 
twice and another not at all:
{code:java}
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[4968b186061e44390002]
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[103c0446e14749a10002]
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[de59304d370b4b8e0002]
2023-07-14 10:25:35,721 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - optJobId: Optional[de59304d370b4b8e0002]
{code}
Possibly relevant: the jobGraph obtained does not match the jobID. We get 
de59304d370b4b8e0002 twice, but the jobGraph for this jobID is never 
returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().

This then leads to the duplicated jobID already being present in submittedJobIds:
{code:java}
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: []
2023-07-14 10:25:35,721 WARN  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
execute - submittedJobIds: [de59304d370b4b8e0002]
{code}
But since the jobs are completely different, the execution fails, depending on the 
timing, with one of the following exceptions:
 * RestHandlerException: No jobs included in application
 * ClassNotFoundException: 
io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor
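For illustration only, here is a minimal, self-contained Java sketch (not Flink code; all names are hypothetical) of the class of race that would produce exactly this symptom: concurrent submissions sharing unsynchronized mutable state, so that one caller's job runs twice while another caller's job is never executed.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedStateRaceSketch {

    // Shared, unsynchronized state standing in for "the job of the most recent request".
    private static String lastJobId;

    public static void main(String[] args) throws Exception {
        List<String> jobIds = List.of("job-1", "job-2", "job-3", "job-4");
        ExecutorService pool = Executors.newFixedThreadPool(jobIds.size());
        List<Future<String>> results = new ArrayList<>();

        for (String jobId : jobIds) {
            results.add(pool.submit(() -> {
                lastJobId = jobId;   // publish "my" job
                Thread.sleep(1);     // another submission may overwrite the shared state here
                return lastJobId;    // read back what may by now be someone else's job
            }));
        }

        // Occasionally prints one id twice and drops another, mirroring the log output above.
        for (Future<String> result : results) {
            System.out.println("executed: " + result.get());
        }
        pool.shutdown();
    }
}
{code}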

 





[jira] [Created] (FLINK-32552) Mixed up Flink session job deployments

2023-07-06 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32552:


 Summary: Mixed up Flink session job deployments
 Key: FLINK-32552
 URL: https://issues.apache.org/jira/browse/FLINK-32552
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Fabio Wanner


*Context*

As part of our end-to-end tests we regularly deploy all of our Flink session jobs 
to a staging environment. Some of the jobs are bundled together in one Helm chart 
and are therefore deployed at the same time. There are around 40 individual Flink 
jobs (running on the same Flink session cluster). A separate session cluster is 
used for each e2e test run. The problems described below happen rarely 
(maybe 1 in ~50 runs).

*Problem*

Rarely, the operator seems to "mix up" the deployments. This can be seen in the 
Flink cluster logs, where multiple {{Received JobGraph submission '' 
()}} entries are created for different jobs with the same job_id. This results in 
errors such as {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}.

It is also visible in the FlinkSessionJob resource: {{status.jobStatus.jobName}} does 
not match the expected name of the job being deployed (the job name is passed to 
the application as an argument).

So far we were unable to reliably reproduce the error.

*Details*

The following lines show the status of 3 jobs from the viewpoint of the Flink 
cluster dashboard and the FlinkSessionJob resource:

*aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Restarting
 * ID: a7d36f3881f943a2
 * Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper

FlinkSessionJob Resource:
 * State: RUNNING
 * jobId: a1221c743367497b0002
 * uid: a1221c74-3367-497b-ad2f-8793ab23919d

*aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: -
 * ID: -

FlinkSessionJob Resource:
 * State: UPGRADING
 * jobId: -
 * uid: a7d36f38-81f9-43a0-898f-19b950430e9d

Flink K8s Operator:
 * Exceptions: DuplicateJobSubmissionException: Job has already been submitted.

*aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
 * State: Running
 * ID: e692c2dfaa18441c0002
 * Exceptions: -

FlinkSessionJob Resource:
 * State: RUNNING
 * jobId: e692c2dfaa18441c0002
 * uid: e692c2df-aa18-441c-a352-88aefa9a3017

As we can see, the *aletsch_smc* job is presumably running according to its 
FlinkSessionJob resource, but it is crash-looping in the cluster, and its jobID 
matches the uid of the resource of {*}aletsch_mat{*}, while *aletsch_mat* is not 
running at all. The following logs also show some suspicious entries: there are 
several {{Received JobGraph submission}} messages from different jobs with the same 
jobID.
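As a side note, the reported jobIds appear to embed the leading hex digits of the owning resource's uid (plus a generation suffix), so the mix-up can be cross-checked mechanically. Below is a small hypothetical helper (not operator code, names assumed) that does exactly that comparison with the values from this report:
{code:java}
public class JobIdUidCheck {

    /** True if the jobId starts with the first 16 hex digits of the k8s resource uid. */
    static boolean jobIdMatchesUid(String jobId, String uid) {
        String uidHexPrefix = uid.replace("-", "").substring(0, 16);
        return jobId.startsWith(uidHexPrefix);
    }

    public static void main(String[] args) {
        String smcJobId = "a1221c743367497b0002";                 // jobId reported for aletsch_smc
        String smcUid   = "a1221c74-3367-497b-ad2f-8793ab23919d"; // aletsch_smc resource uid
        String matUid   = "a7d36f38-81f9-43a0-898f-19b950430e9d"; // aletsch_mat resource uid

        System.out.println(jobIdMatchesUid(smcJobId, smcUid)); // true  -> id derived from aletsch_smc
        System.out.println(jobIdMatchesUid(smcJobId, matUid)); // false -> not derived from aletsch_mat
    }
}
{code}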

*Logs*

The logs are filtered by the 3 jobIds from above.

 

JobID: a7d36f3881f943a2
{code:bash}
Flink Cluster
...
2023-07-06 10:23:50,552 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
2023-07-06 10:23:50 file: 
'/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2'
 (valid JAR)
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=4}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=3}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=2}]
2023-07-06 10:23:50,522 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Received resource requirements from job a7d36f3881f943a2: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
2023-07-06 10:23:50,512 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 
(a7d36f3881f943a2) switched from state RESTARTING to RUNNING.
2023-07-06 10:23:48,979 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job 
{code}

[jira] [Created] (FLINK-32412) JobID collisions in FlinkSessionJob

2023-06-22 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-32412:


 Summary: JobID collisions in FlinkSessionJob
 Key: FLINK-32412
 URL: https://issues.apache.org/jira/browse/FLINK-32412
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Fabio Wanner


From time to time we see {{JobId}} collisions in our deployments due to the low 
entropy of the generated {{{}JobId{}}}. The problem is that, although the {{uid}} 
from the k8s resource is a UUID (we don't know of which version), only its 
{{hashCode}} is used for the {{{}JobId{}}}. The {{hashCode}} is an integer, thus 
32 bits. According to the birthday problem, a collision is expected with 50% 
probability after only about 77,000 random integers.

In reality we seem to see the problem more often, but this could be because the 
{{uid}} might not be completely random, which increases the collision probability 
further when only part of it is used.

We propose to at least use the complete 64 bits of the upper part of the 
{{{}JobId{}}}, where about 5.1×10{^}9{^} IDs are needed for a 50% collision chance. 
One could even argue that 64 bits for the generation number are most probably not 
needed and another 32 bits could be spent on the uid to increase the entropy of the 
{{JobId}} even further (this would cap the maximum generation at 4,294,967,295).
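To make the two estimates above easy to reproduce, here is a small standalone Java snippet (illustration only, not proposed operator code) that evaluates the standard birthday-problem approximation n ≈ sqrt(2 · N · ln 2) for a 50% collision probability over an ID space of N = 2^bits:
{code:java}
public class BirthdayBound {

    /** Approximate number of random IDs needed for a 50% collision chance in a 2^bits space. */
    static double idsForHalfCollisionChance(int bits) {
        double space = Math.pow(2, bits);
        return Math.sqrt(2 * space * Math.log(2));
    }

    public static void main(String[] args) {
        System.out.printf("32-bit IDs: %.0f%n", idsForHalfCollisionChance(32)); // ~77,163
        System.out.printf("64-bit IDs: %.2e%n", idsForHalfCollisionChance(64)); // ~5.06e9
    }
}
{code}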

Our suggestion for using 64 bits would be:
{code:java}
new JobID(
UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(), 
Preconditions.checkNotNull(generation)
);
{code}
Any thoughts on this? I would create a PR once we know how to proceed.





[jira] [Created] (FLINK-30845) Params in jarURI end up in file name

2023-01-30 Thread Fabio Wanner (Jira)
Fabio Wanner created FLINK-30845:


 Summary: Params in jarURI end up in file name
 Key: FLINK-30845
 URL: https://issues.apache.org/jira/browse/FLINK-30845
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Fabio Wanner


*Context*

Jar files for jobs are submitted to the operator by supplying a URI to the .jar 
file. This URI can be a file system path or a URI to some HTTP resource. If an 
HTTP URI is given, the file will be fetched using the {{{}HttpArtifactFetcher{}}}.

There are cases where the supplied URI contains additional query params, for 
example if pre-signed S3 URLs are used.

Example:
{code:java}
https://some-domain.example.com/some.jar?some=params{code}
*Problem*

When the {{HttpArtifactFetcher}} determines the name of the .jar file, it also 
includes the params in the file name. In the example above the resulting file name 
would be: {{some.jar?some=params}}

Submitting this job to Flink will result in an error, because the file name is 
checked to end with {{.jar}}.

*Possible Solution*

In the {{HttpArtifactFetcher}} it would be enough to replace:
{code:java}
String fileName = FilenameUtils.getName(url.getFile());{code}
with
{code:java}
String fileName = FilenameUtils.getName(url.getPath());{code}
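For reference, here is a small standalone check (assumed class name, not operator code) showing the difference between the two calls on the example URL: {{URL.getFile()}} keeps the query string, while {{URL.getPath()}} drops it, so only the latter yields a name ending in {{.jar}}.
{code:java}
import java.net.URL;

import org.apache.commons.io.FilenameUtils;

public class JarNameCheck {

    public static void main(String[] args) throws Exception {
        URL url = new URL("https://some-domain.example.com/some.jar?some=params");

        // getFile() includes the query string, so the derived name is not a valid .jar name:
        System.out.println(FilenameUtils.getName(url.getFile())); // some.jar?some=params

        // getPath() drops the query string, yielding the expected name:
        System.out.println(FilenameUtils.getName(url.getPath())); // some.jar
    }
}
{code}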


