xBis7 commented on code in PR #62880:
URL: https://github.com/apache/airflow/pull/62880#discussion_r2892257753
##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1077,435 +486,40 @@ def test_same_scheduler_processing_the_entire_dag(
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans from the output.
- check_spans_without_continuance(output=out, dag=dag)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_change_after_the_first_task_finishes(
- self, monkeypatch, celery_worker_env_vars, capfd, session
- ):
- """
- The scheduler thread will be paused after the first task ends and a
new scheduler process
- will handle the rest of the dag processing. The paused thread will be
resumed afterwards.
- """
-
- # For this test, scheduler1 must be idle but still considered healthy
by scheduler2.
- # If scheduler2 marks the job as unhealthy, then it will recreate
scheduler1's spans
- # because it will consider them lost.
- os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] =
"90"
-
- celery_worker_process = None
- scheduler_process_1 = None
- apiserver_process = None
- scheduler_process_2 = None
- try:
- # Start the processes here and not as fixtures or in a common
setup,
- # so that the test can capture their output.
- celery_worker_process, scheduler_process_1, apiserver_process =
self.start_worker_and_scheduler1()
-
- dag_id = "otel_test_dag_with_pause_between_tasks"
- dag = self.dags[dag_id]
-
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
- deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
- while True:
- # To avoid get stuck waiting.
- if time.monotonic() > deadline:
- raise TimeoutError(
- f"Timed out waiting for 'pause' to appear in
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
- )
-
- try:
- with open(self.control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- log.info("Control file exists and the task has
been paused.")
- break
- time.sleep(1)
- continue
- except FileNotFoundError:
- print("Control file not found. Waiting...")
- time.sleep(3)
- continue
-
- with capfd.disabled():
- # When the scheduler1 thread is paused, capfd keeps trying to
read the
- # file descriptors for the process and ends up freezing the
test.
- # Temporarily disable capfd to avoid that.
- scheduler_process_1.send_signal(signal.SIGSTOP)
-
- check_dag_run_state_and_span_status(
- dag_id=dag_id, run_id=run_id, state=State.RUNNING,
span_status=SpanStatus.ACTIVE
- )
-
- # Start the 2nd scheduler immediately without any delay to avoid
having the 1st scheduler
- # marked as unhealthy. If that happens, then the 2nd will recreate
the spans that the
- # 1st scheduler started.
- # The scheduler would also be considered unhealthy in case it was
paused
- # and the dag run continued running.
-
- scheduler_process_2 = subprocess.Popen(
- self.scheduler_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
- # Rewrite the file to unpause the dag.
- with open(self.control_file, "w") as file:
- file.write("continue")
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=120,
span_status=SpanStatus.SHOULD_END
- )
-
- # Stop scheduler2 in case it still has a db lock on the dag_run.
- scheduler_process_2.terminate()
- scheduler_process_1.send_signal(signal.SIGCONT)
-
- # Wait for the scheduler to start again and continue running.
- time.sleep(10)
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=30,
span_status=SpanStatus.ENDED
- )
-
- print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
- finally:
- if self.log_level == "debug":
- with create_session() as session:
- dump_airflow_metadata_db(session)
-
- # Reset for the rest of the tests.
- os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"]
= "15"
-
- # Terminate the processes.
- celery_worker_process.terminate()
- celery_worker_process.wait()
-
- scheduler_process_1.terminate()
- scheduler_process_1.wait()
-
- apiserver_process.terminate()
- apiserver_process.wait()
-
- scheduler_process_2.wait()
-
- out, err = capfd.readouterr()
- log.info("out-start --\n%s\n-- out-end", out)
- log.info("err-start --\n%s\n-- err-end", err)
-
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans in the output.
- check_spans_for_paused_dag(output=out, dag=dag,
is_recreated=False, check_t1_sub_spans=False)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
- self, monkeypatch, celery_worker_env_vars, capfd, session
- ):
- """
- The scheduler that starts the dag run will be stopped, while the first
task is executing,
- and start a new scheduler will be started. That way, the new process
will pick up the dag processing.
- The initial scheduler will exit gracefully.
- """
-
- celery_worker_process = None
- apiserver_process = None
- scheduler_process_2 = None
- try:
- # Start the processes here and not as fixtures or in a common
setup,
- # so that the test can capture their output.
- celery_worker_process, scheduler_process_1, apiserver_process =
self.start_worker_and_scheduler1()
-
- dag_id = "otel_test_dag_with_pause_in_task"
- dag = self.dags[dag_id]
-
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
- deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
- while True:
- # To avoid get stuck waiting.
- if time.monotonic() > deadline:
- raise TimeoutError(
- f"Timed out waiting for 'pause' to appear in
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
- )
-
- try:
- with open(self.control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- log.info("Control file exists and the task has
been paused.")
- break
- time.sleep(1)
- continue
- except FileNotFoundError:
- print("Control file not found. Waiting...")
- time.sleep(3)
- continue
-
- # Since, we are past the loop, then the file exists and the dag
has been paused.
- # Terminate scheduler1 and start scheduler2.
- with capfd.disabled():
- scheduler_process_1.terminate()
-
- assert scheduler_process_1.wait() == 0
-
- check_dag_run_state_and_span_status(
- dag_id=dag_id, run_id=run_id, state=State.RUNNING,
span_status=SpanStatus.NEEDS_CONTINUANCE
- )
-
- scheduler_process_2 = subprocess.Popen(
- self.scheduler_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
- # Rewrite the file to unpause the dag.
- with open(self.control_file, "w") as file:
- file.write("continue")
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=120,
span_status=SpanStatus.ENDED
- )
-
- print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
- finally:
- if self.log_level == "debug":
- with create_session() as session:
- dump_airflow_metadata_db(session)
-
- # Terminate the processes.
- celery_worker_process.terminate()
- celery_worker_process.wait()
-
- apiserver_process.terminate()
- apiserver_process.wait()
-
- scheduler_process_2.terminate()
- scheduler_process_2.wait()
-
- out, err = capfd.readouterr()
- log.info("out-start --\n%s\n-- out-end", out)
- log.info("err-start --\n%s\n-- err-end", err)
-
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans in the output.
- check_spans_with_continuance(output=out, dag=dag)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_exits_forcefully_in_the_middle_of_the_first_task(
- self, monkeypatch, celery_worker_env_vars, capfd, session
- ):
- """
- The first scheduler will exit forcefully while the first task is
running,
- so that it won't have time end any active spans.
- """
-
- celery_worker_process = None
- scheduler_process_2 = None
- apiserver_process = None
- try:
- # Start the processes here and not as fixtures or in a common
setup,
- # so that the test can capture their output.
- celery_worker_process, scheduler_process_1, apiserver_process =
self.start_worker_and_scheduler1()
-
- dag_id = "otel_test_dag_with_pause_in_task"
- dag = self.dags[dag_id]
-
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
- deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
- while True:
- # To avoid get stuck waiting.
- if time.monotonic() > deadline:
- raise TimeoutError(
- f"Timed out waiting for 'pause' to appear in
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
- )
-
- try:
- with open(self.control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- log.info("Control file exists and the task has
been paused.")
- break
- time.sleep(1)
- continue
- except FileNotFoundError:
- print("Control file not found. Waiting...")
- time.sleep(3)
- continue
-
- # Since, we are past the loop, then the file exists and the dag
has been paused.
- # Kill scheduler1 and start scheduler2.
- with capfd.disabled():
- scheduler_process_1.send_signal(signal.SIGKILL)
-
- # The process shouldn't have changed the span_status.
- check_dag_run_state_and_span_status(
- dag_id=dag_id, run_id=run_id, state=State.RUNNING,
span_status=SpanStatus.ACTIVE
- )
-
- # Wait so that the health threshold passes and scheduler1 is
considered unhealthy.
- time.sleep(15)
-
- scheduler_process_2 = subprocess.Popen(
- self.scheduler_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
- # Wait for scheduler2 to be up and running.
- time.sleep(10)
-
- # Rewrite the file to unpause the dag.
- with open(self.control_file, "w") as file:
- file.write("continue")
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=120,
span_status=SpanStatus.ENDED
- )
-
- print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
- finally:
- if self.log_level == "debug":
- with create_session() as session:
- dump_airflow_metadata_db(session)
-
- # Terminate the processes.
- celery_worker_process.terminate()
- celery_worker_process.wait()
-
- apiserver_process.terminate()
- apiserver_process.wait()
-
- scheduler_process_2.terminate()
- scheduler_process_2.wait()
-
- out, err = capfd.readouterr()
- log.info("out-start --\n%s\n-- out-end", out)
- log.info("err-start --\n%s\n-- err-end", err)
-
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans in the output.
- check_spans_without_continuance(output=out, dag=dag,
is_recreated=True, check_t1_sub_spans=False)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_exits_forcefully_after_the_first_task_finishes(
- self, monkeypatch, celery_worker_env_vars, capfd, session
- ):
- """
- The first scheduler will exit forcefully after the first task finishes,
- so that it won't have time to end any active spans.
- In this scenario, the sub-spans for the first task will be lost.
- The only way to retrieve them, would be to re-run the task.
- """
-
- celery_worker_process = None
- apiserver_process = None
- scheduler_process_2 = None
- try:
- # Start the processes here and not as fixtures or in a common
setup,
- # so that the test can capture their output.
- celery_worker_process, scheduler_process_1, apiserver_process =
self.start_worker_and_scheduler1()
-
- dag_id = "otel_test_dag_with_pause_between_tasks"
- dag = self.dags[dag_id]
-
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
- deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
- while True:
- # To avoid get stuck waiting.
- if time.monotonic() > deadline:
- raise TimeoutError(
- f"Timed out waiting for 'pause' to appear in
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
- )
-
- try:
- with open(self.control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- log.info("Control file exists and the task has
been paused.")
- break
- time.sleep(1)
- continue
- except FileNotFoundError:
- print("Control file not found. Waiting...")
- time.sleep(3)
- continue
-
- # Since, we are past the loop, then the file exists and the dag
has been paused.
- # Kill scheduler1 and start scheduler2.
- with capfd.disabled():
- scheduler_process_1.send_signal(signal.SIGKILL)
-
- # The process shouldn't have changed the span_status.
- check_dag_run_state_and_span_status(
- dag_id=dag_id, run_id=run_id, state=State.RUNNING,
span_status=SpanStatus.ACTIVE
- )
-
- # Rewrite the file to unpause the dag.
- with open(self.control_file, "w") as file:
- file.write("continue")
-
- time.sleep(15)
- # The task should be adopted.
-
- scheduler_process_2 = subprocess.Popen(
- self.scheduler_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=120,
span_status=SpanStatus.ENDED
- )
-
- print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
- finally:
- if self.log_level == "debug":
- with create_session() as session:
- dump_airflow_metadata_db(session)
-
- # Terminate the processes.
- celery_worker_process.terminate()
- celery_worker_process.wait()
-
- apiserver_process.terminate()
- apiserver_process.wait()
-
- scheduler_process_2.terminate()
- scheduler_process_2.wait()
-
- out, err = capfd.readouterr()
- log.info("out-start --\n%s\n-- out-end", out)
- log.info("err-start --\n%s\n-- err-end", err)
-
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans in the output.
- check_spans_for_paused_dag(output=out, dag=dag, is_recreated=True,
check_t1_sub_spans=False)
-
- def start_worker_and_scheduler1(self):
- celery_worker_process = subprocess.Popen(
- self.celery_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
+ # host = "host.docker.internal"
+ host = "jaeger"
+ service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
+ # service_name ``= "my-service-name"
+ r =
requests.get(f"http://{host}:16686/api/traces?service={service_name}")
+ data = r.json()
+
+ trace = data["data"][-1]
+ spans = trace["spans"]
+
+ def get_span_hierarchy():
+ spans_dict = {x["spanID"]: x for x in spans}
+
+ def get_parent_span_id(span):
+ parents = [x["spanID"] for x in span["references"] if
x["refType"] == "CHILD_OF"]
+ if parents:
+ parent_id = parents[0]
+ return spans_dict[parent_id]["operationName"]
+
+ nested = {x["operationName"]: get_parent_span_id(x) for x in spans}
+ return nested
+
+ nested = get_span_hierarchy()
+ assert nested == {
+ "otel_test_dag": None,
+ "task1": None,
+ "task1_sub_span1": None,
+ "task1_sub_span2": None,
+ "task1_sub_span3": "task1_sub_span2",
+ "task1_sub_span4": None,
+ "task2": None,
+ }
+
+ def start_worker_and_scheduler(self):
Review Comment:
This method doesn't start a worker anymore, so the name `start_worker_and_scheduler` is misleading. Maybe it should be renamed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]