[ 
https://issues.apache.org/jira/browse/BEAM-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282749#comment-16282749
 ] 

ASF GitHub Bot commented on BEAM-3060:
--------------------------------------

jkff closed pull request #4209: [BEAM-3060] AvroIOIT
URL: https://github.com/apache/beam/pull/4209
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/file-based-io-tests/pom.xml 
b/sdks/java/io/file-based-io-tests/pom.xml
index 812bfea363a..fc523f614fd 100644
--- a/sdks/java/io/file-based-io-tests/pom.xml
+++ b/sdks/java/io/file-based-io-tests/pom.xml
@@ -196,5 +196,11 @@
             <artifactId>beam-sdks-java-io-common</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 </project>
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
new file mode 100644
index 00000000000..ce8da3357c9
--- /dev/null
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.avro;
+
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * An integration test for {@link AvroIO}.
+ *
+ * <p>Run this test using the command below. Pass in connection information 
via PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ *  -Dit.test=org.apache.beam.sdk.io.avro.AvroIOIT
+ *  -DintegrationTestPipelineOptions='[
+ *  "--numberOfRecords=100000",
+ *  "--filenamePrefix=output_file_path"
+ *  ]'
+ * </pre>
+ * </p>
+ * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions 
regarding
+ * running this test using Beam performance testing framework.</p>
+ */
+@RunWith(JUnit4.class)
+public class AvroIOIT {
+
+
+  private static final Schema AVRO_SCHEMA = new Schema.Parser().parse("{\n"
+      + " \"namespace\": \"ioitavro\",\n"
+      + " \"type\": \"record\",\n"
+      + " \"name\": \"TestAvroLine\",\n"
+      + " \"fields\": [\n"
+      + "     {\"name\": \"row\", \"type\": \"string\"}\n"
+      + " ]\n"
+      + "}");
+
+  private static String filenamePrefix;
+  private static Long numberOfTextLines;
+
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() {
+    IOTestPipelineOptions options = readTestPipelineOptions();
+
+    numberOfTextLines = options.getNumberOfRecords();
+    filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
+  }
+
+  @Test
+  public void writeThenReadAll() {
+
+    PCollection<String> testFilenames = pipeline
+        .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
+        .apply("Produce text lines",
+            ParDo.of(new 
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
+        .apply(
+            "Produce Avro records",
+            ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
+        .setCoder(AvroCoder.of(AVRO_SCHEMA))
+        .apply(
+            "Write Avro records to files",
+            AvroIO.writeGenericRecords(AVRO_SCHEMA).to(filenamePrefix)
+                .withOutputFilenames().withSuffix(".avro"))
+        .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+    PCollection<String> consolidatedHashcode = testFilenames
+        .apply("Read all files", AvroIO.readAllGenericRecords(AVRO_SCHEMA))
+        .apply("Parse Avro records to Strings", ParDo.of(new 
ParseAvroRecordsFn()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    testFilenames.apply("Delete test files", ParDo.of(new 
FileBasedIOITHelper.DeleteFileFn())
+        
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class DeterministicallyConstructAvroRecordsFn extends 
DoFn<String, GenericRecord> {
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      c.output(
+          new GenericRecordBuilder(AVRO_SCHEMA).set("row", c.element()).build()
+      );
+    }
+  }
+
+  private static class ParseAvroRecordsFn extends DoFn<GenericRecord, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c){
+      c.output(String.valueOf(c.element().get("row")));
+    }
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add performance tests for commonly used file-based I/O PTransforms
> ------------------------------------------------------------------
>
>                 Key: BEAM-3060
>                 URL: https://issues.apache.org/jira/browse/BEAM-3060
>             Project: Beam
>          Issue Type: Test
>          Components: sdk-java-core
>            Reporter: Chamikara Jayalath
>            Assignee: Szymon Nieradka
>
> We recently added a performance testing framework [1] that can be used to do 
> the following.
> (1) Execute Beam tests using PerfkitBenchmarker
> (2) Manage Kubernetes-based deployments of data stores.
> (3) Easily publish benchmark results. 
> I think it will be useful to add performance tests for commonly used 
> file-based I/O PTransforms using this framework. I suggest looking into 
> the following formats initially.
> (1) AvroIO
> (2) TextIO
> (3) Compressed text using TextIO
> (4) TFRecordIO
> It should be possible to run these tests for various Beam runners (Direct, 
> Dataflow, Flink, Spark, etc.) and file-systems (GCS, local, HDFS, etc.) 
> easily.
> In the initial version, tests can be made manually triggerable for PRs 
> through Jenkins. Later, we could make some of these tests run periodically 
> and publish benchmark results (to BigQuery) through PerfkitBenchmarker.
> [1] https://beam.apache.org/documentation/io/testing/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to