This is an automated email from the ASF dual-hosted git repository. heejong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:

     new 904a39e  [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
     new 06dbe4f  Merge pull request #14864 from boyuanzz/fix

904a39e is described below

commit 904a39e45d071397e31e423740fcda596147fb5a
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Fri May 21 16:33:05 2021 -0700

    [BEAM-10670] Update Twister2Runner.runTest with the same logic as Twister2Runner.run
---
 .../java/org/apache/beam/runners/twister2/Twister2Runner.java | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
index a6f0797..b3a02d3 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
@@ -151,15 +151,11 @@ public class Twister2Runner extends PipelineRunner<PipelineResult> {
     Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(options);
     LOG.info("Translating pipeline to Twister2 program.");
     pipeline.replaceAll(getDefaultOverrides());
+    // TODO(BEAM-10670): Use SDF read as default when we address performance issue.
-    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_sdf_read")) {
-      // Populate experiments directly to have Kafka use legacy read.
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api_use_deprecated_read");
-      ExperimentalOptions.addExperiment(
-          pipeline.getOptions().as(ExperimentalOptions.class), "use_deprecated_read");
+    if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
+      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
-    SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     env.translate(pipeline);
     setupSystemTest(options);