[jira] [Commented] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered

2018-04-27 Thread Yohei Onishi (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455799#comment-16455799
 ] 

Yohei Onishi commented on AIRFLOW-2385:
---

Yes, this task runs a Spark job. I added an on_kill method that stops the Spark 
session. I will see if it works. Any advice is highly welcome.
{code:python}
def execute(self, context):
    self.spark = self.create_spark_session()
    # run some queries

def create_spark_session(self):
    # Build (or reuse) a Hive-enabled session from the operator's settings.
    return (
        SparkSession.builder
        .appName(self.app_name)
        .config('spark.master', self.spark_master)
        .config('spark.executor.cores', self.spark_executor_cores)
        .config('spark.executor.memory', self.spark_executor_memory)
        .config('spark.executor.instances', self.spark_num_executors)
        .config('spark.yarn.queue', self.yarn_queue)
        .enableHiveSupport()
        .getOrCreate()
    )

def on_kill(self):
    # Stop the Spark session when the task is killed.
    if self.spark:
        self.spark.stop()
{code}
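
One thing to watch with the on_kill above: if it runs before execute has assigned self.spark, the attribute lookup raises AttributeError. A minimal sketch of one way to guard against that, assuming the attribute is initialized in the constructor. FakeSession and SparkOperatorSketch are hypothetical stand-ins so the snippet runs without Spark or Airflow; a real operator would extend BaseOperator and create a real SparkSession.

```python
class FakeSession:
    """Hypothetical stand-in for SparkSession; it only records stop()."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


class SparkOperatorSketch:
    """Hypothetical operator skeleton, not a real Airflow class."""

    def __init__(self):
        # Define the attribute up front so on_kill can always check it.
        self.spark = None

    def execute(self, context):
        self.spark = FakeSession()
        # run some queries

    def on_kill(self):
        # Safe to call before execute, and safe to call twice.
        if self.spark is not None:
            self.spark.stop()
            self.spark = None
```

Calling on_kill on a fresh instance is then a no-op, and calling it after execute stops the session exactly once.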

> Airflow task is not stopped when execution timeout gets triggered
> -
>
> Key: AIRFLOW-2385
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2385
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.9.0
>Reporter: Yohei Onishi
>Priority: Major
>
> I have my own custom operator that extends BaseOperator, as shown below. I 
> tried to kill a task if it runs for more than 30 minutes. According to the 
> log, the timeout seems to be triggered, but the task still continued.
> Am I missing something? I checked the official documentation but do not know 
> what is wrong. [https://airflow.apache.org/code.html#baseoperator]
> My operator is as follows.
> {code:python}
> class MyOperator(BaseOperator):
>     @apply_defaults
>     def __init__(
>             self,
>             some_parameters_here,
>             *args,
>             **kwargs):
>         super(MyOperator, self).__init__(*args, **kwargs)
>         # some initialization here
>
>     def execute(self, context):
>         # some code here
> {code}
>  
> My task is as follows.
> {code:python}
> t = MyOperator(
>     task_id='task',
>     dag=scheduled_dag,
>     execution_timeout=timedelta(minutes=30))
> {code}
>  
> I found this error but the task continued.
> {code}
> [2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 6:==(1380 + -160) / 1224][2018-04-12 03:30:28,353] {timeout.py:36} ERROR - Process timed out
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2385) Airflow task is not stopped when execution timeout gets triggered

2018-04-26 Thread Arthur Wiedmer (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455717#comment-16455717
 ] 

Arthur Wiedmer commented on AIRFLOW-2385:
-

Hi Yohei,

Unless I am mistaken, it looks like your operator is executing a Spark job (I 
seem to recognize the progress bar in the logs). execution_timeout will only 
raise an exception in the Python process; it might not kill the Spark job itself.

You probably want to implement the on_kill method for your operator so that it 
knows how to clean up your process. It has been implemented in a few operators 
already in the code base.
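
To illustrate the point, here is a minimal sketch using only the standard library, not Airflow's actual code. It assumes a Unix system (signal.SIGALRM) and uses a sleeping child process as a stand-in for the external Spark job. TaskTimeout, run_with_timeout, and its on_kill parameter are hypothetical names chosen for this example.

```python
import signal
import subprocess
import sys


class TaskTimeout(Exception):
    """Hypothetical stand-in for the exception raised when the timeout fires."""


def _raise_timeout(signum, frame):
    raise TaskTimeout("Process timed out")


def run_with_timeout(seconds, on_kill=None):
    """Start external work, wait on it, and raise in Python after `seconds`."""
    # The child stands in for work that lives outside the Python process.
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"]
    )
    previous = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(seconds)
    try:
        child.wait()
    except TaskTimeout:
        # The exception only unwinds the Python side. The child keeps
        # running unless a cleanup hook stops it explicitly.
        if on_kill is not None:
            on_kill(child)
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)
    return child


def stop_child(child):
    """Cleanup hook: terminate the external work and reap it."""
    child.terminate()
    child.wait()
```

Without a hook, run_with_timeout(1) returns a child that is still alive after the timeout; with on_kill=stop_child, the child has exited by the time the function returns.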

Good luck!



