[jira] [Created] (FLINK-32592) Mixed-up job execution on concurrent job submission
Fabio Wanner created FLINK-32592:

Summary: Mixed-up job execution on concurrent job submission
Key: FLINK-32592
URL: https://issues.apache.org/jira/browse/FLINK-32592
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.17.1, 1.15.4, 1.18.0
Reporter: Fabio Wanner

*Context*

We are using the flink-k8s-operator to deploy multiple jobs (up to 32) to a single session cluster. The operator submits the jobs concurrently, essentially at the same time.

Operator version: 1.5.0
Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)

*Problem*

Rarely (about once every 50 deployments) one of the jobs is not executed. In the following incident, 4 jobs are deployed at the same time:
* gorner-task-staging-e5730831
* gorner-facility-staging-e5730831
* gorner-aepp-staging-e5730831
* gorner-session-staging-e5730831

The operator submits the jobs, and each gets a reasonable jobID:
{code:java}
2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390002 to session cluster.
2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0002 to session cluster.
2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10002 to session cluster.
2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0002 to session cluster.
{code}

In the cluster, the JarRunHandler's handleRequest() method receives the requests. All 4 jobIDs are present (and all args etc. are correct):
{code:java}
2023-07-14 10:25:35,320 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 4968b186061e44390002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: de59304d370b4b8e0002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0002
2023-07-14 10:25:35,321 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - handleRequest - requestBody.jobId: 103c0446e14749a10002
{code}

But once the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, instead of 1 call per jobID we see 4 calls in which one of the jobIDs appears twice:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10002]
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0002]
{code}

In case it matters: the jobGraph obtained does not match the jobID. We get de59304d370b4b8e0002 twice, but the jobGraph for this jobID is never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().
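The duplicated jobID in the EmbeddedExecutor excerpt above can also be found mechanically when many more jobs are involved. The following is only a minimal sketch: the class and method names are hypothetical, and the {{optJobId: Optional[...]}} line format is assumed to match the debug logging shown in the excerpt, which is not part of stock Flink.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or the operator.
class DuplicateJobIdCheck {
    // Assumed log format, taken from the excerpt: "... optJobId: Optional[<hex id>]"
    private static final Pattern OPT_JOB_ID =
            Pattern.compile("optJobId: Optional\\[([0-9a-f]+)\\]");

    // Returns every jobID that occurs in more than one matching log line.
    static Set<String> duplicatedJobIds(List<String> logLines) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = OPT_JOB_ID.matcher(line);
            // Set.add returns false when the ID was already seen.
            if (m.find() && !seen.add(m.group(1))) {
                duplicates.add(m.group(1));
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "execute - optJobId: Optional[4968b186061e44390002]",
                "execute - optJobId: Optional[103c0446e14749a10002]",
                "execute - optJobId: Optional[de59304d370b4b8e0002]",
                "execute - optJobId: Optional[de59304d370b4b8e0002]");
        System.out.println(duplicatedJobIds(lines)); // prints [de59304d370b4b8e0002]
    }
}
```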
This then leads to the job already existing:
{code:java}
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,616 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
2023-07-14 10:25:35,721 WARN org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0002]
{code}

But since the jobs are completely different, the execution fails. Depending on the timing, it fails with one of the following exceptions:
* RestHandlerException: No jobs included in application
* ClassNotFoundException: io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32552) Mixed up Flink session job deployments
Fabio Wanner created FLINK-32552:

Summary: Mixed up Flink session job deployments
Key: FLINK-32552
URL: https://issues.apache.org/jira/browse/FLINK-32552
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Reporter: Fabio Wanner

*Context*

As part of our end-to-end tests we regularly deploy all of our Flink session jobs to a staging environment. Some of the jobs are bundled together in one helm chart and are therefore deployed at the same time. There are around 40 individual Flink jobs (running on the same Flink session cluster). Each e2e test run gets its own session cluster. The problems described below happen rarely (roughly 1 in 50 runs).

*Problem*

Rarely the operator seems to "mix up" the deployments. This can be seen in the Flink cluster logs, where multiple {{Received JobGraph submission '' ()}} entries are logged for different jobs with the same job_id. This results in errors such as {{DuplicateJobSubmissionException}} or {{ClassNotFoundException}}. It is also visible in the FlinkSessionJob resource: status.jobStatus.jobName does not match the expected job name of the job being deployed (the job name is passed to the application as an argument).

So far we have been unable to reproduce the error reliably.

*Details*

The following lines show the status of 3 jobs from the viewpoint of the Flink cluster dashboard and of the FlinkSessionJob resource:

*aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
* State: Restarting
* ID: a7d36f3881f943a2
* Exceptions: Cannot load user class: aelps.pipelines.aletsch.smc.SMCUrlMapper

FlinkSessionJob Resource:
* State: RUNNING
* jobId: a1221c743367497b0002
* uid: a1221c74-3367-497b-ad2f-8793ab23919d

*aletsch_mat_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
* State: -
* ID: -

FlinkSessionJob Resource:
* State: UPGRADING
* jobId: -
* uid: a7d36f38-81f9-43a0-898f-19b950430e9d

Flink K8s Operator:
* Exceptions: DuplicateJobSubmissionException: Job has already been submitted.
*aletsch_wp_wafer_e5730831db8092adb12f5189c4c895ef3a268615*

Apache Flink Dashboard:
* State: Running
* ID: e692c2dfaa18441c0002
* Exceptions: -

FlinkSessionJob Resource:
* State: RUNNING
* jobId: e692c2dfaa18441c0002
* uid: e692c2df-aa18-441c-a352-88aefa9a3017

As we can see, the *aletsch_smc* job is running according to the FlinkSessionJob resource but is crash-looping in the cluster, and its jobID there matches the uid of the *aletsch_mat* resource. Meanwhile, *aletsch_mat* is not running at all. The following logs also show some suspicious entries: there are several {{Received JobGraph submission}} entries from different jobs with the same jobID.

*Logs*

The logs are filtered by the 3 jobIds from above.

JobID: a7d36f3881f943a2
{code:bash}
Flink Cluster ...
023-07-06 10:23:50,552 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 (a7d36f3881f943a2) switched from state RUNNING to RESTARTING.
2023-07-06 10:23:50 file: '/tmp/tm_10.0.11.159:6122-e9fadc/blobStorage/job_a7d36f3881f943a2/blob_p-40c7a30adef8868254191d2cf2dbc4cb7ab46f0d-8a02a0583d91c5e8e6c94f378aa444c2' (valid JAR)
2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=4}]
2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2023-07-06 10:23:50,522 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job a7d36f3881f943a2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-07-06 10:23:50,512 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job aletsch_smc_e5730831db8092adb12f5189c4c895ef3a268615 (a7d36f3881f943a2) switched from state RESTARTING to RUNNING.
2023-07-06 10:23:48,979 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job
[jira] [Created] (FLINK-32412) JobID collisions in FlinkSessionJob
Fabio Wanner created FLINK-32412:

Summary: JobID collisions in FlinkSessionJob
Key: FLINK-32412
URL: https://issues.apache.org/jira/browse/FLINK-32412
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Fabio Wanner

From time to time we see {{JobId}} collisions in our deployments due to the low entropy of the generated {{JobId}}. The problem is that, although the {{uid}} of the k8s resource is a UUID (we don't know which version), only its {{hashCode}} is used for the {{JobId}}. The {{hashCode}} is an integer, thus 32 bits. By the birthday problem we can expect a collision with a 50% chance after only about 77000 random integers. In reality we seem to see the problem more often, but this could be because the {{uid}} might not be completely random, which increases the chance of a collision if we use only parts of it.

We propose to use at least the complete 64 bits of the upper part of the {{JobId}}, where 5.1×10^9 IDs are needed for a collision chance of 50%. We could even argue that 64 bits are most probably not needed for the generation number and that another 32 bits could be spent on the uid to increase the entropy of the {{JobId}} even further (this would mean the max generation would be 4,294,967,295).

Our suggestion for using 64 bits would be:
{code:java}
new JobID(
    UUID.fromString(Preconditions.checkNotNull(uid)).getMostSignificantBits(),
    Preconditions.checkNotNull(generation)
);
{code}

Any thoughts on this? I would create a PR once we know how to proceed.
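The two collision figures quoted in this report can be checked with the usual birthday-problem approximation, n ≈ sqrt(2 · N · ln(1 / (1 − p))), where N is the number of possible IDs and p the target collision probability. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes the IDs are uniformly random, which the report itself doubts for the real {{uid}}.

```java
// Hypothetical helper for checking the collision estimates; not operator code.
class BirthdayBound {
    // Approximate number of uniformly random IDs, drawn from 2^bits possible values,
    // at which the probability of at least one collision reaches p.
    static double idsForCollisionProbability(int bits, double p) {
        double space = Math.pow(2.0, bits);
        return Math.sqrt(2.0 * space * Math.log(1.0 / (1.0 - p)));
    }

    public static void main(String[] args) {
        // 32 bits (hashCode of the uid): on the order of 77000 IDs
        System.out.printf("32 bits: %.0f%n", idsForCollisionProbability(32, 0.5));
        // 64 bits (upper half of the UUID): on the order of 5.1e9 IDs
        System.out.printf("64 bits: %.2e%n", idsForCollisionProbability(64, 0.5));
    }
}
```

Because the real {{uid}} may not be uniformly random, these numbers are best read as an upper bound on how many IDs can be generated before a collision becomes likely.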
[jira] [Created] (FLINK-30845) Params in jarURI end up in file name
Fabio Wanner created FLINK-30845:

Summary: Params in jarURI end up in file name
Key: FLINK-30845
URL: https://issues.apache.org/jira/browse/FLINK-30845
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Fabio Wanner

*Context*

Jar files for jobs are submitted to the operator by supplying a URI to the .jar file. This URI can be a file system path or a URI to some HTTP resource. If an HTTP URI is given, the file is fetched using the {{HttpArtifactFetcher}}.

There are cases where the supplied URI contains additional params, for example if pre-signed S3 URLs are used.

Example:
{code:java}
https://some-domain.example.com/some.jar?some=params{code}

*Problem*

When the HttpArtifactFetcher determines the name of the .jar file, it also uses the params as part of the file name. In the example above the resulting file name would be: {{some.jar?some=params}}

Submitting this job to Flink results in an error, because the file name is checked to end with {{.jar}}.

*Possible Solution*

In the {{HttpArtifactFetcher}} it would be enough to replace:
{code:java}
String fileName = FilenameUtils.getName(url.getFile());{code}
with
{code:java}
String fileName = FilenameUtils.getName(url.getPath());{code}
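The difference between the two calls can be seen with the JDK alone: {{java.net.URL.getFile()}} returns the path plus the query string, while {{getPath()}} returns only the path. The sketch below uses the example URL from this report; the class, the method names, and the {{lastSegment}} helper are hypothetical and only stand in for {{FilenameUtils.getName}} so that the example needs no extra dependency.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

// Hypothetical demo class; not the operator's HttpArtifactFetcher.
class JarFileNameDemo {
    // Stand-in for FilenameUtils.getName: everything after the last '/'.
    static String lastSegment(String s) {
        return s.substring(s.lastIndexOf('/') + 1);
    }

    static URL toUrl(String uri) {
        try {
            return URI.create(uri).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Current behaviour described in the report: getFile() includes the query string.
    static String nameFromGetFile(String uri) {
        return lastSegment(toUrl(uri).getFile());
    }

    // Proposed behaviour: getPath() excludes the query string.
    static String nameFromGetPath(String uri) {
        return lastSegment(toUrl(uri).getPath());
    }

    public static void main(String[] args) {
        String uri = "https://some-domain.example.com/some.jar?some=params";
        System.out.println(nameFromGetFile(uri)); // prints some.jar?some=params
        System.out.println(nameFromGetPath(uri)); // prints some.jar
    }
}
```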