bin-lian opened a new issue, #44810:
URL: https://github.com/apache/airflow/issues/44810

   ### Apache Airflow version
   
   2.10.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I use Airflow to schedule Spark jobs on Kubernetes with `SparkSubmitOperator`. The 
Spark job succeeded, but the Airflow task status is failed.
   
   ### What you think should happen instead?
   
   Some tasks inside the Spark job hit OOM exceptions, which produces a non-zero exit 
code in the driver log, so `_spark_exit_code` ends up not equal to 0. However, Spark 
retries those tasks internally and the job ultimately succeeds. Because 
`_spark_exit_code` is not 0, `SparkSubmitOperator` still considers the task a 
failure. Would it be possible to not check `_spark_exit_code` and instead let the 
return code of the child process (`returncode`) prevail?
   
![95DF0514-E3BB-4B53-9F82-3948223F49B6](https://github.com/user-attachments/assets/ce0edfc5-2c94-4d3d-aefb-9d46aae28cb5)
   ```python
    def submit(self, application: str = "", **kwargs: Any) -> None:
        """
        Remote Popen to execute the spark-submit job.

        :param application: Submitted application, jar or py file
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        """
        spark_submit_cmd = self._build_spark_submit_command(application)

        if self._env:
            env = os.environ.copy()
            env.update(self._env)
            kwargs["env"] = env

        self._submit_sp = subprocess.Popen(
            spark_submit_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1,
            universal_newlines=True,
            **kwargs,
        )

        self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
        returncode = self._submit_sp.wait()

        # Check spark-submit return code. In Kubernetes mode, also check the value
        # of exit code in the log, as it may differ.
        if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
            if self._is_kubernetes:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}. "
                    f"Kubernetes spark exit code is: {self._spark_exit_code}"
                )
            else:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}."
                )

        self.log.debug("Should track driver: %s", self._should_track_driver_status)

        # We want the Airflow job to wait until the Spark driver is finished
        if self._should_track_driver_status:
            if self._driver_id is None:
                raise AirflowException(
                    "No driver id is known: something went wrong when executing the spark submit command"
                )

            # We start with the SUBMITTED status as initial status
            self._driver_status = "SUBMITTED"

            # Start tracking the driver status (blocking function)
            self._start_driver_status_tracking()

            if self._driver_status != "FINISHED":
                raise AirflowException(
                    f"ERROR : Driver {self._driver_id} badly exited with status {self._driver_status}"
                )
   ```
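   To make the proposed change concrete, here is a hypothetical pure-function sketch 
(the function names are mine, not from the hook) contrasting the current failure 
condition with the behavior proposed in this issue, where the spark-submit 
child-process return code alone decides success:

   ```python
    # Hypothetical helpers (not part of SparkSubmitHook) illustrating the check.

    def fails_currently(returncode: int, is_kubernetes: bool, spark_exit_code: int | None) -> bool:
        # Mirrors: if returncode or (self._is_kubernetes and self._spark_exit_code != 0)
        return bool(returncode) or (is_kubernetes and spark_exit_code != 0)

    def fails_proposed(returncode: int) -> bool:
        # Proposed: only the subprocess return code decides success/failure.
        return bool(returncode)

    # A job whose tasks OOM'ed and were retried: spark-submit exits 0,
    # but a non-zero exit code was scraped from the Kubernetes driver log.
    print(fails_currently(0, True, 137))  # True  -> Airflow marks the task failed
    print(fails_proposed(0))              # False -> task would succeed
   ```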
   ```python
    def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
        """
        Process the log files and extract useful information out of it.

        If the deploy-mode is 'client', log the output of the submit command as those
        are the output logs of the Spark worker directly.

        Remark: If the driver needs to be tracked for its status, the log-level of the
        spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO)

        :param itr: An iterator which iterates over the input of the subprocess
        """
        # Consume the iterator
        for line in itr:
            line = line.strip()
            # If we run yarn cluster mode, we want to extract the application id from
            # the logs so we can kill the application when we stop it unexpectedly
            if self._is_yarn and self._connection["deploy_mode"] == "cluster":
                match = re.search("application[0-9_]+", line)
                if match:
                    self._yarn_application_id = match.group(0)
                    self.log.info("Identified spark application id: %s", self._yarn_application_id)

            # If we run Kubernetes cluster mode, we want to extract the driver pod id
            # from the logs so we can kill the application when we stop it unexpectedly
            elif self._is_kubernetes:
                match_driver_pod = re.search(r"\s*pod name: ((.+?)-([a-z0-9]+)-driver$)", line)
                if match_driver_pod:
                    self._kubernetes_driver_pod = match_driver_pod.group(1)
                    self.log.info("Identified spark driver pod: %s", self._kubernetes_driver_pod)

                match_application_id = re.search(r"\s*spark-app-selector -> (spark-([a-z0-9]+)), ", line)
                if match_application_id:
                    self._kubernetes_application_id = match_application_id.group(1)
                    self.log.info("Identified spark application id: %s", self._kubernetes_application_id)

                # Store the Spark Exit code
                match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
                if match_exit_code:
                    self._spark_exit_code = int(match_exit_code.group(1))

            # if we run in standalone cluster mode and we want to track the driver status
            # we need to extract the driver id from the logs. This allows us to poll for
            # the status using the driver id. Also, we can kill the driver when needed.
            elif self._should_track_driver_status and not self._driver_id:
                match_driver_id = re.search(r"driver-[0-9\-]+", line)
                if match_driver_id:
                    self._driver_id = match_driver_id.group(0)
                    self.log.info("identified spark driver id: %s", self._driver_id)

            self.log.info(line)
   ```
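   A minimal standalone demonstration (the log lines below are my assumption about the 
format, not taken from a real run) of how `_process_spark_submit_log` picks up a 
non-zero exit code: the regex matches any "Exit code:" line in the driver log, 
including one emitted for an attempt that OOM'ed and was later retried successfully:

   ```python
    import re

    # Same pattern as in _process_spark_submit_log
    EXIT_CODE_RE = re.compile(r"\s*[eE]xit code: (\d+)")

    # Hypothetical driver-log excerpt from a Kubernetes run
    log_lines = [
        "INFO LoggingPodStatusWatcherImpl: State changed, pod running",
        "    Exit code: 137",  # hypothetical OOM-killed attempt (SIGKILL)
        "INFO LoggingPodStatusWatcherImpl: Application finished successfully",
    ]

    spark_exit_code = None
    for line in log_lines:
        m = EXIT_CODE_RE.search(line)
        if m:
            spark_exit_code = int(m.group(1))

    print(spark_exit_code)  # 137 -> the hook would mark the task failed
   ```

   Because the captured value is never reset when a later attempt succeeds, the final 
`_spark_exit_code` stays non-zero even though the job finished successfully.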
   
   ### How to reproduce
   
   Run a Spark job on Kubernetes in which some internal tasks fail with OOM and are 
retried; although the job ultimately succeeds, the Airflow task is marked failed.
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" 
VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian 
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" 
BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]