yathit commented on issue #16140:
URL: https://github.com/apache/airflow/issues/16140#issuecomment-1068627047
I tested in Airflow 2.3.0.dev0 using
`airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator` and
no longer reproduced the issue.
```python
import datetime
import unittest
from airflow.models.dag import DAG
from airflow.providers.apache.spark.operators.spark_submit import
SparkSubmitOperator
from airflow.utils import timezone
from tests.test_utils.config import conf_vars
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
END_DATE = timezone.datetime(2016, 1, 2)
INTERVAL = datetime.timedelta(minutes=5)
FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
class TestSparkSubmitOperator(unittest.TestCase):
def setUp(self):
super().setUp()
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': DEFAULT_DATE,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': datetime.timedelta(minutes=5)
}
self.dag = DAG(
dag_id='pi',
concurrency=1,
default_args=default_args,
catchup=True,
schedule_interval=INTERVAL
)
self.addCleanup(self.dag.clear)
def _run_as_operator(self, **kwargs):
task = SparkSubmitOperator(
task_id='pi_task',
conn_id='spark',
application=f'/home/me/pi.py',
executor_cores=4,
num_executors=4,
executor_memory='1g',
driver_memory='1g',
name='pi',
execution_timeout=datetime.timedelta(minutes=60),
dag=self.dag
)
def test_execute(self):
with conf_vars({}):
self._run_as_operator()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]