Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]
jason810496 closed pull request #45129: [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records URL: https://github.com/apache/airflow/pull/45129 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Lee-W commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2785888447

> Since the TaskHandler logger is being migrated to structlog, I will create another PR for the refactor instead of resolving conflicts on this one (too many code changes and conflicts on this path recently).

If that's the case, maybe we could mark this as draft, or close it and create a new one instead?
github-actions[bot] commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2779940155

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
jason810496 commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2665017561

> I'm still halfway through reviewing it. Left a few nitpicks, but the PR is great, to be honest.

Thanks, @Lee-W, for reviewing! I’ve just resolved those nits. The CI failure is due to a flaky test:

```
FAILED tests/operators/test_trigger_dagrun.py::TestDagRunOperator::test_trigger_dagrun - AssertionError: assert equals failed '2025-02-18T08:18:13' '2025-02-18T08:18:14'
```
jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959232169
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -402,11 +588,29 @@ def _read(
                 TaskInstanceState.RUNNING,
                 TaskInstanceState.DEFERRED,
             )
+
+        current_total_logs_size = local_logs_size + remote_logs_size + executor_logs_size + served_logs_size
+        interleave_log_stream = _interleave_logs(
+            *local_parsed_logs,
+            *remote_parsed_logs,
+            *(executor_parsed_logs or []),
+            *served_parsed_logs,
+        )
+
+        # skip log stream until the last position
         if metadata and "log_pos" in metadata:
-            previous_chars = metadata["log_pos"]
-            logs = logs[previous_chars:]  # Cut off previously passed log test as new tail
-        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
-        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+            offset = metadata["log_pos"]
+            for _ in range(offset):
+                next(interleave_log_stream, None)
+
+        out_stream: Iterable[str]
+        if "log_pos" in (metadata or {}):
```
Review Comment:
Both will produce the same result in this context, but yours is much better.
jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959228856
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -107,30 +131,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))


-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-        yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:
```
Review Comment:
After checking, there is no need for the `if line:` part.
jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959228128
##
airflow/executors/base_executor.py:
##

```diff
@@ -569,13 +569,20 @@ def execute_async(
         """
         raise NotImplementedError()

-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]
```

Review Comment:
Sure, just imported the type hint from `file_task_handler`.
jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959055912
##
airflow/utils/log/file_task_handler.py:
##

```diff
@@ -352,47 +521,64 @@ def _read(
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)

         messages_list: list[str] = []
-        remote_logs: list[str] = []
-        local_logs: list[str] = []
+        remote_parsed_logs: list[_ParsedLogStreamType] = []
+        remote_logs_size = 0
```

Review Comment:
These attributes are utilized for storing results from different log sources: https://github.com/jason810496/airflow/blob/refactor/webserver-oom-for-large-log-read/airflow/utils/log/file_task_handler.py#L548-L580

They are also used in the **interleaving** process: https://github.com/jason810496/airflow/blob/refactor/webserver-oom-for-large-log-read/airflow/utils/log/file_task_handler.py#L592-L599
Lee-W commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1957799020
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val


+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatibility utility between new log reading (stream-based + k-way merge) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):
+        timestamp = None
+        next_timestamp = None
+        for line_num, line in enumerate(logs):
+            with suppress(Exception):
+                # next_timestamp unchanged if line can't be parsed
+                next_timestamp = _parse_timestamp(line)
+            if next_timestamp:
+                timestamp = next_timestamp
+            yield timestamp, line_num, line
+
+    return [_parse_log(remote_logs)]
+
+
+def _get_compatible_read_for_providers(read_response: tuple) -> tuple[Iterable[str], dict[str, Any]]:
+    """
+    Compatibility utility for transforming the `_read` method return value for providers.
+
+    Provider `_read` return types might be:
+    - `tuple[str, dict[str, Any]]`
+        - alibaba.cloud.log.oss_task_handler.OssTaskHandler
+        - amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler
+        - redis.log.redis_task_handler.RedisTaskHandler
+    - `tuple[list[tuple[str, str]], dict[str, Any]]` (tuple[list[host, log_documents], metadata])
+        - For this case, we need to split host and log_documents and put host into metadata
+        - elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+        - opensearch.log.os_task_handler.OpenSearchTaskHandler
+    """
+    if len(read_response) != 2:
+        raise ValueError("Unexpected return value from _read")
+    # for tuple[str, dict[str, Any]]
+    if isinstance(read_response[0], str):
+        log_str, metadata = read_response
+        return (log_str.splitlines(), metadata)
+
+    # for tuple[list[tuple[str, str]], dict[str, Any]]
+    if isinstance(read_response[0], list):
+        host_by_logs, metadata = read_response
+        if len(host_by_logs) > 0:
```

Review Comment:
```suggestion
        if len(host_by_logs):
```
##
airflow/executors/base_executor.py:
##
```diff
@@ -569,13 +569,20 @@ def execute_async(
         """
         raise NotImplementedError()

-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]
```
Review Comment:
Should we make it a new type? TBH, this is a bit hard to read 🤔
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
+    def _parse_log(logs: list[str]):
```

Review Comment:
Missing return type.
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -439,43 +643,60 @@ def _get_log_retrieval_url(
             log_relative_path,
         )

-    def read(self, task_instance, try_number=None, metadata=None):
+    def read(
+        self, task_instance, try_number=None, metadata=None
+    ) -> tuple[list[str], list[Generator[str, None, None]], list[dict[str, Any]]]:
         """
         Read logs of given task instance from local machine.

         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
            it returns all logs separated by try_number
         :param metadata: log metadata, can be used for streaming log reading
            and auto-tailing.
-        :return: a list of listed tuples which order log string by host
+        :return: tuple of hosts, log streams, and metadata_array
```
jason810496 commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2661782763

Hi @dstandish @ashb, hope you're doing well! Could you please review this PR when you have some time? Thanks! 🙏
jason810496 commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2645266948

Hi @potiuk, may I ask whether this PR is considered a refactor for 3.0 or 2.10? I saw the 3.0 feature freeze mentioned in the dev list, so I’m not sure which version this PR will be counted for. Here is the related discussion on Slack: https://apache-airflow.slack.com/archives/CCZRF2U5A/p1736767159693839
potiuk commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2614106600

@dstandish @ashb - can we merge it? That one seems like a good candidate for 2.10.5?
jason810496 commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2571537145

Hi @dstandish, hope you're doing well, and Happy New Year! Would you mind taking a look at this PR when you have a moment? Thanks!
potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428294
##
airflow/utils/log/file_task_handler.py:
##

```diff
@@ -162,6 +266,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val

+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
```

Review Comment:
Thanks for responding to compatibility expectations!
potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428161
##
airflow/utils/log/file_task_handler.py:
##
```diff
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))


-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-        yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:
+                    with suppress(Exception):
+                        # next_timestamp unchanged if line can't be parsed
+                        next_timestamp = _parse_timestamp(line)
+                    if next_timestamp:
+                        timestamp = next_timestamp
+
+                yield timestamp, line_num, line
+                line_num += 1
+
+
+def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
+
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: an integer sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, str]],
+    parsed_log_streams: list[_ParsedLogStreamType],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap.
+    Remove any empty log stream from the list while iterating.

-def _interleave_logs(*logs):
-    records = []
-    for log in logs:
-        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
+    :param heap: heap to store log records
+    :param parsed_log_streams: list of parsed log streams
+    """
+    for log_stream in parsed_log_streams:
+        record: _ParsedLogRecordType | None = next(log_stream, None)
+        if record is None:
+            parsed_log_streams.remove(log_stream)
+            continue
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+
+
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.
+
+    e.g.
+
+    log_stream1: --
+    log_stream2:
+    log_stream3:
+
+    The first record of log_stream3 is later than the fourth record of log_stream1!
+
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push the whole tuple into the heap, which increases too much overhead
+    # push only sort_key and line into the heap
```
Review Comment:
Nice optimisation.
potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900427959
##
airflow/utils/log/file_task_handler.py:
##

```diff
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.
```
Review Comment:
Nice
potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428066
##
airflow/utils/log/file_task_handler.py:
##

```diff
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.
```
Review Comment:
Thanks for the visual explanation here!
jason810496 commented on PR #45129: URL: https://github.com/apache/airflow/pull/45129#issuecomment-2562218329

Finally fixed the tests! This is the first (and likely the largest) PR for resolving OOM issues when reading large logs in the webserver. Further PRs will only focus on refactoring each provider, as listed in the TODO tasks in #45079.

Even though the providers haven't yet been refactored to support stream-based log reading, the compatibility utility will transform the old `read` log method (which returns the entire list of logs) into a stream-based approach. Once all providers are refactored to use stream-based reading, the compatibility utility can be removed.

For the testing part: since the CI will run provider compatibility tests for versions 2.9.3 and 2.10.3, my approach is to copy the old test cases related to log reading into new stream-based tests. I’ve added the `mark_test_for_old_read_log_method` and `mark_test_for_stream_based_read_log_method` pytest decorators to selectively skip the corresponding test runs. From my perspective, this approach is simpler and minimizes changes to the original test logic. Additionally, tests marked with `mark_test_for_old_read_log_method` can be safely removed once all providers migrate to stream-based reading.
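The compatibility utility described above can be pictured as a thin adapter over a legacy provider result. `as_stream` below is an illustrative sketch under assumed input shapes, not the PR's actual helper:

```python
from collections.abc import Iterator


def as_stream(read_result: tuple[str, dict]) -> tuple[Iterator[str], dict]:
    """Adapt a legacy provider `_read` result (whole log string + metadata)
    into a lazily consumed line stream, so callers can treat every provider
    as stream-based until each one is refactored to stream natively."""
    log_str, metadata = read_result

    def stream() -> Iterator[str]:
        yield from log_str.splitlines()

    return stream(), metadata


lines, metadata = as_stream(("line 1\nline 2\nline 3", {"end_of_log": True}))
```

The legacy log is still fully in memory at this point; the shim only unifies the interface, which is why it can be deleted once every provider produces a real stream.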
