This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e655f5c5990 Support log level override for direct runner (#37100)
e655f5c5990 is described below

commit e655f5c59908a76229d7a943451bcdc180d80849
Author: Yi Hu <[email protected]>
AuthorDate: Mon Dec 22 14:06:43 2025 -0500

    Support log level override for direct runner (#37100)
---
 .../apache/beam/runners/direct/DirectRunner.java   |  3 +-
 .../beam/runners/direct/DirectRunnerTest.java      | 47 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e27b6a618c3..874cb96293c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -184,7 +185,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
       DisplayDataValidator.validatePipeline(pipeline);
       DisplayDataValidator.validateOptions(options);
-
+      SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
       ExecutorService metricsPool =
           Executors.newCachedThreadPool(
               new ThreadFactoryBuilder()
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index dc45de20002..7131247c3d7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -744,6 +747,50 @@ public class DirectRunnerTest implements Serializable {
     void setIgnoredField(String value);
   }
 
+  @Test
+  public void testLogLevel() {
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs(
+                new String[] {
+                  "--runner=DirectRunner",
+                  "--defaultSdkHarnessLogLevel=ERROR",
+                  "--sdkHarnessLogLevelOverrides={\"org.apache.beam.runners.direct.DirectRunnerTest\":\"INFO\"}"
+                })
+            .create();
+    Pipeline pipeline = Pipeline.create(options);
+
+    LogManager logManager = LogManager.getLogManager();
+    // use full name to avoid conflicts with org.slf4j.Logger
+    java.util.logging.Logger rootLogger = logManager.getLogger("");
+    Level originalLevel = rootLogger.getLevel();
+
+    try {
+      pipeline
+          .apply(Impulse.create())
+          .apply(
+              ParDo.of(
+                  new DoFn<byte[], byte[]>() {
+                    @ProcessElement
+                    public void process(@Element byte[] element, OutputReceiver<byte[]> o) {
+                      LogManager logManager = LogManager.getLogManager();
+                      java.util.logging.Logger rootLogger = logManager.getLogger("");
+                      // check loglevel here. Whether actual logs are rendered depends on slf4j impl
+                      // and upstream configs.
+                      assertEquals(Level.SEVERE, rootLogger.getLevel());
+                      assertEquals(
+                          Level.INFO,
+                          java.util.logging.Logger.getLogger(
+                                  "org.apache.beam.runners.direct.DirectRunnerTest")
+                              .getLevel());
+                    }
+                  }));
+      pipeline.run();
+    } finally {
+      // resume original log level
+      rootLogger.setLevel(originalLevel);
+    }
+  }
+
   private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(Long value, OutputStream outStream) throws IOException {}

Reply via email to