bin-lian opened a new issue, #44810: URL: https://github.com/apache/airflow/issues/44810
### Apache Airflow version

2.10.3

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

I use Airflow to schedule Spark jobs on Kubernetes with `SparkSubmitOperator`. The Spark job succeeded, but the Airflow task status is failed.

### What you think should happen instead?

Some Spark tasks hit OOM exceptions, which produces a non-zero exit code in the spark-submit logs, so `_spark_exit_code` ends up not equal to 0. However, Spark retries those tasks itself and the job is ultimately successful. Because `_spark_exit_code` is not 0, `SparkSubmitOperator` still considers the task a failure. Would it be possible not to check `_spark_exit_code` and instead let the status code returned by the child process (`returncode`) prevail?

```python
def submit(self, application: str = "", **kwargs: Any) -> None:
    """
    Remote Popen to execute the spark-submit job.

    :param application: Submitted application, jar or py file
    :param kwargs: extra arguments to Popen (see subprocess.Popen)
    """
    spark_submit_cmd = self._build_spark_submit_command(application)

    if self._env:
        env = os.environ.copy()
        env.update(self._env)
        kwargs["env"] = env

    self._submit_sp = subprocess.Popen(
        spark_submit_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1,
        universal_newlines=True,
        **kwargs,
    )

    self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
    returncode = self._submit_sp.wait()

    # Check spark-submit return code. In Kubernetes mode, also check the value
    # of exit code in the log, as it may differ.
    if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
        if self._is_kubernetes:
            raise AirflowException(
                f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}. "
                f"Kubernetes spark exit code is: {self._spark_exit_code}"
            )
        else:
            raise AirflowException(
                f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}."
            )

    self.log.debug("Should track driver: %s", self._should_track_driver_status)

    # We want the Airflow job to wait until the Spark driver is finished
    if self._should_track_driver_status:
        if self._driver_id is None:
            raise AirflowException(
                "No driver id is known: something went wrong when executing the spark submit command"
            )

        # We start with the SUBMITTED status as initial status
        self._driver_status = "SUBMITTED"

        # Start tracking the driver status (blocking function)
        self._start_driver_status_tracking()

        if self._driver_status != "FINISHED":
            raise AirflowException(
                f"ERROR : Driver {self._driver_id} badly exited with status {self._driver_status}"
            )
```

```python
def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
    """
    Process the log files and extract useful information out of it.

    If the deploy-mode is 'client', log the output of the submit command as those
    are the output logs of the Spark worker directly.

    Remark: If the driver needs to be tracked for its status, the log-level of the
    spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO)

    :param itr: An iterator which iterates over the input of the subprocess
    """
    # Consume the iterator
    for line in itr:
        line = line.strip()
        # If we run yarn cluster mode, we want to extract the application id from
        # the logs so we can kill the application when we stop it unexpectedly
        if self._is_yarn and self._connection["deploy_mode"] == "cluster":
            match = re.search("application[0-9_]+", line)
            if match:
                self._yarn_application_id = match.group(0)
                self.log.info("Identified spark application id: %s", self._yarn_application_id)

        # If we run Kubernetes cluster mode, we want to extract the driver pod id
        # from the logs so we can kill the application when we stop it unexpectedly
        elif self._is_kubernetes:
            match_driver_pod = re.search(r"\s*pod name: ((.+?)-([a-z0-9]+)-driver$)", line)
            if match_driver_pod:
                self._kubernetes_driver_pod = match_driver_pod.group(1)
                self.log.info("Identified spark driver pod: %s", self._kubernetes_driver_pod)

            match_application_id = re.search(r"\s*spark-app-selector -> (spark-([a-z0-9]+)), ", line)
            if match_application_id:
                self._kubernetes_application_id = match_application_id.group(1)
                self.log.info("Identified spark application id: %s", self._kubernetes_application_id)

            # Store the Spark Exit code
            match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
            if match_exit_code:
                self._spark_exit_code = int(match_exit_code.group(1))

        # if we run in standalone cluster mode and we want to track the driver status
        # we need to extract the driver id from the logs. This allows us to poll for
        # the status using the driver id. Also, we can kill the driver when needed.
        elif self._should_track_driver_status and not self._driver_id:
            match_driver_id = re.search(r"driver-[0-9\-]+", line)
            if match_driver_id:
                self._driver_id = match_driver_id.group(0)
                self.log.info("identified spark driver id: %s", self._driver_id)

        self.log.info(line)
```

### How to reproduce

Run a Spark job in which some tasks fail with OOM and are retried by Spark itself. The job ultimately succeeds, but the Airflow task is marked failed.

### Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other Docker-based deployment

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
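The reported interaction can be sketched in isolation. The snippet below uses the same `[eE]xit code:` pattern that `_process_spark_submit_log` applies to the subprocess output, but the log lines themselves are invented examples (real spark-submit output will differ); it shows how one OOM-related "exit code" line can leave a non-zero `_spark_exit_code` behind even when the retried job succeeds and spark-submit itself returns 0.

```python
import re

# Hypothetical log lines; only the "exit code:" line matters to the regex.
log_lines = [
    "INFO: task 3.0 in stage 1.0 failed, reason: container OOM-killed",
    "exit code: 137",  # failed first attempt (invented example)
    "INFO: task 3.1 in stage 1.0 finished successfully",
    "INFO: Application status: SUCCEEDED",
]

# Same extraction logic as the hook's log parser.
spark_exit_code = None
for line in log_lines:
    match = re.search(r"\s*[eE]xit code: (\d+)", line)
    if match:
        spark_exit_code = int(match.group(1))

# spark-submit's own return code: 0, because the job ultimately succeeded.
returncode = 0
is_kubernetes = True

# The failing condition from submit():
would_fail = bool(returncode) or (is_kubernetes and spark_exit_code != 0)
print(spark_exit_code, would_fail)  # → 137 True
```

Because no later log line resets the stored value to 0, the task is raised as failed even though `returncode` is 0, which is exactly why the issue proposes letting the child process's return code prevail.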
