Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-04-29 Thread via GitHub


jason810496 closed pull request #45129: [Resolve OOM When Reading Large Logs in 
Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting 
Entire Log Records
URL: https://github.com/apache/airflow/pull/45129


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-04-10 Thread via GitHub


Lee-W commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2785888447

   > Since the TaskHandler logger is being migrated to structlog, I will create 
another PR for the refactor instead of resolving conflicts on this one (too much 
code change and conflict on this path recently)
   
   If that's the case, maybe we could mark this as draft or close and create a 
new one instead?





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2779940155

   This pull request has been automatically marked as stale because it has not 
had recent activity. It will be closed in 5 days if no further activity occurs. 
Thank you for your contributions.





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-18 Thread via GitHub


jason810496 commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2665017561

   > I'm still halfway through reviewing it. Left a few nitpicks, but the PR is 
great, to be honest.
   
   Thanks, @Lee-W, for reviewing! I’ve just resolved those nits.  
   
   The CI failure is due to a flaky test:  
   ```
   FAILED tests/operators/test_trigger_dagrun.py::TestDagRunOperator::test_trigger_dagrun
   - AssertionError: assert equals failed
     '2025-02-18T08:18:13'  '2025-02-18T08:18:14'
   ```





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-17 Thread via GitHub


jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959232169


##
airflow/utils/log/file_task_handler.py:
##
@@ -402,11 +588,29 @@ def _read(
                 TaskInstanceState.RUNNING,
                 TaskInstanceState.DEFERRED,
             )
+
+        current_total_logs_size = local_logs_size + remote_logs_size + executor_logs_size + served_logs_size
+        interleave_log_stream = _interleave_logs(
+            *local_parsed_logs,
+            *remote_parsed_logs,
+            *(executor_parsed_logs or []),
+            *served_parsed_logs,
+        )
+
+        # skip log stream until the last position
         if metadata and "log_pos" in metadata:
-            previous_chars = metadata["log_pos"]
-            logs = logs[previous_chars:]  # Cut off previously passed log test as new tail
-            out_message = logs if "log_pos" in (metadata or {}) else messages + logs
-            return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+            offset = metadata["log_pos"]
+            for _ in range(offset):
+                next(interleave_log_stream, None)
+
+        out_stream: Iterable[str]
+        if "log_pos" in (metadata or {}):

Review Comment:
   Both will produce the same result in this context, but yours is much better.
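For reference, skipping `offset` already-served records from a generator (what the `for _ in range(offset): next(...)` loop in the hunk does) can also be done with the standard `itertools` "consume" recipe; a minimal sketch with illustrative names:

```python
import itertools


def skip_records(stream, offset):
    # Consume exactly `offset` items from the generator without storing them;
    # islice(stream, offset, offset) advances the underlying iterator `offset`
    # times and yields nothing.
    next(itertools.islice(stream, offset, offset), None)
    return stream


logs = (f"line {i}" for i in range(10))
skip_records(logs, 4)
print(next(logs))  # -> line 4
```

Both forms produce the same result; the `islice` variant just pushes the loop into C code.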






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-17 Thread via GitHub


jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959228856


##
airflow/utils/log/file_task_handler.py:
##
@@ -107,30 +131,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-            yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:

Review Comment:
   After checking, there is no need for the `if line:` part.






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-17 Thread via GitHub


jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959228128


##
airflow/executors/base_executor.py:
##
@@ -569,13 +569,20 @@ def execute_async(
         """
         raise NotImplementedError()
 
-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]

Review Comment:
   Sure, just imported the type hint from `file_task_handler`.






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-17 Thread via GitHub


jason810496 commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1959055912


##
airflow/utils/log/file_task_handler.py:
##
@@ -352,47 +521,64 @@ def _read(
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)
         messages_list: list[str] = []
-        remote_logs: list[str] = []
-        local_logs: list[str] = []
+        remote_parsed_logs: list[_ParsedLogStreamType] = []
+        remote_logs_size = 0

Review Comment:
   These attributes are utilized to store results from different log sources:
   
   https://github.com/jason810496/airflow/blob/refactor/webserver-oom-for-large-log-read/airflow/utils/log/file_task_handler.py#L548-L580
   
   They are also used in the **interleaving** process:  
   
   https://github.com/jason810496/airflow/blob/refactor/webserver-oom-for-large-log-read/airflow/utils/log/file_task_handler.py#L592-L599






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-17 Thread via GitHub


Lee-W commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1957799020


##
airflow/utils/log/file_task_handler.py:
##
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatible utility for new log reading (stream-based + k-way merge log) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):
+        timestamp = None
+        next_timestamp = None
+        for line_num, line in enumerate(logs):
+            with suppress(Exception):
+                # next_timestamp unchanged if line can't be parsed
+                next_timestamp = _parse_timestamp(line)
+            if next_timestamp:
+                timestamp = next_timestamp
+            yield timestamp, line_num, line
+
+    return [_parse_log(remote_logs)]
+
+
+def _get_compatible_read_for_providers(read_response: tuple) -> tuple[Iterable[str], dict[str, Any]]:
+    """
+    Compatible utility for transforming `_read` method return value for providers.
+
+    Providers methods return type might be:
+    - `tuple[str, dict[str, Any]]`
+        - alibaba.cloud.log.oss_task_handler.OssTaskHandler
+        - amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler
+        - redis.log.redis_task_handler.RedisTaskHandler
+    - `tuple[list[tuple[str, str]], dict[str, Any]]` ( tuple[list[host, log_documents], metadata] )
+        - For this case, we need to split host and log_documents and put host into metadata
+        - elasticsearch.log.es_task_handler.ElasticsearchTaskHandler
+        - opensearch.log.os_task_handler.OpenSearchTaskHandler
+    """
+    if len(read_response) != 2:
+        raise ValueError("Unexpected return value from _read")
+    # for tuple[str, dict[str, Any]]
+    if isinstance(read_response[0], str):
+        log_str, metadata = read_response
+        return (log_str.splitlines(), metadata)
+
+    # for tuple[list[tuple[str, str]], dict[str, Any]]
+    if isinstance(read_response[0], list):
+        host_by_logs, metadata = read_response
+        if len(host_by_logs) > 0:

Review Comment:
   ```suggestion
        if len(host_by_logs):
   ```



##
airflow/executors/base_executor.py:
##
@@ -569,13 +569,20 @@ def execute_async(
         """
         raise NotImplementedError()
 
-    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
+    def get_task_log(
+        self, ti: TaskInstance, try_number: int
+    ) -> (
+        tuple[list[str], list[Generator[tuple[pendulum.DateTime | None, int, str], None, None]], int]
+        | tuple[list[str], list[str]]

Review Comment:
   Should we make it a new type? TBH, this is a bit hard to read 🤔



##
airflow/utils/log/file_task_handler.py:
##
@@ -159,6 +264,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:
+    """
+    Compatible utility for new log reading (stream-based + k-way merge log) and old log reading (read whole log in memory + sorting).
+
+    Turn old log reading into new stream-based log reading.
+    Will be removed after all providers adapt to stream-based log reading.
+
+    :param remote_logs: list of log lines
+    :return: parsed log streams if remote_logs is not empty, otherwise empty list
+    """
+    if not remote_logs:
+        # empty remote logs
+        return []
+
+    def _parse_log(logs: list[str]):

Review Comment:
   missing return type



##
airflow/utils/log/file_task_handler.py:
##
@@ -439,43 +643,60 @@ def _get_log_retrieval_url(
             log_relative_path,
         )
 
-    def read(self, task_instance, try_number=None, metadata=None):
+    def read(
+        self, task_instance, try_number=None, metadata=None
+    ) -> tuple[list[str], list[Generator[str, None, None]], list[dict[str, Any]]]:
         """
         Read logs of given task instance from local machine.
 
         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
                            it returns all logs separated by try_number
         :param metadata: log metadata, can be used for steaming log reading and auto-tailing.
-        :return: a list of listed tuples which order log string by host
+        :return: tuple of hosts, log streams, and metadata_array

Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-16 Thread via GitHub


jason810496 commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2661782763

   Hi @dstandish @ashb, hope you're doing well! Could you please review this PR 
when you have some time? Thanks! 🙏





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-02-08 Thread via GitHub


jason810496 commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2645266948

   Hi @potiuk, may I ask whether this PR is considered a refactor for 3.0 or 
2.10? I saw the 3.0 feature freeze mentioned in the dev list, so I’m not sure 
which version this PR will be counted for.
   
   Here is the related discussion on Slack: 
https://apache-airflow.slack.com/archives/CCZRF2U5A/p1736767159693839





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-25 Thread via GitHub


potiuk commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2614106600

   @dstandish @ashb - can we merge it? That one seems like a good candidate 
for 2.10.5?





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-04 Thread via GitHub


jason810496 commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2571537145

   Hi @dstandish,  
   
   Hope you're doing well, and Happy New Year! Would you mind taking a look at 
this PR when you have a moment? Thanks! 





Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-01 Thread via GitHub


potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428294


##
airflow/utils/log/file_task_handler.py:
##
@@ -162,6 +266,70 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
     return val
 
 
+def _get_compatible_parse_log_streams(remote_logs: list[str]) -> list[_ParsedLogStreamType]:

Review Comment:
   Thanks for responding to compatibility expectations!






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-01 Thread via GitHub


potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428161


##
airflow/utils/log/file_task_handler.py:
##
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-            yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:
+                    with suppress(Exception):
+                        # next_timestamp unchanged if line can't be parsed
+                        next_timestamp = _parse_timestamp(line)
+                    if next_timestamp:
+                        timestamp = next_timestamp
+
+                yield timestamp, line_num, line
+                line_num += 1
+
+
+def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
+
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: an integer sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, str]],
+    parsed_log_streams: list[_ParsedLogStreamType],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap.
+
+    Remove any empty log stream from the list while iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: list of parsed log streams
+    """
+    for log_stream in parsed_log_streams:
+        record: _ParsedLogRecordType | None = next(log_stream, None)
+        if record is None:
+            parsed_log_streams.remove(log_stream)
+            continue
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+
+
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.
+
+    e.g.
+
+    [timeline diagram of three overlapping log streams omitted]
+
+    The first record of log_stream3 is later than the fourth record of log_stream1!
+
+    :param parsed_log_streams: parsed log streams
+    :return: interleaved log stream
+    """
+    # don't need to push whole tuple into heap, which increases too much overhead
+    # push only sort_key and line into heap
Review Comment:
   Nice optimisation.
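As a minimal sketch of that optimisation: packing the timestamp and line number into one integer means the heap compares plain ints instead of tuples of datetimes and strings. The constants below are assumptions for illustration; the PR defines its own `SORT_KEY_OFFSET` and `DEFAULT_SORT_DATETIME`.

```python
import heapq
from datetime import datetime, timezone

# Assumed constants: the offset must exceed the largest possible line number
# so that line_num only breaks ties between equal timestamps.
SORT_KEY_OFFSET = 10**7
DEFAULT_SORT_DATETIME = datetime(2000, 1, 1, tzinfo=timezone.utc)


def sort_key(timestamp, line_num):
    # Pack (timestamp, line_num) into one int: millisecond epoch scaled up,
    # plus the line number as a tie-breaker.
    ts = timestamp or DEFAULT_SORT_DATETIME
    return int(ts.timestamp() * 1000) * SORT_KEY_OFFSET + line_num


heap = []
t1 = datetime(2025, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2025, 1, 2, tzinfo=timezone.utc)
heapq.heappush(heap, (sort_key(t2, 0), "later line"))
heapq.heappush(heap, (sort_key(t1, 3), "earlier line"))
```

Popping the heap now yields "earlier line" first, ordered purely by the integer key.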






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-01 Thread via GitHub


potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900427959


##
airflow/utils/log/file_task_handler.py:
##
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-            yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:
+                    with suppress(Exception):
+                        # next_timestamp unchanged if line can't be parsed
+                        next_timestamp = _parse_timestamp(line)
+                    if next_timestamp:
+                        timestamp = next_timestamp
+
+                yield timestamp, line_num, line
+                line_num += 1
+
+
+def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
+
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: an integer sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, str]],
+    parsed_log_streams: list[_ParsedLogStreamType],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap.
+
+    Remove any empty log stream from the list while iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: list of parsed log streams
+    """
+    for log_stream in parsed_log_streams:
+        record: _ParsedLogRecordType | None = next(log_stream, None)
+        if record is None:
+            parsed_log_streams.remove(log_stream)
+            continue
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+
+
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.

Review Comment:
   Nice






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2025-01-01 Thread via GitHub


potiuk commented on code in PR #45129:
URL: https://github.com/apache/airflow/pull/45129#discussion_r1900428066


##
airflow/utils/log/file_task_handler.py:
##
@@ -110,30 +133,111 @@ def _parse_timestamp(line: str):
     return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-            yield timestamp, idx, line
+def _get_parsed_log_stream(file_path: Path) -> _ParsedLogStreamType:
+    with open(file_path) as f:
+        line_num = 0  # line number for each log line
+        for file_chunk in iter(partial(f.read, CHUNK_SIZE), b""):
+            if not file_chunk:
+                break
+            # parse log lines
+            lines = file_chunk.splitlines()
+            timestamp = None
+            next_timestamp = None
+            for line in lines:
+                if line:
+                    with suppress(Exception):
+                        # next_timestamp unchanged if line can't be parsed
+                        next_timestamp = _parse_timestamp(line)
+                    if next_timestamp:
+                        timestamp = next_timestamp
+
+                yield timestamp, line_num, line
+                line_num += 1
+
+
+def _sort_key(timestamp: pendulum.DateTime | None, line_num: int) -> int:
+    """
+    Generate a sort key for log record, to be used in K-way merge.
+
+    :param timestamp: timestamp of the log line
+    :param line_num: line number of the log line
+    :return: an integer sort key to avoid overhead of memory usage
+    """
+    return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
+
+
+def _add_log_from_parsed_log_streams_to_heap(
+    heap: list[tuple[int, str]],
+    parsed_log_streams: list[_ParsedLogStreamType],
+) -> None:
+    """
+    Add one log record from each parsed log stream to the heap.
+
+    Remove any empty log stream from the list while iterating.
+
+    :param heap: heap to store log records
+    :param parsed_log_streams: list of parsed log streams
+    """
+    for log_stream in parsed_log_streams:
+        record: _ParsedLogRecordType | None = next(log_stream, None)
+        if record is None:
+            parsed_log_streams.remove(log_stream)
+            continue
+        timestamp, line_num, line = record
+        # take int as sort key to avoid overhead of memory usage
+        heapq.heappush(heap, (_sort_key(timestamp, line_num), line))
+
+
+def _interleave_logs(*parsed_log_streams: _ParsedLogStreamType) -> Generator[str, None, None]:
+    """
+    Merge parsed log streams using K-way merge.
+
+    By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance of messing up the global order.
+    Since there are multiple log streams, we can't guarantee that the records are in global order.

Review Comment:
   Thanks for the visual explanation here!
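The k-way merge discussed above can be sketched minimally as follows. This is a simplified illustration, not the PR's implementation: it yields one record per pop instead of buffering HALF_CHUNK_SIZE records, and takes streams of `(sort_key, line)` pairs with plain integer keys.

```python
import heapq
from typing import Generator, Iterator


def interleave(*streams: Iterator[tuple[int, str]]) -> Generator[str, None, None]:
    """K-way merge: seed the heap with one record per stream, then after each
    pop, refill from the stream the popped record came from."""
    heap: list[tuple[int, int, str]] = []
    for i, stream in enumerate(streams):
        rec = next(stream, None)
        if rec is not None:
            heap.append((rec[0], i, rec[1]))  # (sort_key, stream index, line)
    heapq.heapify(heap)
    while heap:
        _, i, line = heapq.heappop(heap)
        yield line
        rec = next(streams[i], None)
        if rec is not None:
            heapq.heappush(heap, (rec[0], i, rec[1]))


a = iter([(1, "a1"), (4, "a2")])
b = iter([(2, "b1"), (3, "b2")])
print(list(interleave(a, b)))  # -> ['a1', 'b1', 'b2', 'a2']
```

Because only one record per stream is held at a time, memory stays bounded by the number of streams rather than the total log size, which is the point of the refactor.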






Re: [PR] [Resolve OOM When Reading Large Logs in Webserver] Refactor to Use K-Way Merge for Log Streams Instead of Sorting Entire Log Records [airflow]

2024-12-25 Thread via GitHub


jason810496 commented on PR #45129:
URL: https://github.com/apache/airflow/pull/45129#issuecomment-2562218329

   Finally fixed the tests!  
   
   This is the first (and likely the largest) PR for resolving OOM issues when 
reading large logs in the webserver.
   Further PRs will only focus on refactoring each provider, as listed in the 
TODO tasks in #45079.  
   
   Even though the providers haven't yet been refactored to support 
stream-based log reading, the compatibility utility will transform the old 
`read` log method (which returns the entire list of logs) into a stream-based 
approach. Once all providers are refactored to use stream-based reading, the 
compatibility utility can be removed.  
   
   For the testing part:  
   Since the CI will run provider compatibility tests for versions 2.9.3 and 
2.10.3, my approach is to copy the old test cases related to log reading into 
new stream-based tests. I’ve added the `mark_test_for_old_read_log_method` and 
`mark_test_for_stream_based_read_log_method` pytest decorators to selectively 
skip the corresponding test runs.
   From my perspective, this approach is simpler and minimizes changes to the 
original test logic. Additionally, tests marked with 
`mark_test_for_old_read_log_method` can be safely removed once all providers 
migrate to stream-based reading.
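The marker implementations are not shown in this thread; a hypothetical sketch of how such pytest markers are commonly built on `pytest.mark.skipif` (the feature flag below is an assumption standing in for a real Airflow version check):

```python
import pytest

# Assumed flag for illustration; in practice this would come from
# comparing the installed Airflow version.
AIRFLOW_SUPPORTS_STREAM_READ = True

mark_test_for_old_read_log_method = pytest.mark.skipif(
    AIRFLOW_SUPPORTS_STREAM_READ,
    reason="superseded by the stream-based variant on newer Airflow",
)
mark_test_for_stream_based_read_log_method = pytest.mark.skipif(
    not AIRFLOW_SUPPORTS_STREAM_READ,
    reason="stream-based log reading not available",
)


@mark_test_for_old_read_log_method
def test_read_whole_log():
    # old read-everything-into-memory behaviour would be exercised here
    ...
```

With this shape, the old-method tests can later be deleted wholesale by removing the marker and every test carrying it.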
   

