[ https://issues.apache.org/jira/browse/BEAM-6283?focusedWorklogId=178107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178107 ]
ASF GitHub Bot logged work on BEAM-6283: ---------------------------------------- Author: ASF GitHub Bot Created on: 21/Dec/18 19:03 Start Date: 21/Dec/18 19:03 Worklog Time Spent: 10m Work Description: mxm closed pull request #7333: [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert URL: https://github.com/apache/beam/pull/7333 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java index 34985d75e568..9542bdd79547 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java @@ -17,19 +17,16 @@ */ package org.apache.beam.runners.flink; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.Executors; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -39,12 +36,14 @@ import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +60,7 @@ @RunWith(Parameterized.class) public class PortableExecutionTest implements Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] data() { return new Object[] {true, false}; } @@ -80,9 +79,7 @@ public void tearDown() { flinkJobExecutor.shutdown(); } - private static ArrayList<KV<String, Iterable<Long>>> outputValues = new ArrayList<>(); - - @Test + @Test(timeout = 120_000) public void testExecution() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(CrashingRunner.class); @@ -92,45 +89,42 @@ public void testExecution() throws Exception { .as(PortablePipelineOptions.class) .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); Pipeline p = Pipeline.create(options); - p.apply("impulse", Impulse.create()) - .apply( - "create", - ParDo.of( - new DoFn<byte[], String>() { - @ProcessElement - public void process(ProcessContext ctxt) { - ctxt.output("zero"); - ctxt.output("one"); - ctxt.output("two"); - } - })) - .apply( - "len", - ParDo.of( - new DoFn<String, Long>() { - @ProcessElement - public void process(ProcessContext ctxt) { - ctxt.output((long) ctxt.element().length()); - } - })) - .apply("addKeys", WithKeys.of("foo")) - // Use some unknown coders - .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) - // Force the output to be materialized - 
.apply("gbk", GroupByKey.create()) - .apply( - "collect", - ParDo.of( - new DoFn<KV<String, Iterable<Long>>, Void>() { - @ProcessElement - public void process(ProcessContext ctx) { - outputValues.add(ctx.element()); - } - })); + PCollection<KV<String, Iterable<Long>>> result = + p.apply("impulse", Impulse.create()) + .apply( + "create", + ParDo.of( + new DoFn<byte[], String>() { + @ProcessElement + public void process(ProcessContext ctxt) { + ctxt.output("zero"); + ctxt.output("one"); + ctxt.output("two"); + } + })) + .apply( + "len", + ParDo.of( + new DoFn<String, Long>() { + @ProcessElement + public void process(ProcessContext ctxt) { + ctxt.output((long) ctxt.element().length()); + } + })) + .apply("addKeys", WithKeys.of("foo")) + // Use some unknown coders + .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) + // Force the output to be materialized + .apply("gbk", GroupByKey.create()); + + PAssert.that(result).containsInAnyOrder(KV.of("foo", ImmutableList.of(4L, 3L, 3L))); + + // This is line below required to convert the PAssert's read to an impulse, which is expected + // by the GreedyPipelineFuser. 
+ p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - outputValues.clear(); // execute the pipeline FlinkJobInvocation jobInvocation = FlinkJobInvocation.create( @@ -140,16 +134,10 @@ public void process(ProcessContext ctx) { pipelineProto, options.as(FlinkPipelineOptions.class), null, - Collections.EMPTY_LIST); + Collections.emptyList()); jobInvocation.start(); - long timeout = System.currentTimeMillis() + 60 * 1000; - while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) { + while (jobInvocation.getState() != Enum.DONE) { Thread.sleep(1000); } - assertEquals("job state", Enum.DONE, jobInvocation.getState()); - - assertEquals(1, outputValues.size()); - assertEquals("foo", outputValues.get(0).getKey()); - assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L)); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java index 05194d1c979b..a658a1c9ec26 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java @@ -17,20 +17,15 @@ */ package org.apache.beam.runners.flink; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Executors; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; 
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; @@ -41,10 +36,12 @@ import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -60,7 +57,7 @@ @RunWith(Parameterized.class) public class PortableStateExecutionTest implements Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] data() { return new Object[] {true, false}; } @@ -79,21 +76,11 @@ public void tearDown() { flinkJobExecutor.shutdown(); } - // State -> Key -> Value - private static final Map<String, Map<String, Integer>> stateValuesMap = new HashMap<>(); - - @Before - public void before() { - stateValuesMap.clear(); - stateValuesMap.put("valueState", new HashMap<>()); - stateValuesMap.put("valueState2", new HashMap<>()); - } - // Special values which clear / write out state private static final int CLEAR_STATE = -1; - private static final int WRITE_STATE_TO_MAP = -2; + private static final int WRITE_STATE = -2; - @Test + @Test(timeout = 120_000) public void testExecution() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(CrashingRunner.class); @@ -103,74 +90,93 @@ public void testExecution() throws Exception { .as(PortablePipelineOptions.class) .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); Pipeline p = Pipeline.create(options); - p.apply(Impulse.create()) - .apply( - ParDo.of( - new DoFn<byte[], KV<String, Integer>>() { 
- @ProcessElement - public void process(ProcessContext ctx) { - // Values == -1 will clear the state - ctx.output(KV.of("clearedState", 1)); - ctx.output(KV.of("clearedState", CLEAR_STATE)); - // values >= 1 will be added on top of each other - ctx.output(KV.of("bla1", 42)); - ctx.output(KV.of("bla", 23)); - ctx.output(KV.of("bla2", 64)); - ctx.output(KV.of("bla", 1)); - ctx.output(KV.of("bla", 1)); - // values == -2 will write the state to a map - ctx.output(KV.of("bla", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("clearedState", -2)); - } - })) - .apply( - "statefulDoFn", - ParDo.of( - new DoFn<KV<String, Integer>, String>() { - @StateId("valueState") - private final StateSpec<ValueState<Integer>> valueStateSpec = - StateSpecs.value(VarIntCoder.of()); - - @StateId("valueState2") - private final StateSpec<ValueState<Integer>> valueStateSpec2 = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void process( - ProcessContext ctx, - @StateId("valueState") ValueState<Integer> valueState, - @StateId("valueState2") ValueState<Integer> valueState2) { - performStateUpdates("valueState", ctx, valueState); - performStateUpdates("valueState2", ctx, valueState2); - } - - private void performStateUpdates( - String stateId, ProcessContext ctx, ValueState<Integer> valueState) { - Map<String, Integer> stateValues = stateValuesMap.get(stateId); - Integer value = ctx.element().getValue(); - if (value == null) { - throw new IllegalStateException(); - } - switch (value) { - case CLEAR_STATE: - valueState.clear(); - break; - case WRITE_STATE_TO_MAP: - stateValues.put(ctx.element().getKey(), valueState.read()); - break; - default: - Integer currentState = valueState.read(); - if (currentState == null) { - currentState = value; - } else { - currentState += value; + PCollection<KV<String, String>> output = + p.apply(Impulse.create()) + .apply( + ParDo.of( + new DoFn<byte[], 
KV<String, Integer>>() { + @ProcessElement + public void process(ProcessContext ctx) { + // Values == -1 will clear the state + ctx.output(KV.of("clearedState", 1)); + ctx.output(KV.of("clearedState", CLEAR_STATE)); + // values >= 1 will be added on top of each other + ctx.output(KV.of("bla1", 42)); + ctx.output(KV.of("bla", 23)); + ctx.output(KV.of("bla2", 64)); + ctx.output(KV.of("bla", 1)); + ctx.output(KV.of("bla", 1)); + // values == -2 will write the current state to the output + ctx.output(KV.of("bla", WRITE_STATE)); + ctx.output(KV.of("bla1", WRITE_STATE)); + ctx.output(KV.of("bla2", WRITE_STATE)); + ctx.output(KV.of("clearedState", WRITE_STATE)); + } + })) + .apply( + "statefulDoFn", + ParDo.of( + new DoFn<KV<String, Integer>, KV<String, String>>() { + @StateId("valueState") + private final StateSpec<ValueState<Integer>> valueStateSpec = + StateSpecs.value(VarIntCoder.of()); + + @StateId("valueState2") + private final StateSpec<ValueState<Integer>> valueStateSpec2 = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void process( + ProcessContext ctx, + @StateId("valueState") ValueState<Integer> valueState, + @StateId("valueState2") ValueState<Integer> valueState2) { + performStateUpdates(ctx, valueState); + performStateUpdates(ctx, valueState2); + } + + private void performStateUpdates( + ProcessContext ctx, ValueState<Integer> valueState) { + Integer value = ctx.element().getValue(); + if (value == null) { + throw new IllegalStateException(); } - valueState.write(currentState); - } - } - })); + switch (value) { + case CLEAR_STATE: + valueState.clear(); + break; + case WRITE_STATE: + Integer read = valueState.read(); + ctx.output( + KV.of( + ctx.element().getKey(), + read == null ? 
"null" : read.toString())); + break; + default: + Integer currentState = valueState.read(); + if (currentState == null) { + currentState = value; + } else { + currentState += value; + } + valueState.write(currentState); + } + } + })); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("bla", "25"), + KV.of("bla1", "42"), + KV.of("bla2", "64"), + KV.of("clearedState", "null"), + KV.of("bla", "25"), + KV.of("bla1", "42"), + KV.of("bla2", "64"), + KV.of("clearedState", "null")); + + // This is line below required to convert the PAssert's read to an impulse, which is expected + // by the GreedyPipelineFuser. + p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); @@ -185,20 +191,9 @@ private void performStateUpdates( Collections.emptyList()); jobInvocation.start(); - long timeout = System.currentTimeMillis() + 60 * 1000; - while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) { - Thread.sleep(1000); - } - assertThat(jobInvocation.getState(), is(Enum.DONE)); - Map<String, Integer> expected = new HashMap<>(); - expected.put("bla", 25); - expected.put("bla1", 42); - expected.put("bla2", 64); - expected.put("clearedState", null); - - for (Map<String, Integer> statesValues : stateValuesMap.values()) { - assertThat(statesValues, equalTo(expected)); + while (jobInvocation.getState() != Enum.DONE) { + Thread.sleep(1000); } } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java index 58db1baad164..d9639b87162c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java @@ -67,7 +67,7 @@ @RunWith(Parameterized.class) public class PortableTimersExecutionTest implements 
Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] testModes() { return new Object[] {true, false}; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 178107) Time Spent: 50m (was: 40m) > Convert PortableStateTimerTest and PortableExecutionTest to using PAssert > ------------------------------------------------------------------------- > > Key: BEAM-6283 > URL: https://issues.apache.org/jira/browse/BEAM-6283 > Project: Beam > Issue Type: Test > Components: runner-flink > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Priority: Minor > Fix For: 2.10.0 > > Time Spent: 50m > Remaining Estimate: 0h > > The tests modify a map concurrently. They should synchronize on the map or be converted to using PAssert, similar to PortableTimerTest. > Just saw PortableStateTest fail here: > https://builds.apache.org/job/beam_PreCommit_Java_Phrase/508 -- This message was sent by Atlassian JIRA (v7.6.3#76005)