This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit b81025c7e60445ff1d27cbb3f50c932ff6fcfe7e Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Fri Jun 28 16:54:28 2019 +0200 [BEAM-7589] Make KinesisIOIT compatible with all other ITs --- sdks/java/io/kinesis/build.gradle | 1 + .../apache/beam/sdk/io/kinesis/KinesisIOIT.java | 124 ++++++++++++--------- .../beam/sdk/io/kinesis/KinesisTestOptions.java | 12 ++ 3 files changed, 86 insertions(+), 51 deletions(-) diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle index ea6ca86..15b866c 100644 --- a/sdks/java/io/kinesis/build.gradle +++ b/sdks/java/io/kinesis/build.gradle @@ -41,6 +41,7 @@ dependencies { compile "com.amazonaws:amazon-kinesis-client:1.10.0" compile "com.amazonaws:amazon-kinesis-producer:0.12.11" compile "commons-lang:commons-lang:2.6" + testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime") testCompile library.java.junit testCompile library.java.mockito_core testCompile library.java.guava_testlib diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java index 01004c3..5f3a003 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java @@ -17,18 +17,19 @@ */ package org.apache.beam.sdk.io.kinesis; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists.newArrayList; - import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Random; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.TestRow; import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -41,33 +42,43 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Integration test, that writes and reads data to and from real Kinesis. You need to provide all - * {@link KinesisTestOptions} in order to run this. + * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link + * KinesisTestOptions} in order to run this. */ @RunWith(JUnit4.class) public class KinesisIOIT implements Serializable { - public static final int NUM_RECORDS = 1000; - public static final int NUM_SHARDS = 2; + private static int numberOfShards; + private static int numberOfRows; - @Rule public final transient TestPipeline p = TestPipeline.create(); - @Rule public final transient TestPipeline p2 = TestPipeline.create(); + @Rule public TestPipeline pipelineWrite = TestPipeline.create(); + @Rule public TestPipeline pipelineRead = TestPipeline.create(); private static KinesisTestOptions options; + private static final Instant now = Instant.now(); @BeforeClass public static void setup() { PipelineOptionsFactory.register(KinesisTestOptions.class); options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); + numberOfShards = options.getNumberOfShards(); + numberOfRows = options.getNumberOfRecords(); } + /** Test which writes and then reads data for a Kinesis stream. 
*/ @Test - public void testWriteThenRead() throws Exception { - Instant now = Instant.now(); - List<byte[]> inputData = prepareData(); + public void testWriteThenRead() { + runWrite(); + runRead(); + } - // Write data into stream - p.apply(Create.of(inputData)) + /** Write test dataset into Kinesis stream. */ + private void runWrite() { + pipelineWrite + .apply("Generate Sequence", GenerateSequence.from(0).to((long) numberOfRows)) + .apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) .apply( + "Write to Kinesis", KinesisIO.write() .withStreamName(options.getAwsKinesisStream()) .withPartitioner(new RandomPartitioner()) @@ -75,51 +86,62 @@ public class KinesisIOIT implements Serializable { options.getAwsAccessKey(), options.getAwsSecretKey(), Regions.fromName(options.getAwsKinesisRegion()))); - p.run().waitUntilFinish(); - - // Read new data from stream that was just written before - PCollection<byte[]> output = - p2.apply( - KinesisIO.read() - .withStreamName(options.getAwsKinesisStream()) - .withAWSClientsProvider( - options.getAwsAccessKey(), - options.getAwsSecretKey(), - Regions.fromName(options.getAwsKinesisRegion())) - .withMaxNumRecords(inputData.size()) - // to prevent endless running in case of error - .withMaxReadTime(Duration.standardMinutes(5)) - .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP) - .withInitialTimestampInStream(now) - .withRequestRecordsLimit(1000)) - .apply( - ParDo.of( - new DoFn<KinesisRecord, byte[]>() { - - @ProcessElement - public void processElement(ProcessContext c) { - KinesisRecord record = c.element(); - byte[] data = record.getData().array(); - c.output(data); - } - })); - PAssert.that(output).containsInAnyOrder(inputData); - p2.run().waitUntilFinish(); + + pipelineWrite.run().waitUntilFinish(); + } + + /** Read test dataset from Kinesis stream. 
*/ + private void runRead() { + PCollection<KinesisRecord> output = + pipelineRead.apply( + KinesisIO.read() + .withStreamName(options.getAwsKinesisStream()) + .withAWSClientsProvider( + options.getAwsAccessKey(), + options.getAwsSecretKey(), + Regions.fromName(options.getAwsKinesisRegion())) + .withMaxNumRecords(numberOfRows) + // to prevent endless running in case of error + .withMaxReadTime(Duration.standardMinutes(10)) + .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP) + .withInitialTimestampInStream(now) + .withRequestRecordsLimit(1000)); + + PAssert.thatSingleton(output.apply("Count All", Count.globally())) + .isEqualTo((long) numberOfRows); + + PCollection<String> consolidatedHashcode = + output + .apply(ParDo.of(new ExtractDataValues())) + .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); + + PAssert.that(consolidatedHashcode) + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows)); + + pipelineRead.run().waitUntilFinish(); + } + + /** Converts each test row's name into UTF-8 bytes. */ + private static class ConvertToBytes extends DoFn<TestRow, byte[]> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(String.valueOf(c.element().name()).getBytes(StandardCharsets.UTF_8)); + } } - private List<byte[]> prepareData() { - List<byte[]> data = newArrayList(); - for (int i = 0; i < NUM_RECORDS; i++) { - data.add(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + /** Decodes each Kinesis record's data bytes as a UTF-8 string. 
*/ + private static class ExtractDataValues extends DoFn<KinesisRecord, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8)); } - return data; } private static final class RandomPartitioner implements KinesisPartitioner { @Override public String getPartitionKey(byte[] value) { Random rand = new Random(); - int n = rand.nextInt(NUM_SHARDS) + 1; + int n = rand.nextInt(numberOfShards) + 1; return String.valueOf(n); } diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java index 30f1f86..185f953 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java @@ -47,4 +47,16 @@ public interface KinesisTestOptions extends TestPipelineOptions { String getAwsAccessKey(); void setAwsAccessKey(String value); + + @Description("Number of shards of stream") + @Default.Integer(2) + Integer getNumberOfShards(); + + void setNumberOfShards(Integer count); + + @Description("Number of records that will be written and read by the test") + @Default.Integer(1000) + Integer getNumberOfRecords(); + + void setNumberOfRecords(Integer count); }