This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 904a39e  [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
     new 06dbe4f  Merge pull request #14864 from boyuanzz/fix
904a39e is described below

commit 904a39e45d071397e31e423740fcda596147fb5a
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Fri May 21 16:33:05 2021 -0700

    [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
---
 .../java/org/apache/beam/runners/twister2/Twister2Runner.java  | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index a6f0797..b3a02d3 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -151,15 +151,11 @@ public class Twister2Runner extends PipelineRunner<PipelineResult> {
     Twister2PipelineExecutionEnvironment env = new 
Twister2PipelineExecutionEnvironment(options);
     LOG.info("Translating pipeline to Twister2 program.");
     pipeline.replaceAll(getDefaultOverrides());
+
     // TODO(BEAM-10670): Use SDF read as default when we address performance 
issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), 
"use_deprecated_read");
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), 
"beam_fn_api")) {
+      
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
 
     env.translate(pipeline);
     setupSystemTest(options);

Reply via email to