This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b81025c7e60445ff1d27cbb3f50c932ff6fcfe7e
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Jun 28 16:54:28 2019 +0200

    [BEAM-7589] Make KinesisIOIT compatible with all other ITs
---
 sdks/java/io/kinesis/build.gradle                  |   1 +
 .../apache/beam/sdk/io/kinesis/KinesisIOIT.java    | 124 ++++++++++++---------
 .../beam/sdk/io/kinesis/KinesisTestOptions.java    |  12 ++
 3 files changed, 86 insertions(+), 51 deletions(-)

diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle
index ea6ca86..15b866c 100644
--- a/sdks/java/io/kinesis/build.gradle
+++ b/sdks/java/io/kinesis/build.gradle
@@ -41,6 +41,7 @@ dependencies {
   compile "com.amazonaws:amazon-kinesis-client:1.10.0"
   compile "com.amazonaws:amazon-kinesis-producer:0.12.11"
   compile "commons-lang:commons-lang:2.6"
+  testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
   testCompile library.java.junit
   testCompile library.java.mockito_core
   testCompile library.java.guava_testlib
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
index 01004c3..5f3a003 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
@@ -17,18 +17,19 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists.newArrayList;
-
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
 import java.util.Random;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -41,33 +42,43 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Integration test, that writes and reads data to and from real Kinesis. You need to provide all
- * {@link KinesisTestOptions} in order to run this.
+ * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
+ * KinesisTestOptions} in order to run this.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  public static final int NUM_RECORDS = 1000;
-  public static final int NUM_SHARDS = 2;
+  private static int numberOfShards;
+  private static int numberOfRows;
 
-  @Rule public final transient TestPipeline p = TestPipeline.create();
-  @Rule public final transient TestPipeline p2 = TestPipeline.create();
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
 
   private static KinesisTestOptions options;
+  private static final Instant now = Instant.now();
 
   @BeforeClass
   public static void setup() {
     PipelineOptionsFactory.register(KinesisTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    numberOfShards = options.getNumberOfShards();
+    numberOfRows = options.getNumberOfRecords();
   }
 
+  /** Test which write and then read data for a Kinesis stream. */
   @Test
-  public void testWriteThenRead() throws Exception {
-    Instant now = Instant.now();
-    List<byte[]> inputData = prepareData();
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
+  }
 
-    // Write data into stream
-    p.apply(Create.of(inputData))
+  /** Write test dataset into Kinesis stream. */
+  private void runWrite() {
+    pipelineWrite
+        .apply("Generate Sequence", GenerateSequence.from(0).to((long) numberOfRows))
+        .apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes()))
         .apply(
+            "Write to Kinesis",
             KinesisIO.write()
                 .withStreamName(options.getAwsKinesisStream())
                 .withPartitioner(new RandomPartitioner())
@@ -75,51 +86,62 @@ public class KinesisIOIT implements Serializable {
                     options.getAwsAccessKey(),
                     options.getAwsSecretKey(),
                     Regions.fromName(options.getAwsKinesisRegion())));
-    p.run().waitUntilFinish();
-
-    // Read new data from stream that was just written before
-    PCollection<byte[]> output =
-        p2.apply(
-                KinesisIO.read()
-                    .withStreamName(options.getAwsKinesisStream())
-                    .withAWSClientsProvider(
-                        options.getAwsAccessKey(),
-                        options.getAwsSecretKey(),
-                        Regions.fromName(options.getAwsKinesisRegion()))
-                    .withMaxNumRecords(inputData.size())
-                    // to prevent endless running in case of error
-                    .withMaxReadTime(Duration.standardMinutes(5))
-                    .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
-                    .withInitialTimestampInStream(now)
-                    .withRequestRecordsLimit(1000))
-            .apply(
-                ParDo.of(
-                    new DoFn<KinesisRecord, byte[]>() {
-
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        KinesisRecord record = c.element();
-                        byte[] data = record.getData().array();
-                        c.output(data);
-                      }
-                    }));
-    PAssert.that(output).containsInAnyOrder(inputData);
-    p2.run().waitUntilFinish();
+
+    pipelineWrite.run().waitUntilFinish();
+  }
+
+  /** Read test dataset from Kinesis stream. */
+  private void runRead() {
+    PCollection<KinesisRecord> output =
+        pipelineRead.apply(
+            KinesisIO.read()
+                .withStreamName(options.getAwsKinesisStream())
+                .withAWSClientsProvider(
+                    options.getAwsAccessKey(),
+                    options.getAwsSecretKey(),
+                    Regions.fromName(options.getAwsKinesisRegion()))
+                .withMaxNumRecords(numberOfRows)
+                // to prevent endless running in case of error
+                .withMaxReadTime(Duration.standardMinutes(10))
+                .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
+                .withInitialTimestampInStream(now)
+                .withRequestRecordsLimit(1000));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.globally()))
+        .isEqualTo((long) numberOfRows);
+
+    PCollection<String> consolidatedHashcode =
+        output
+            .apply(ParDo.of(new ExtractDataValues()))
+            .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());
+
+    PAssert.that(consolidatedHashcode)
+        .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+
+    pipelineRead.run().waitUntilFinish();
+  }
+
+  /** Produces test rows. */
+  private static class ConvertToBytes extends DoFn<TestRow, byte[]> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(String.valueOf(c.element().name()).getBytes(StandardCharsets.UTF_8));
+    }
   }
 
-  private List<byte[]> prepareData() {
-    List<byte[]> data = newArrayList();
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      data.add(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+  /** Read rows from Table. */
+  private static class ExtractDataValues extends DoFn<KinesisRecord, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8));
     }
-    return data;
   }
 
   private static final class RandomPartitioner implements KinesisPartitioner {
     @Override
     public String getPartitionKey(byte[] value) {
       Random rand = new Random();
-      int n = rand.nextInt(NUM_SHARDS) + 1;
+      int n = rand.nextInt(numberOfShards) + 1;
       return String.valueOf(n);
     }
 
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 30f1f86..185f953 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -47,4 +47,16 @@ public interface KinesisTestOptions extends TestPipelineOptions {
   String getAwsAccessKey();
 
   void setAwsAccessKey(String value);
+
+  @Description("Number of shards of stream")
+  @Default.Integer(2)
+  Integer getNumberOfShards();
+
+  void setNumberOfShards(Integer count);
+
+  @Description("Number of records that will be written and read by the test")
+  @Default.Integer(1000)
+  Integer getNumberOfRecords();
+
+  void setNumberOfRecords(Integer count);
 }

Reply via email to