[ https://issues.apache.org/jira/browse/BEAM-3912?focusedWorklogId=174952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174952 ]
ASF GitHub Bot logged work on BEAM-3912: ---------------------------------------- Author: ASF GitHub Bot Created on: 13/Dec/18 17:24 Start Date: 13/Dec/18 17:24 Worklog Time Spent: 10m Work Description: aromanenko-dev closed pull request #6306: [BEAM-3912] Add HadoopOutputFormatIO support URL: https://github.com/apache/beam/pull/6306 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/sdks/java/io/hadoop-format/build.gradle b/sdks/java/io/hadoop-format/build.gradle new file mode 100644 index 000000000000..83c01535cf6e --- /dev/null +++ b/sdks/java/io/hadoop-format/build.gradle @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: org.apache.beam.gradle.BeamModulePlugin +applyJavaNature() +provideIntegrationTestingDependencies() +enableJavaPerformanceTesting() + +description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Format" +ext.summary = "IO to read data from sources and to write data to sinks that implement Hadoop MapReduce Format."
+ +def log4j_version = "2.6.2" + +// Ban dependencies from the test runtime classpath +configurations.testRuntimeClasspath { + // Ban hive-exec and mesos since they bundle protobuf without repackaging + exclude group: "org.apache.hive", module: "hive-exec" + exclude group: "org.apache.mesos", module: "mesos" + // Prevent a StackOverflow because of wiring LOG4J -> SLF4J -> LOG4J + exclude group: "org.slf4j", module: "log4j-over-slf4j" +} + +dependencies { + shadow project(path: ":beam-sdks-java-core", configuration: "shadow") + compile library.java.guava + shadow library.java.slf4j_api + shadow project(path: ":beam-sdks-java-io-hadoop-common", configuration: "shadow") + provided library.java.hadoop_common + provided library.java.hadoop_mapreduce_client_core + testCompile project(path: ":beam-runners-direct-java", configuration: "shadow") + testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest") + testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow") + testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest") + testCompile project(path: ":beam-sdks-java-io-jdbc", configuration: "shadow") + testCompile project(path: ":beam-sdks-java-io-hadoop-input-format", configuration: "shadowTest") + testCompile library.java.postgres + testCompile "org.apache.logging.log4j:log4j-core:$log4j_version" + testCompile library.java.junit + testCompile library.java.hamcrest_core + testCompile library.java.hamcrest_library + shadow library.java.commons_io_2x +} + diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java new file mode 100644 index 000000000000..cadcbd4b3fc9 --- /dev/null +++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.format; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO: For the purpose of unification of InputFormat and OutputFormat into one common + * HadoopFormatIO the code of old HadoopInputFormat should be moved to here and HadoopInputFormatIO + * becomes deprecated in "hadoop-input-format" module. + * + * <p>A {@link HadoopFormatIO.Write} is a Transform for writing data to any sink which implements + * Hadoop {@link OutputFormat}. For example - Cassandra, Elasticsearch, HBase, Redis, Postgres etc. + * {@link HadoopFormatIO.Write} has to make several performance trade-offs in connecting to {@link + * OutputFormat}, so if there is another Beam IO Transform specifically for connecting to your data + * sink of choice, we would recommend using that one, but this IO Transform allows you to connect to + * many data sinks that do not yet have a Beam IO Transform. + * + * <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the write + * will occur. Many properties of the Configuration are optional, and some are required for certain + * {@link OutputFormat} classes, but the following properties must be set for all OutputFormats: + * + * <ul> + * <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} class used to connect to + * your data sink of choice. + * <li>{@code mapreduce.job.outputformat.key.class}: The key class passed to the {@link + * OutputFormat} in {@code mapreduce.job.outputformat.class}. + * <li>{@code mapreduce.job.outputformat.value.class}: The value class passed to the {@link + * OutputFormat} in {@code mapreduce.job.outputformat.class}. 
+ * </ul> + * + * For example: + * + * <pre>{@code + * Configuration myHadoopConfiguration = new Configuration(false); + * // Set Hadoop OutputFormat, key and value class in configuration + * myHadoopConfiguration.setClass("mapreduce.job.outputformat.class", + * MyDbOutputFormatClass, OutputFormat.class); + * myHadoopConfiguration.setClass("mapreduce.job.outputformat.key.class", + * MyDbOutputFormatKeyClass, Object.class); + * myHadoopConfiguration.setClass("mapreduce.job.outputformat.value.class", + * MyDbOutputFormatValueClass, Object.class); + * }</pre> + * + * <p>You will need to set the appropriate OutputFormat key and value classes (i.e. + * "mapreduce.job.outputformat.key.class" and "mapreduce.job.outputformat.value.class") in the Hadoop + * {@link Configuration}. If you set a different OutputFormat key or value class than the OutputFormat's + * actual key or value class, it may result in an error like "unexpected extra bytes after + * decoding" while the key/value objects are decoded. Hence, it is important to set the + * appropriate OutputFormat key and value classes. + * + * <h3>Writing using {@link HadoopFormatIO}</h3> + * + * <pre>{@code + * Pipeline p = ...; // Create pipeline. + * // A PCollection of key/value pairs to write, produced by upstream transforms. + * PCollection<KV<OutputFormatKeyClass, OutputFormatValueClass>> data = ...; + * // Write data using the Hadoop configuration. + * data.apply("write", + * HadoopFormatIO.<OutputFormatKeyClass, OutputFormatValueClass>write() + * .withConfiguration(myHadoopConfiguration)); + * }</pre> + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class HadoopFormatIO { + private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class); + + public static final String OUTPUTFORMAT_CLASS = "mapreduce.job.outputformat.class"; + public static final String OUTPUTFORMAT_KEY_CLASS = "mapreduce.job.outputformat.key.class"; + public static final String OUTPUTFORMAT_VALUE_CLASS = "mapreduce.job.outputformat.value.class"; + + /** + * Creates an uninitialized {@link HadoopFormatIO.Write}. Before use, the {@code Write} must be + * initialized with a {@link HadoopFormatIO.Write#withConfiguration(Configuration)} call that specifies + * the sink. + */ + public static <K, V> Write<K, V> write() { + return new AutoValue_HadoopFormatIO_Write.Builder<K, V>().build(); + } + + /** + * A {@link PTransform} that writes to any data sink which implements Hadoop OutputFormat, + * e.g. Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on + * {@link HadoopFormatIO} for more information. + * + * @param <K> Type of keys to be written. + * @param <V> Type of values to be written. + * @see HadoopFormatIO + */ + @AutoValue + public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { + // Returns the Hadoop Configuration which contains the specification of the sink.
+ @Nullable + public abstract SerializableConfiguration getConfiguration(); + + @Nullable + public abstract TypeDescriptor<?> getOutputFormatClass(); + + @Nullable + public abstract TypeDescriptor<?> getOutputFormatKeyClass(); + + @Nullable + public abstract TypeDescriptor<?> getOutputFormatValueClass(); + + abstract Builder<K, V> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<K, V> { + abstract Builder<K, V> setConfiguration(SerializableConfiguration configuration); + + abstract Builder<K, V> setOutputFormatClass(TypeDescriptor<?> outputFormatClass); + + abstract Builder<K, V> setOutputFormatKeyClass(TypeDescriptor<?> outputFormatKeyClass); + + abstract Builder<K, V> setOutputFormatValueClass(TypeDescriptor<?> outputFormatValueClass); + + abstract Write<K, V> build(); + } + + /** Write to the sink using the options provided by the given configuration. */ + @SuppressWarnings("unchecked") + public Write<K, V> withConfiguration(Configuration configuration) { + validateConfiguration(configuration); + TypeDescriptor<?> outputFormatClass = + TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_CLASS, null)); + TypeDescriptor<?> outputFormatKeyClass = + TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_KEY_CLASS, null)); + TypeDescriptor<?> outputFormatValueClass = + TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_VALUE_CLASS, null)); + Builder<K, V> builder = + toBuilder().setConfiguration(new SerializableConfiguration(configuration)); + builder.setOutputFormatClass(outputFormatClass); + builder.setOutputFormatKeyClass(outputFormatKeyClass); + builder.setOutputFormatValueClass(outputFormatValueClass); + + return builder.build(); + } + + /** + * Validates that the mandatory configuration properties such as OutputFormat class, + * OutputFormat key and value classes are provided in the Hadoop configuration. 
+ */ + private void validateConfiguration(Configuration configuration) { + checkArgument(configuration != null, "Configuration can not be null"); + checkArgument( + configuration.get(OUTPUTFORMAT_CLASS) != null, + "Configuration must contain \"" + OUTPUTFORMAT_CLASS + "\""); + checkArgument( + configuration.get(OUTPUTFORMAT_KEY_CLASS) != null, + "Configuration must contain \"" + OUTPUTFORMAT_KEY_CLASS + "\""); + checkArgument( + configuration.get(OUTPUTFORMAT_VALUE_CLASS) != null, + "Configuration must contain \"" + OUTPUTFORMAT_VALUE_CLASS + "\""); + } + + @Override + public void validate(PipelineOptions pipelineOptions) {} + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + Configuration hadoopConfig = getConfiguration().get(); + if (hadoopConfig != null) { + builder.addIfNotNull( + DisplayData.item(OUTPUTFORMAT_CLASS, hadoopConfig.get(OUTPUTFORMAT_CLASS)) + .withLabel("OutputFormat Class")); + builder.addIfNotNull( + DisplayData.item(OUTPUTFORMAT_KEY_CLASS, hadoopConfig.get(OUTPUTFORMAT_KEY_CLASS)) + .withLabel("OutputFormat Key Class")); + builder.addIfNotNull( + DisplayData.item(OUTPUTFORMAT_VALUE_CLASS, hadoopConfig.get(OUTPUTFORMAT_VALUE_CLASS)) + .withLabel("OutputFormat Value Class")); + } + } + + @Override + public PDone expand(PCollection<KV<K, V>> input) { + input.apply(ParDo.of(new WriteFn<>(this))); + return PDone.in(input.getPipeline()); + } + } + + private static class WriteFn<K, V> extends DoFn<KV<K, V>, Void> { + private final Write<K, V> spec; + private final SerializableConfiguration conf; + private transient RecordWriter<K, V> recordWriter; + private transient OutputCommitter outputCommitter; + private transient OutputFormat<?, ?> outputFormatObj; + private transient TaskAttemptContext taskAttemptContext; + + WriteFn(Write<K, V> spec) { + this.spec = spec; + conf = spec.getConfiguration(); + } + + @Setup + public void setup() throws IOException { + if (recordWriter == null) { + + taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID()); + + try { + outputFormatObj = + (OutputFormat<?, ?>) + conf.get() + .getClassByName(conf.get().get(OUTPUTFORMAT_CLASS)) + .getConstructor() + .newInstance(); + } catch (InstantiationException + | IllegalAccessException + | ClassNotFoundException + | NoSuchMethodException + | InvocationTargetException e) { + throw new IOException("Unable to create OutputFormat object: ", e); + } + + try { + LOG.info("Creating new OutputCommitter."); + outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext); + if (outputCommitter != null) { + outputCommitter.setupJob(new JobContextImpl(conf.get(), new JobID())); + } else { + LOG.warn("OutputCommitter is null."); + } + } catch (Exception e) { + throw new IOException("Unable to create OutputCommitter object: ", e); + } + + try { + LOG.info("Creating new RecordWriter."); + recordWriter = (RecordWriter<K, V>) outputFormatObj.getRecordWriter(taskAttemptContext); + } catch (InterruptedException e) { + throw new IOException("Unable to create RecordWriter object: ", e); + } + } + } + + @ProcessElement + public void processElement(ProcessContext c) + throws ExecutionException, InterruptedException, IOException { + recordWriter.write(c.element().getKey(), c.element().getValue()); + } + + @Teardown + public void teardown() throws Exception { + if (recordWriter != null) { + LOG.info("Closing RecordWriter."); + recordWriter.close(taskAttemptContext); + recordWriter = null; + } + + if (outputCommitter != null && 
outputCommitter.needsTaskCommit(taskAttemptContext)) { + LOG.info("Commit task for id " + taskAttemptContext.getTaskAttemptID().getTaskID()); + outputCommitter.commitTask(taskAttemptContext); + } + } + } +} diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java new file mode 100644 index 000000000000..99b4d203d2fe --- /dev/null +++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines transforms for writing to Data sinks that implement Hadoop Output Format. + * + * @see org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO + */ +package org.apache.beam.sdk.io.hadoop.format; diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java new file mode 100644 index 000000000000..1fb6d296f495 --- /dev/null +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.format; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.hadoop.inputformat.Employee; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a valid OutputFormat for writing employee data, available in the form of {@code + * List<KV>}. 
{@linkplain EmployeeOutputFormat} is used to test the {@linkplain HadoopFormatIO } + * sink. + */ +public class EmployeeOutputFormat extends OutputFormat<Text, Employee> { + private static volatile List<KV<Text, Employee>> output; + + @Override + public RecordWriter<Text, Employee> getRecordWriter(TaskAttemptContext context) { + return new RecordWriter<Text, Employee>() { + @Override + public void write(Text key, Employee value) { + synchronized (output) { + output.add(KV.of(key, value)); + } + } + + @Override + public void close(TaskAttemptContext context) {} + }; + } + + @Override + public void checkOutputSpecs(JobContext context) {} + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) { + return null; + } + + public static synchronized void initWrittenOutput() { + output = Collections.synchronizedList(new ArrayList<>()); + } + + public static List<KV<Text, Employee>> getWrittenOutput() { + return output; + } +} diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java new file mode 100644 index 000000000000..2e0d8a1a0d71 --- /dev/null +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.format; + +import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry; +import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions; +import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount; + +import java.sql.SQLException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.DatabaseTestHelper; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions; +import org.apache.beam.sdk.io.common.TestRow; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable; +import org.apache.beam.sdk.io.jdbc.JdbcIO; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.postgresql.ds.PGSimpleDataSource; + +/** + * A test of {@link org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO} on an independent Postgres + * instance. + * + * <p>This test requires a running instance of Postgres. Pass in connection information using + * PipelineOptions: + * + * <pre> + * ./gradlew integrationTest -p sdks/java/io/hadoop-format/ + * -DintegrationTestPipelineOptions='[ + * "--postgresServerName=1.2.3.4", + * "--postgresUsername=postgres", + * "--postgresDatabaseName=myfancydb", + * "--postgresPassword=mypass", + * "--postgresSsl=false", + * "--numberOfRecords=1000" ]' + * --tests org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT + * -DintegrationTestRunner=direct + * </pre> + * + * <p>Please see 'build_rules.gradle' file for instructions regarding running this test using Beam + * performance testing framework. 
+ */ +public class HadoopFormatIOIT { + + private static PGSimpleDataSource dataSource; + private static Integer numberOfRows; + private static String tableName; + private static SerializableConfiguration hadoopConfiguration; + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + @BeforeClass + public static void setUp() throws Exception { + PostgresIOTestPipelineOptions options = + readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class); + + dataSource = DatabaseTestHelper.getPostgresDataSource(options); + numberOfRows = options.getNumberOfRecords(); + tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT"); + + executeWithRetry(HadoopFormatIOIT::createTable); + setupHadoopConfiguration(options); + } + + private static void createTable() throws SQLException { + DatabaseTestHelper.createTable(dataSource, tableName); + } + + private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions options) { + Configuration conf = new Configuration(); + DBConfiguration.configureDB( + conf, + "org.postgresql.Driver", + DatabaseTestHelper.getPostgresDBUrl(options), + options.getPostgresUsername(), + options.getPostgresPassword()); + conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName); + conf.set(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, "2"); + conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id", "name"); + + conf.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, TestRowDBWritable.class, Object.class); + conf.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, NullWritable.class, Object.class); + conf.setClass(HadoopFormatIO.OUTPUTFORMAT_CLASS, DBOutputFormat.class, OutputFormat.class); + + hadoopConfiguration = new SerializableConfiguration(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + executeWithRetry(HadoopFormatIOIT::deleteTable); + } + + private static void deleteTable() throws SQLException { + DatabaseTestHelper.deleteTable(dataSource, tableName); + } + + @Test + public void writeUsingHadooFormatIO() { + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows)) + .apply("Produce db rows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply("Construct rows for DBOutputFormat", ParDo.of(new ConstructDBOutputFormatRowFn())) + .apply( + "Write using HadoopFormatIO", + HadoopFormatIO.<TestRowDBWritable, NullWritable>write() + .withConfiguration(hadoopConfiguration.get())); + + writePipeline.run().waitUntilFinish(); + + PCollection<String> consolidatedHashcode = + readPipeline + .apply( + "Read using JDBCIO", + JdbcIO.<String>read() + .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) + .withQuery(String.format("select name,id from %s;", tableName)) + .withRowMapper( + (JdbcIO.RowMapper<String>) resultSet -> resultSet.getString("name")) + .withCoder(StringUtf8Coder.of())) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows)); + + readPipeline.run().waitUntilFinish(); + } + + /** + * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link + * HadoopFormatIO}. 
+ */ + public static class ConstructDBOutputFormatRowFn + extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output( + KV.of(new TestRowDBWritable(c.element().id(), c.element().name()), NullWritable.get())); + } + } +} diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java new file mode 100644 index 000000000000..323ca901ebd7 --- /dev/null +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.format; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.inputformat.Employee; +import org.apache.beam.sdk.io.hadoop.inputformat.TestEmployeeDataSet; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link HadoopFormatIO}. 
*/ +@RunWith(JUnit4.class) +public class HadoopFormatIOTest { + private static SerializableConfiguration serConf; + + @Rule public final transient TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + EmployeeOutputFormat.initWrittenOutput(); + serConf = loadTestConfiguration(EmployeeOutputFormat.class, Text.class, Employee.class); + } + + private static SerializableConfiguration loadTestConfiguration( + Class<?> outputFormatClassName, Class<?> keyClass, Class<?> valueClass) { + Configuration conf = new Configuration(); + conf.setClass("mapreduce.job.outputformat.class", outputFormatClassName, OutputFormat.class); + conf.setClass("mapreduce.job.outputformat.key.class", keyClass, Object.class); + conf.setClass("mapreduce.job.outputformat.value.class", valueClass, Object.class); + return new SerializableConfiguration(conf); + } + + @Test + public void testWriteBuildsCorrectly() { + HadoopFormatIO.Write<Text, Employee> write = + HadoopFormatIO.<Text, Employee>write().withConfiguration(serConf.get()); + + assertEquals(serConf.get(), write.getConfiguration().get()); + assertEquals(EmployeeOutputFormat.class, write.getOutputFormatClass().getRawType()); + assertEquals(Text.class, write.getOutputFormatKeyClass().getRawType()); + assertEquals(Employee.class, write.getOutputFormatValueClass().getRawType()); + } + + /** + * This test validates {@link HadoopFormatIO.Write Write} transform object creation fails with + * null configuration. {@link HadoopFormatIO.Write#withConfiguration(Configuration) + * withConfiguration(Configuration)} method checks configuration is null and throws exception if + * it is null. + */ + @Test + public void testWriteObjectCreationFailsIfConfigurationIsNull() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Configuration can not be null"); + HadoopFormatIO.<Text, Employee>write().withConfiguration(null); + } + + /** + * This test validates functionality of {@link + * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)} + * function when Hadoop OutputFormat class is not provided by the user in configuration. + */ + @Test + public void testWriteValidationFailsMissingOutputFormatInConf() { + Configuration configuration = new Configuration(); + configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, Text.class, Object.class); + configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, Employee.class, Object.class); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.class\""); + HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration); + } + + /** + * This test validates functionality of {@link + * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)} + * function when key class is not provided by the user in configuration. 
+ */ + @Test + public void testWriteValidationFailsMissingKeyClassInConf() { + Configuration configuration = new Configuration(); + configuration.setClass( + HadoopFormatIO.OUTPUTFORMAT_CLASS, TextOutputFormat.class, OutputFormat.class); + configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, Employee.class, Object.class); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.key.class\""); + HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration); + } + + /** + * This test validates functionality of {@link + * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)} + * function when value class is not provided by the user in configuration. + */ + @Test + public void testWriteValidationFailsMissingValueClassInConf() { + Configuration configuration = new Configuration(); + configuration.setClass( + HadoopFormatIO.OUTPUTFORMAT_CLASS, TextOutputFormat.class, OutputFormat.class); + configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, Text.class, Object.class); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.value.class\""); + HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration); + } + + @Test + public void testWritingData() { + List<KV<Text, Employee>> data = TestEmployeeDataSet.getEmployeeData(); + PCollection<KV<Text, Employee>> input = p.apply(Create.of(data)); + input.apply("Write", HadoopFormatIO.<Text, Employee>write().withConfiguration(serConf.get())); + p.run(); + + List<KV<Text, Employee>> writtenOutput = EmployeeOutputFormat.getWrittenOutput(); + assertEquals(data.size(), writtenOutput.size()); + assertTrue(data.containsAll(writtenOutput)); + assertTrue(writtenOutput.containsAll(data)); + } + + @Test + public void testWritingDataFailInvalidKeyType() { + List<KV<String, Employee>> data = new ArrayList<>(); + data.add(KV.of("key", new Employee("name", "address"))); + PCollection<KV<String, Employee>> input = p.apply(Create.of(data)); + input.apply("Write", HadoopFormatIO.<String, Employee>write().withConfiguration(serConf.get())); + thrown.expect(Pipeline.PipelineExecutionException.class); + p.run(); + } + + @Test + public void testWritingDataFailInvalidValueType() { + List<KV<Text, Text>> data = new ArrayList<>(); + data.add(KV.of(new Text("key"), new Text("value"))); + PCollection<KV<Text, Text>> input = p.apply(Create.of(data)); + input.apply("Write", HadoopFormatIO.<Text, Text>write().withConfiguration(serConf.get())); + thrown.expect(Pipeline.PipelineExecutionException.class); + p.run(); + } + + /** + * This test validates functionality of {@link + * HadoopFormatIO.Write#populateDisplayData(DisplayData.Builder) + * populateDisplayData(DisplayData.Builder)}. 
+ */ + @Test + public void testWriteDisplayData() { + HadoopFormatIO.Write<String, String> write = + HadoopFormatIO.<String, String>write().withConfiguration(serConf.get()); + DisplayData displayData = DisplayData.from(write); + + assertThat( + displayData, + hasDisplayItem( + HadoopFormatIO.OUTPUTFORMAT_CLASS, + serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_CLASS))); + assertThat( + displayData, + hasDisplayItem( + HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, + serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS))); + assertThat( + displayData, + hasDisplayItem( + HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, + serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS))); + } +} diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java index ff36d5dbe51b..20c78c1cff69 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java @@ -25,7 +25,7 @@ * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for * computing splits. */ -class TestEmployeeDataSet { +public class TestEmployeeDataSet { public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L; public static final long NUMBER_OF_SPLITS = 3L; private static final List<KV<String, String>> data = new ArrayList<>(); diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java index ea3aeca7a13d..68aaaa8f9ef8 100644 --- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java +++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java @@ -35,11 +35,18 @@ * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}. 
*/ @DefaultCoder(AvroCoder.class) -class TestRowDBWritable extends TestRow implements DBWritable, Writable { +public class TestRowDBWritable extends TestRow implements DBWritable, Writable { private Integer id; private String name; + public TestRowDBWritable() {} + + public TestRowDBWritable(Integer id, String name) { + this.id = id; + this.name = name; + } + @Override public Integer id() { return id; diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle index 61565b0a5fe1..e80f2634bdb8 100644 --- a/sdks/java/javadoc/build.gradle +++ b/sdks/java/javadoc/build.gradle @@ -59,6 +59,7 @@ def exportedJavadocProjects = [ ':beam-sdks-java-io-google-cloud-platform', ':beam-sdks-java-io-hadoop-common', ':beam-sdks-java-io-hadoop-file-system', + ':beam-sdks-java-io-hadoop-format', ':beam-sdks-java-io-hadoop-input-format', ':beam-sdks-java-io-hbase', ':beam-sdks-java-io-hcatalog', diff --git a/settings.gradle b/settings.gradle index 3ccdd4418c78..f7d2075857a6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -130,6 +130,8 @@ include "beam-sdks-java-io-hadoop-file-system" project(":beam-sdks-java-io-hadoop-file-system").dir = file("sdks/java/io/hadoop-file-system") include "beam-sdks-java-io-hadoop-input-format" project(":beam-sdks-java-io-hadoop-input-format").dir = file("sdks/java/io/hadoop-input-format") +include "beam-sdks-java-io-hadoop-format" +project(":beam-sdks-java-io-hadoop-format").dir = file("sdks/java/io/hadoop-format") include "beam-sdks-java-io-hbase" project(":beam-sdks-java-io-hbase").dir = file("sdks/java/io/hbase") include "beam-sdks-java-io-hcatalog" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 174952) Time Spent: 10h 40m (was: 10.5h) > Add batching support for HadoopOutputFormatIO > --------------------------------------------- > > Key: BEAM-3912 > URL: https://issues.apache.org/jira/browse/BEAM-3912 > Project: Beam > Issue Type: Sub-task > Components: io-java-hadoop > Reporter: Alexey Romanenko > Assignee: Alexey Romanenko > Priority: Minor > Fix For: 2.10.0 > > Time Spent: 10h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
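For reference, the following is a minimal usage sketch of the HadoopFormatIO.Write transform introduced by this PR, assembled from the class Javadoc and the OUTPUTFORMAT_CLASS / OUTPUTFORMAT_KEY_CLASS / OUTPUTFORMAT_VALUE_CLASS keys defined in the diff above. It is not part of the merged code: the choice of TextOutputFormat, the output directory, and the example element are illustrative assumptions, and running it requires the new beam-sdks-java-io-hadoop-format module, beam-sdks-java-io-hadoop-common, the Hadoop client libraries, and a Beam runner on the classpath.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopFormatWriteExample {
  public static void main(String[] args) {
    // Hadoop configuration describing the sink. The three mapreduce.job.outputformat.*
    // properties are mandatory; withConfiguration() rejects a configuration missing any of them.
    Configuration conf = new Configuration(false);
    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_CLASS, TextOutputFormat.class, OutputFormat.class);
    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, Text.class, Object.class);
    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, LongWritable.class, Object.class);
    // TextOutputFormat additionally needs an output directory; the path here is an arbitrary example.
    conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/beam-hadoop-format-demo");

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(
            Create.of(KV.of(new Text("id-1"), new LongWritable(42L)))
                .withCoder(
                    KvCoder.of(WritableCoder.of(Text.class), WritableCoder.of(LongWritable.class))))
        .apply(
            "Write via Hadoop OutputFormat",
            HadoopFormatIO.<Text, LongWritable>write().withConfiguration(conf));
    pipeline.run().waitUntilFinish();
  }
}

As the WriteFn in the diff shows, the configured OutputFormat is instantiated reflectively on each worker during setup, its RecordWriter writes every incoming key/value pair in processElement, and the OutputCommitter commits the task attempt in teardown, so the key and value types of the input PCollection must match the classes registered in the configuration.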