This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e655f5c5990 Support log level override for direct runner (#37100)
e655f5c5990 is described below
commit e655f5c59908a76229d7a943451bcdc180d80849
Author: Yi Hu <[email protected]>
AuthorDate: Mon Dec 22 14:06:43 2025 -0500
Support log level override for direct runner (#37100)
---
.../apache/beam/runners/direct/DirectRunner.java | 3 +-
.../beam/runners/direct/DirectRunnerTest.java | 47 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e27b6a618c3..874cb96293c 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
@@ -184,7 +185,7 @@ public class DirectRunner extends
PipelineRunner<DirectPipelineResult> {
DisplayDataValidator.validatePipeline(pipeline);
DisplayDataValidator.validateOptions(options);
-
+
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
ExecutorService metricsPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index dc45de20002..7131247c3d7 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -744,6 +747,50 @@ public class DirectRunnerTest implements Serializable {
void setIgnoredField(String value);
}
+ @Test
+ public void testLogLevel() {
+ PipelineOptions options =
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--runner=DirectRunner",
+ "--defaultSdkHarnessLogLevel=ERROR",
+
"--sdkHarnessLogLevelOverrides={\"org.apache.beam.runners.direct.DirectRunnerTest\":\"INFO\"}"
+ })
+ .create();
+ Pipeline pipeline = Pipeline.create(options);
+
+ LogManager logManager = LogManager.getLogManager();
+ // use full name to avoid conflicts with org.slf4j.Logger
+ java.util.logging.Logger rootLogger = logManager.getLogger("");
+ Level originalLevel = rootLogger.getLevel();
+
+ try {
+ pipeline
+ .apply(Impulse.create())
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], byte[]>() {
+ @ProcessElement
+ public void process(@Element byte[] element,
OutputReceiver<byte[]> o) {
+ LogManager logManager = LogManager.getLogManager();
+ java.util.logging.Logger rootLogger =
logManager.getLogger("");
+ // check loglevel here. Whether actual logs are rendered
depends on slf4j impl
+ // and upstream configs.
+ assertEquals(Level.SEVERE, rootLogger.getLevel());
+ assertEquals(
+ Level.INFO,
+ java.util.logging.Logger.getLogger(
+
"org.apache.beam.runners.direct.DirectRunnerTest")
+ .getLevel());
+ }
+ }));
+ pipeline.run();
+ } finally {
+ // resume original log level
+ rootLogger.setLevel(originalLevel);
+ }
+ }
+
private static class LongNoDecodeCoder extends AtomicCoder<Long> {
@Override
public void encode(Long value, OutputStream outStream) throws IOException
{}