This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d658c1e867 [Dataflow Streaming] Improve logs (#37152)
5d658c1e867 is described below

commit 5d658c1e86714c08fbbbb589d9d52862c62e9629
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Dec 19 07:41:50 2025 -0800

    [Dataflow Streaming] Improve logs (#37152)
---
 .../worker/StreamingModeExecutionContext.java      |  3 ++-
 .../streaming/KeyCommitTooLargeException.java      |  4 ++--
 .../processing/failures/WorkFailureProcessor.java  | 22 +++++++++++-----------
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index c8ff7840bd1..ddc047ddbaf 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -409,7 +409,8 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
         try {
           activeReader.close();
         } catch (IOException e) {
-          LOG.warn("Failed to close reader for {}-{}", computationId, key.toStringUtf8(), e);
+          LOG.warn(
+              "Failed to close reader for {}-{}", computationId, getWorkItem().getShardingKey(), e);
         }
       }
       activeReader = null;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
index 76228b9092b..4d6ae8a208c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java
@@ -26,8 +26,8 @@ public final class KeyCommitTooLargeException extends Exception {
     StringBuilder message = new StringBuilder();
     message.append("Commit request for stage ");
     message.append(computationId);
-    message.append(" and key ");
-    message.append(request.getKey().toStringUtf8());
+    message.append(" and sharding key ");
+    message.append(request.getShardingKey());
     if (request.getSerializedSize() > 0) {
       message.append(
           " has size "
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
index 9dab209a12a..0f0513b81c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
@@ -136,13 +136,13 @@ public final class WorkFailureProcessor {
    Throwable parsedException = (t instanceof UserCodeException && cause != null) ? cause : t;
     if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
       LOG.debug(
-          "Execution of work for computation '{}' on key '{}' failed due to token expiration. "
+          "Execution of work for computation '{}' on sharding key '{}' failed due to token expiration. "
               + "Work will not be retried locally.",
           computationId,
-          work.getWorkItem().getKey().toStringUtf8());
+          work.getWorkItem().getShardingKey());
    } else if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
       LOG.debug(
-          "Execution of work for computation '{}' on key '{}' failed. "
+          "Execution of work for computation '{}' on sharding key '{}' failed. "
               + "Work will not be retried locally.",
           computationId,
           work.getWorkItem().getShardingKey());
@@ -152,36 +152,36 @@ public final class WorkFailureProcessor {
      Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
      if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
         LOG.error(
-            "Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
+            "Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception, "
                 + "and Windmill indicated not to retry locally.",
             computationId,
-            work.getWorkItem().getKey().toStringUtf8(),
+            work.getWorkItem().getShardingKey(),
             parsedException);
       } else if (isOutOfMemoryError(parsedException)) {
         String heapDump = tryToDumpHeap();
         LOG.error(
-            "Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
+            "Execution of work for computation '{}' for sharding key '{}' failed with out-of-memory. "
                 + "Work will not be retried locally. Heap dump {}.",
             computationId,
-            work.getWorkItem().getKey().toStringUtf8(),
+            work.getWorkItem().getShardingKey(),
             heapDump,
             parsedException);
      } else if (elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
         LOG.error(
-            "Execution of work for computation '{}' for key '{}' failed with uncaught exception, "
+            "Execution of work for computation '{}' for sharding key '{}' failed with uncaught exception, "
                + "and it will not be retried locally because the elapsed time since start {} "
                + "exceeds {}.",
             computationId,
-            work.getWorkItem().getKey().toStringUtf8(),
+            work.getWorkItem().getShardingKey(),
             elapsedTimeSinceStart,
             MAX_LOCAL_PROCESSING_RETRY_DURATION,
             parsedException);
       } else {
         LOG.error(
-            "Execution of work for computation '{}' on key '{}' failed with uncaught exception. "
+            "Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception. "
                 + "Work will be retried locally.",
             computationId,
-            work.getWorkItem().getKey().toStringUtf8(),
+            work.getWorkItem().getShardingKey(),
             parsedException);
         return true;
       }

Reply via email to