This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5d658c1e867 [Dataflow Streaming] Improve logs (#37152)
5d658c1e867 is described below
commit 5d658c1e86714c08fbbbb589d9d52862c62e9629
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Dec 19 07:41:50 2025 -0800
[Dataflow Streaming] Improve logs (#37152)
---
.../worker/StreamingModeExecutionContext.java | 3 ++-
.../streaming/KeyCommitTooLargeException.java | 4 ++--
.../processing/failures/WorkFailureProcessor.java | 22 +++++++++++-----------
3 files changed, 15 insertions(+), 14 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index c8ff7840bd1..ddc047ddbaf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -409,7 +409,8 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
try {
activeReader.close();
} catch (IOException e) {
- LOG.warn("Failed to close reader for {}-{}", computationId,
key.toStringUtf8(), e);
+ LOG.warn(
+ "Failed to close reader for {}-{}", computationId,
getWorkItem().getShardingKey(), e);
}
}
activeReader = null;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
index 76228b9092b..4d6ae8a208c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
@@ -26,8 +26,8 @@ public final class KeyCommitTooLargeException extends
Exception {
StringBuilder message = new StringBuilder();
message.append("Commit request for stage ");
message.append(computationId);
- message.append(" and key ");
- message.append(request.getKey().toStringUtf8());
+ message.append(" and sharding key ");
+ message.append(request.getShardingKey());
if (request.getSerializedSize() > 0) {
message.append(
" has size "
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
index 9dab209a12a..0f0513b81c7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
@@ -136,13 +136,13 @@ public final class WorkFailureProcessor {
Throwable parsedException = (t instanceof UserCodeException && cause !=
null) ? cause : t;
if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
LOG.debug(
- "Execution of work for computation '{}' on key '{}' failed due to
token expiration. "
+ "Execution of work for computation '{}' on sharding key '{}' failed
due to token expiration. "
+ "Work will not be retried locally.",
computationId,
- work.getWorkItem().getKey().toStringUtf8());
+ work.getWorkItem().getShardingKey());
} else if
(WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
LOG.debug(
- "Execution of work for computation '{}' on key '{}' failed. "
+ "Execution of work for computation '{}' on sharding key '{}' failed.
"
+ "Work will not be retried locally.",
computationId,
work.getWorkItem().getShardingKey());
@@ -152,36 +152,36 @@ public final class WorkFailureProcessor {
Duration elapsedTimeSinceStart = new Duration(work.getStartTime(),
clock.get());
if (!failureTracker.trackFailure(computationId, work.getWorkItem(),
parsedException)) {
LOG.error(
- "Execution of work for computation '{}' on key '{}' failed with
uncaught exception, "
+ "Execution of work for computation '{}' on sharding key '{}'
failed with uncaught exception, "
+ "and Windmill indicated not to retry locally.",
computationId,
- work.getWorkItem().getKey().toStringUtf8(),
+ work.getWorkItem().getShardingKey(),
parsedException);
} else if (isOutOfMemoryError(parsedException)) {
String heapDump = tryToDumpHeap();
LOG.error(
- "Execution of work for computation '{}' for key '{}' failed with
out-of-memory. "
+ "Execution of work for computation '{}' for sharding key '{}'
failed with out-of-memory. "
+ "Work will not be retried locally. Heap dump {}.",
computationId,
- work.getWorkItem().getKey().toStringUtf8(),
+ work.getWorkItem().getShardingKey(),
heapDump,
parsedException);
} else if
(elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
LOG.error(
- "Execution of work for computation '{}' for key '{}' failed with
uncaught exception, "
+ "Execution of work for computation '{}' for sharding key '{}'
failed with uncaught exception, "
+ "and it will not be retried locally because the elapsed time
since start {} "
+ "exceeds {}.",
computationId,
- work.getWorkItem().getKey().toStringUtf8(),
+ work.getWorkItem().getShardingKey(),
elapsedTimeSinceStart,
MAX_LOCAL_PROCESSING_RETRY_DURATION,
parsedException);
} else {
LOG.error(
- "Execution of work for computation '{}' on key '{}' failed with
uncaught exception. "
+ "Execution of work for computation '{}' on sharding key '{}'
failed with uncaught exception. "
+ "Work will be retried locally.",
computationId,
- work.getWorkItem().getKey().toStringUtf8(),
+ work.getWorkItem().getShardingKey(),
parsedException);
return true;
}