[ https://issues.apache.org/jira/browse/BEAM-6283?focusedWorklogId=178107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-178107 ]

ASF GitHub Bot logged work on BEAM-6283:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 19:03
            Start Date: 21/Dec/18 19:03
    Worklog Time Spent: 10m 
      Work Description: mxm closed pull request #7333: [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert
URL: https://github.com/apache/beam/pull/7333
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 34985d75e568..9542bdd79547 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -17,19 +17,16 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.concurrent.Executors;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -39,12 +36,14 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +60,7 @@
 @RunWith(Parameterized.class)
 public class PortableExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
   }
@@ -80,9 +79,7 @@ public void tearDown() {
     flinkJobExecutor.shutdown();
   }
 
-  private static ArrayList<KV<String, Iterable<Long>>> outputValues = new ArrayList<>();
-
-  @Test
+  @Test(timeout = 120_000)
   public void testExecution() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setRunner(CrashingRunner.class);
@@ -92,45 +89,42 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    p.apply("impulse", Impulse.create())
-        .apply(
-            "create",
-            ParDo.of(
-                new DoFn<byte[], String>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctxt) {
-                    ctxt.output("zero");
-                    ctxt.output("one");
-                    ctxt.output("two");
-                  }
-                }))
-        .apply(
-            "len",
-            ParDo.of(
-                new DoFn<String, Long>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctxt) {
-                    ctxt.output((long) ctxt.element().length());
-                  }
-                }))
-        .apply("addKeys", WithKeys.of("foo"))
-        // Use some unknown coders
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
-        // Force the output to be materialized
-        .apply("gbk", GroupByKey.create())
-        .apply(
-            "collect",
-            ParDo.of(
-                new DoFn<KV<String, Iterable<Long>>, Void>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctx) {
-                    outputValues.add(ctx.element());
-                  }
-                }));
+    PCollection<KV<String, Iterable<Long>>> result =
+        p.apply("impulse", Impulse.create())
+            .apply(
+                "create",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output("zero");
+                        ctxt.output("one");
+                        ctxt.output("two");
+                      }
+                    }))
+            .apply(
+                "len",
+                ParDo.of(
+                    new DoFn<String, Long>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output((long) ctxt.element().length());
+                      }
+                    }))
+            .apply("addKeys", WithKeys.of("foo"))
+            // Use some unknown coders
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
+            // Force the output to be materialized
+            .apply("gbk", GroupByKey.create());
+
+    PAssert.that(result).containsInAnyOrder(KV.of("foo", ImmutableList.of(4L, 3L, 3L)));
+
+    // The line below is required to convert the PAssert's read to an impulse, which is expected
+    // by the GreedyPipelineFuser.
+    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
-    outputValues.clear();
     // execute the pipeline
     FlinkJobInvocation jobInvocation =
         FlinkJobInvocation.create(
@@ -140,16 +134,10 @@ public void process(ProcessContext ctx) {
             pipelineProto,
             options.as(FlinkPipelineOptions.class),
             null,
-            Collections.EMPTY_LIST);
+            Collections.emptyList());
     jobInvocation.start();
-    long timeout = System.currentTimeMillis() + 60 * 1000;
-    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
+    while (jobInvocation.getState() != Enum.DONE) {
       Thread.sleep(1000);
     }
-    assertEquals("job state", Enum.DONE, jobInvocation.getState());
-
-    assertEquals(1, outputValues.size());
-    assertEquals("foo", outputValues.get(0).getKey());
-    assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L));
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 05194d1c979b..a658a1c9ec26 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -17,20 +17,15 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -41,10 +36,12 @@
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,7 +57,7 @@
 @RunWith(Parameterized.class)
 public class PortableStateExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
   }
@@ -79,21 +76,11 @@ public void tearDown() {
     flinkJobExecutor.shutdown();
   }
 
-  // State -> Key -> Value
-  private static final Map<String, Map<String, Integer>> stateValuesMap = new HashMap<>();
-
-  @Before
-  public void before() {
-    stateValuesMap.clear();
-    stateValuesMap.put("valueState", new HashMap<>());
-    stateValuesMap.put("valueState2", new HashMap<>());
-  }
-
   // Special values which clear / write out state
   private static final int CLEAR_STATE = -1;
-  private static final int WRITE_STATE_TO_MAP = -2;
+  private static final int WRITE_STATE = -2;
 
-  @Test
+  @Test(timeout = 120_000)
   public void testExecution() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setRunner(CrashingRunner.class);
@@ -103,74 +90,93 @@ public void testExecution() throws Exception {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    p.apply(Impulse.create())
-        .apply(
-            ParDo.of(
-                new DoFn<byte[], KV<String, Integer>>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctx) {
-                    // Values == -1 will clear the state
-                    ctx.output(KV.of("clearedState", 1));
-                    ctx.output(KV.of("clearedState", CLEAR_STATE));
-                    // values >= 1 will be added on top of each other
-                    ctx.output(KV.of("bla1", 42));
-                    ctx.output(KV.of("bla", 23));
-                    ctx.output(KV.of("bla2", 64));
-                    ctx.output(KV.of("bla", 1));
-                    ctx.output(KV.of("bla", 1));
-                    // values == -2 will write the state to a map
-                    ctx.output(KV.of("bla", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("clearedState", -2));
-                  }
-                }))
-        .apply(
-            "statefulDoFn",
-            ParDo.of(
-                new DoFn<KV<String, Integer>, String>() {
-                  @StateId("valueState")
-                  private final StateSpec<ValueState<Integer>> valueStateSpec =
-                      StateSpecs.value(VarIntCoder.of());
-
-                  @StateId("valueState2")
-                  private final StateSpec<ValueState<Integer>> valueStateSpec2 =
-                      StateSpecs.value(VarIntCoder.of());
-
-                  @ProcessElement
-                  public void process(
-                      ProcessContext ctx,
-                      @StateId("valueState") ValueState<Integer> valueState,
-                      @StateId("valueState2") ValueState<Integer> valueState2) {
-                    performStateUpdates("valueState", ctx, valueState);
-                    performStateUpdates("valueState2", ctx, valueState2);
-                  }
-
-                  private void performStateUpdates(
-                      String stateId, ProcessContext ctx, ValueState<Integer> valueState) {
-                    Map<String, Integer> stateValues = stateValuesMap.get(stateId);
-                    Integer value = ctx.element().getValue();
-                    if (value == null) {
-                      throw new IllegalStateException();
-                    }
-                    switch (value) {
-                      case CLEAR_STATE:
-                        valueState.clear();
-                        break;
-                      case WRITE_STATE_TO_MAP:
-                        stateValues.put(ctx.element().getKey(), valueState.read());
-                        break;
-                      default:
-                        Integer currentState = valueState.read();
-                        if (currentState == null) {
-                          currentState = value;
-                        } else {
-                          currentState += value;
+    PCollection<KV<String, String>> output =
+        p.apply(Impulse.create())
+            .apply(
+                ParDo.of(
+                    new DoFn<byte[], KV<String, Integer>>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctx) {
+                        // Values == -1 will clear the state
+                        ctx.output(KV.of("clearedState", 1));
+                        ctx.output(KV.of("clearedState", CLEAR_STATE));
+                        // values >= 1 will be added on top of each other
+                        ctx.output(KV.of("bla1", 42));
+                        ctx.output(KV.of("bla", 23));
+                        ctx.output(KV.of("bla2", 64));
+                        ctx.output(KV.of("bla", 1));
+                        ctx.output(KV.of("bla", 1));
+                        // values == -2 will write the current state to the output
+                        ctx.output(KV.of("bla", WRITE_STATE));
+                        ctx.output(KV.of("bla1", WRITE_STATE));
+                        ctx.output(KV.of("bla2", WRITE_STATE));
+                        ctx.output(KV.of("clearedState", WRITE_STATE));
+                      }
+                    }))
+            .apply(
+                "statefulDoFn",
+                ParDo.of(
+                    new DoFn<KV<String, Integer>, KV<String, String>>() {
+                      @StateId("valueState")
+                      private final StateSpec<ValueState<Integer>> valueStateSpec =
+                          StateSpecs.value(VarIntCoder.of());
+
+                      @StateId("valueState2")
+                      private final StateSpec<ValueState<Integer>> valueStateSpec2 =
+                          StateSpecs.value(VarIntCoder.of());
+
+                      @ProcessElement
+                      public void process(
+                          ProcessContext ctx,
+                          @StateId("valueState") ValueState<Integer> valueState,
+                          @StateId("valueState2") ValueState<Integer> valueState2) {
+                        performStateUpdates(ctx, valueState);
+                        performStateUpdates(ctx, valueState2);
+                      }
+
+                      private void performStateUpdates(
+                          ProcessContext ctx, ValueState<Integer> valueState) {
+                        Integer value = ctx.element().getValue();
+                        if (value == null) {
+                          throw new IllegalStateException();
                         }
-                        valueState.write(currentState);
-                    }
-                  }
-                }));
+                        switch (value) {
+                          case CLEAR_STATE:
+                            valueState.clear();
+                            break;
+                          case WRITE_STATE:
+                            Integer read = valueState.read();
+                            ctx.output(
+                                KV.of(
+                                    ctx.element().getKey(),
+                                    read == null ? "null" : read.toString()));
+                            break;
+                          default:
+                            Integer currentState = valueState.read();
+                            if (currentState == null) {
+                              currentState = value;
+                            } else {
+                              currentState += value;
+                            }
+                            valueState.write(currentState);
+                        }
+                      }
+                    }));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            KV.of("bla", "25"),
+            KV.of("bla1", "42"),
+            KV.of("bla2", "64"),
+            KV.of("clearedState", "null"),
+            KV.of("bla", "25"),
+            KV.of("bla1", "42"),
+            KV.of("bla2", "64"),
+            KV.of("clearedState", "null"));
+
+    // The line below is required to convert the PAssert's read to an impulse, which is expected
+    // by the GreedyPipelineFuser.
+    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
@@ -185,20 +191,9 @@ private void performStateUpdates(
             Collections.emptyList());
 
     jobInvocation.start();
-    long timeout = System.currentTimeMillis() + 60 * 1000;
-    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
-      Thread.sleep(1000);
-    }
-    assertThat(jobInvocation.getState(), is(Enum.DONE));
 
-    Map<String, Integer> expected = new HashMap<>();
-    expected.put("bla", 25);
-    expected.put("bla1", 42);
-    expected.put("bla2", 64);
-    expected.put("clearedState", null);
-
-    for (Map<String, Integer> statesValues : stateValuesMap.values()) {
-      assertThat(statesValues, equalTo(expected));
+    while (jobInvocation.getState() != Enum.DONE) {
+      Thread.sleep(1000);
     }
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 58db1baad164..d9639b87162c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -67,7 +67,7 @@
 @RunWith(Parameterized.class)
 public class PortableTimersExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] testModes() {
     return new Object[] {true, false};
   }
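
For anyone skimming the long diff above, the conversion both tests now follow reduces to one recurring idiom: assert on the output PCollection with PAssert, rewrite the Read that PAssert introduces into an Impulse, and only then translate the pipeline to its portable proto and submit it. The snippet below is a minimal, illustrative sketch of that idiom distilled from the hunks above, not code taken verbatim from the PR; the PipelineOptions setup and the FlinkJobInvocation wiring are elided.

    // Minimal sketch of the PAssert idiom used in the converted tests (illustrative only).
    Pipeline p = Pipeline.create(options);  // options configured as in the diff above
    PCollection<String> result =
        p.apply("impulse", Impulse.create())
            .apply(
                "create",
                ParDo.of(
                    new DoFn<byte[], String>() {
                      @ProcessElement
                      public void process(ProcessContext ctx) {
                        ctx.output("zero");
                        ctx.output("one");
                        ctx.output("two");
                      }
                    }));

    // Assert on the PCollection instead of collecting results into a static field.
    PAssert.that(result).containsInAnyOrder("zero", "one", "two");

    // PAssert adds a bounded Read to the pipeline; rewrite it to an Impulse so the
    // GreedyPipelineFuser accepts the resulting portable pipeline.
    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));

    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
    // Submit pipelineProto (via FlinkJobInvocation, as in the hunks above) and poll
    // jobInvocation.getState() until it reaches Enum.DONE, guarded by the test timeout.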


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 178107)
    Time Spent: 50m  (was: 40m)

> Convert PortableStateTimerTest and PortableExecutionTest to using PAssert
> -------------------------------------------------------------------------
>
>                 Key: BEAM-6283
>                 URL: https://issues.apache.org/jira/browse/BEAM-6283
>             Project: Beam
>          Issue Type: Test
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 2.10.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The tests modify a map concurrently. They should synchronize on the map or be 
> converted to use PAssert, similar to PortableTimerTest.
> Just saw PortableStateTest fail here: 
> https://builds.apache.org/job/beam_PreCommit_Java_Phrase/508
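
The description above names two possible fixes; the PR takes the PAssert route shown in the diff. Purely for illustration, the alternative it rejects, keeping the shared map but synchronizing access to it, would look roughly like the sketch below. The field name and placement are hypothetical, not taken from the PR:

    // Hypothetical alternative (not what the PR does): keep a static result map but
    // guard every access with explicit synchronization.
    private static final Map<String, Integer> stateValues = new HashMap<>();

    // Inside the DoFn's @ProcessElement method, when writing state out:
    synchronized (stateValues) {
      stateValues.put(ctx.element().getKey(), valueState.read());
    }

    // And in the test body, once the job has reached Enum.DONE:
    synchronized (stateValues) {
      assertThat(stateValues, equalTo(expected));
    }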



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
