[ 
https://issues.apache.org/jira/browse/BEAM-3912?focusedWorklogId=174952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174952
 ]

ASF GitHub Bot logged work on BEAM-3912:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/18 17:24
            Start Date: 13/Dec/18 17:24
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev closed pull request #6306: [BEAM-3912] Add HadoopOutputFormatIO support
URL: https://github.com/apache/beam/pull/6306
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/io/hadoop-format/build.gradle b/sdks/java/io/hadoop-format/build.gradle
new file mode 100644
index 000000000000..83c01535cf6e
--- /dev/null
+++ b/sdks/java/io/hadoop-format/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Format"
+ext.summary = "IO to read data from sources and to write data to sinks that 
implement Hadoop MapReduce Format."
+
+def log4j_version = "2.6.2"
+
+// Ban dependencies from the test runtime classpath
+configurations.testRuntimeClasspath {
+  // Ban hive-exec and mesos since they bundle protobuf without repackaging
+  exclude group: "org.apache.hive", module: "hive-exec"
+  exclude group: "org.apache.mesos", module: "mesos"
+  // Prevent a StackOverflow because of wiring LOG4J -> SLF4J -> LOG4J
+  exclude group: "org.slf4j", module: "log4j-over-slf4j"
+}
+
+dependencies {
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  compile library.java.guava
+  shadow library.java.slf4j_api
+  shadow project(path: ":beam-sdks-java-io-hadoop-common", configuration: "shadow")
+  provided library.java.hadoop_common
+  provided library.java.hadoop_mapreduce_client_core
+  testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-jdbc", configuration: "shadow")
+  testCompile project(path: ":beam-sdks-java-io-hadoop-input-format", 
configuration: "shadowTest")
+  testCompile library.java.postgres
+  testCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  shadow library.java.commons_io_2x
+}
+
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
new file mode 100644
index 000000000000..cadcbd4b3fc9
--- /dev/null
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: For the purpose of unification of InputFormat and OutputFormat into one common
+ * HadoopFormatIO the code of old HadoopInputFormat should be moved to here and HadoopInputFormatIO
+ * becomes deprecated in "hadoop-input-format" module.
+ *
+ * <p>A {@link HadoopFormatIO.Write} is a Transform for writing data to any sink which implements
+ * Hadoop {@link OutputFormat}. For example - Cassandra, Elasticsearch, HBase, Redis, Postgres etc.
+ * {@link HadoopFormatIO.Write} has to make several performance trade-offs in connecting to {@link
+ * OutputFormat}, so if there is another Beam IO Transform specifically for connecting to your data
+ * sink of choice, we would recommend using that one, but this IO Transform allows you to connect to
+ * many data sinks that do not yet have a Beam IO Transform.
+ *
+ * <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the write
+ * will occur. Many properties of the Configuration are optional, and some are required for certain
+ * {@link OutputFormat} classes, but the following properties must be set for all OutputFormats:
+ *
+ * <ul>
+ *   <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} class used to connect to
+ *       your data sink of choice.
+ *   <li>{@code mapreduce.job.outputformat.key.class}: The key class passed to the {@link
+ *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
+ *   <li>{@code mapreduce.job.outputformat.value.class}: The value class passed to the {@link
+ *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
+ * </ul>
+ *
+ * For example:
+ *
+ * <pre>{@code
+ * Configuration myHadoopConfiguration = new Configuration(false);
+ * // Set Hadoop OutputFormat, key and value class in configuration
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.class&quot;,
+ *    MyDbOutputFormatClass, OutputFormat.class);
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.key.class&quot;,
+ *    MyDbOutputFormatKeyClass, Object.class);
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.value.class&quot;,
+ *    MyDbOutputFormatValueClass, Object.class);
+ * }</pre>
+ *
+ * <p>You will need to set the appropriate OutputFormat key and value classes (i.e.
+ * "mapreduce.job.outputformat.key.class" and "mapreduce.job.outputformat.value.class") in the
+ * Hadoop {@link Configuration}. If the OutputFormat key or value class set in the configuration
+ * differs from the OutputFormat's actual key or value class, it may result in an error like
+ * "unexpected extra bytes after decoding" while the key/value objects are decoded. Hence, it is
+ * important to set the appropriate OutputFormat key and value classes.
+ *
+ * <h3>Writing using {@link HadoopFormatIO}</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...; // Create pipeline.
+ * // Write data using only the Hadoop configuration.
+ * p.apply("write",
+ *     HadoopFormatIO.<OutputFormatKeyClass, OutputFormatValueClass>write()
+ *              .withConfiguration(myHadoopConfiguration));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class HadoopFormatIO {
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFormatIO.class);
+
+  public static final String OUTPUTFORMAT_CLASS = "mapreduce.job.outputformat.class";
+  public static final String OUTPUTFORMAT_KEY_CLASS = "mapreduce.job.outputformat.key.class";
+  public static final String OUTPUTFORMAT_VALUE_CLASS = "mapreduce.job.outputformat.value.class";
+
+  /**
+   * Creates an uninitialized {@link HadoopFormatIO.Write}. Before use, the {@code Write} must be
+   * initialized with a {@link HadoopFormatIO.Write#withConfiguration(Configuration)} that
+   * specifies the sink.
+   */
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_HadoopFormatIO_Write.Builder<K, V>().build();
+  }
+
+  /**
+   * A {@link PTransform} that writes to any data sink which implements Hadoop OutputFormat, e.g.
+   * Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on
+   * {@link HadoopFormatIO} for more information.
+   *
+   * @param <K> Type of keys to be written.
+   * @param <V> Type of values to be written.
+   * @see HadoopFormatIO
+   */
+  @AutoValue
+  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
+    // Returns the Hadoop Configuration which contains specification of sink.
+    @Nullable
+    public abstract SerializableConfiguration getConfiguration();
+
+    @Nullable
+    public abstract TypeDescriptor<?> getOutputFormatClass();
+
+    @Nullable
+    public abstract TypeDescriptor<?> getOutputFormatKeyClass();
+
+    @Nullable
+    public abstract TypeDescriptor<?> getOutputFormatValueClass();
+
+    abstract Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract Builder<K, V> setConfiguration(SerializableConfiguration configuration);
+
+      abstract Builder<K, V> setOutputFormatClass(TypeDescriptor<?> outputFormatClass);
+
+      abstract Builder<K, V> setOutputFormatKeyClass(TypeDescriptor<?> outputFormatKeyClass);
+
+      abstract Builder<K, V> setOutputFormatValueClass(TypeDescriptor<?> outputFormatValueClass);
+
+      abstract Write<K, V> build();
+    }
+
+    /** Write to the sink using the options provided by the given configuration. */
+    @SuppressWarnings("unchecked")
+    public Write<K, V> withConfiguration(Configuration configuration) {
+      validateConfiguration(configuration);
+      TypeDescriptor<?> outputFormatClass =
+          TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_CLASS, null));
+      TypeDescriptor<?> outputFormatKeyClass =
+          TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_KEY_CLASS, null));
+      TypeDescriptor<?> outputFormatValueClass =
+          TypeDescriptor.of(configuration.getClass(OUTPUTFORMAT_VALUE_CLASS, null));
+      Builder<K, V> builder =
+          toBuilder().setConfiguration(new SerializableConfiguration(configuration));
+      builder.setOutputFormatClass(outputFormatClass);
+      builder.setOutputFormatKeyClass(outputFormatKeyClass);
+      builder.setOutputFormatValueClass(outputFormatValueClass);
+
+      return builder.build();
+    }
+
+    /**
+     * Validates that the mandatory configuration properties such as OutputFormat class,
+     * OutputFormat key and value classes are provided in the Hadoop configuration.
+     */
+    private void validateConfiguration(Configuration configuration) {
+      checkArgument(configuration != null, "Configuration can not be null");
+      checkArgument(
+          configuration.get(OUTPUTFORMAT_CLASS) != null,
+          "Configuration must contain \"" + OUTPUTFORMAT_CLASS + "\"");
+      checkArgument(
+          configuration.get(OUTPUTFORMAT_KEY_CLASS) != null,
+          "Configuration must contain \"" + OUTPUTFORMAT_KEY_CLASS + "\"");
+      checkArgument(
+          configuration.get(OUTPUTFORMAT_VALUE_CLASS) != null,
+          "Configuration must contain \"" + OUTPUTFORMAT_VALUE_CLASS + "\"");
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {}
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      Configuration hadoopConfig = getConfiguration().get();
+      if (hadoopConfig != null) {
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUTFORMAT_CLASS, hadoopConfig.get(OUTPUTFORMAT_CLASS))
+                .withLabel("OutputFormat Class"));
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUTFORMAT_KEY_CLASS, hadoopConfig.get(OUTPUTFORMAT_KEY_CLASS))
+                .withLabel("OutputFormat Key Class"));
+        builder.addIfNotNull(
+            DisplayData.item(OUTPUTFORMAT_VALUE_CLASS, hadoopConfig.get(OUTPUTFORMAT_VALUE_CLASS))
+                .withLabel("OutputFormat Value Class"));
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<K, V>> input) {
+      input.apply(ParDo.of(new WriteFn<>(this)));
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  private static class WriteFn<K, V> extends DoFn<KV<K, V>, Void> {
+    private final Write<K, V> spec;
+    private final SerializableConfiguration conf;
+    private transient RecordWriter<K, V> recordWriter;
+    private transient OutputCommitter outputCommitter;
+    private transient OutputFormat<?, ?> outputFormatObj;
+    private transient TaskAttemptContext taskAttemptContext;
+
+    WriteFn(Write<K, V> spec) {
+      this.spec = spec;
+      conf = spec.getConfiguration();
+    }
+
+    @Setup
+    public void setup() throws IOException {
+      if (recordWriter == null) {
+
+        taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
+
+        try {
+          outputFormatObj =
+              (OutputFormat<?, ?>)
+                  conf.get()
+                      .getClassByName(conf.get().get(OUTPUTFORMAT_CLASS))
+                      .getConstructor()
+                      .newInstance();
+        } catch (InstantiationException
+            | IllegalAccessException
+            | ClassNotFoundException
+            | NoSuchMethodException
+            | InvocationTargetException e) {
+          throw new IOException("Unable to create OutputFormat object: ", e);
+        }
+
+        try {
+          LOG.info("Creating new OutputCommitter.");
+          outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext);
+          if (outputCommitter != null) {
+            outputCommitter.setupJob(new JobContextImpl(conf.get(), new JobID()));
+          } else {
+            LOG.warn("OutputCommitter is null.");
+          }
+        } catch (Exception e) {
+          throw new IOException("Unable to create OutputCommitter object: ", e);
+        }
+
+        try {
+          LOG.info("Creating new RecordWriter.");
+          recordWriter = (RecordWriter<K, V>) outputFormatObj.getRecordWriter(taskAttemptContext);
+        } catch (InterruptedException e) {
+          throw new IOException("Unable to create RecordWriter object: ", e);
+        }
+      }
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c)
+        throws ExecutionException, InterruptedException, IOException {
+      recordWriter.write(c.element().getKey(), c.element().getValue());
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      if (recordWriter != null) {
+        LOG.info("Closing RecordWriter.");
+        recordWriter.close(taskAttemptContext);
+        recordWriter = null;
+      }
+
+      if (outputCommitter != null && outputCommitter.needsTaskCommit(taskAttemptContext)) {
+        LOG.info("Commit task for id " + taskAttemptContext.getTaskAttemptID().getTaskID());
+        outputCommitter.commitTask(taskAttemptContext);
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java
new file mode 100644
index 000000000000..99b4d203d2fe
--- /dev/null
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms for writing to data sinks that implement Hadoop OutputFormat.
+ *
+ * @see org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO
+ */
+package org.apache.beam.sdk.io.hadoop.format;
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java
new file mode 100644
index 000000000000..1fb6d296f495
--- /dev/null
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/EmployeeOutputFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.hadoop.inputformat.Employee;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a valid OutputFormat for writing employee data, available in the form of {@code
+ * List<KV>}. {@linkplain EmployeeOutputFormat} is used to test the {@linkplain HadoopFormatIO}
+ * sink.
+ */
+public class EmployeeOutputFormat extends OutputFormat<Text, Employee> {
+  private static volatile List<KV<Text, Employee>> output;
+
+  @Override
+  public RecordWriter<Text, Employee> getRecordWriter(TaskAttemptContext context) {
+    return new RecordWriter<Text, Employee>() {
+      @Override
+      public void write(Text key, Employee value) {
+        synchronized (output) {
+          output.add(KV.of(key, value));
+        }
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) {}
+    };
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {}
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    return null;
+  }
+
+  public static synchronized void initWrittenOutput() {
+    output = Collections.synchronizedList(new ArrayList<>());
+  }
+
+  public static List<KV<Text, Employee>> getWrittenOutput() {
+    return output;
+  }
+}
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
new file mode 100644
index 000000000000..2e0d8a1a0d71
--- /dev/null
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO} on an independent Postgres
+ * instance.
+ *
+ * <p>This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
+ *
+ * <pre>
+ *  ./gradlew integrationTest -p sdks/java/io/hadoop-format/
+ *   -DintegrationTestPipelineOptions='[
+ *     "--postgresServerName=1.2.3.4",
+ *     "--postgresUsername=postgres",
+ *     "--postgresDatabaseName=myfancydb",
+ *     "--postgresPassword=mypass",
+ *     "--postgresSsl=false",
+ *     "--numberOfRecords=1000" ]'
+ *  --tests org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
+ *  -DintegrationTestRunner=direct
+ * </pre>
+ *
+ * <p>Please see 'build_rules.gradle' file for instructions regarding running this test using Beam
+ * performance testing framework.
+ */
+public class HadoopFormatIOIT {
+
+  private static PGSimpleDataSource dataSource;
+  private static Integer numberOfRows;
+  private static String tableName;
+  private static SerializableConfiguration hadoopConfiguration;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    PostgresIOTestPipelineOptions options =
+        readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
+
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    numberOfRows = options.getNumberOfRecords();
+    tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT");
+
+    executeWithRetry(HadoopFormatIOIT::createTable);
+    setupHadoopConfiguration(options);
+  }
+
+  private static void createTable() throws SQLException {
+    DatabaseTestHelper.createTable(dataSource, tableName);
+  }
+
+  private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions options) {
+    Configuration conf = new Configuration();
+    DBConfiguration.configureDB(
+        conf,
+        "org.postgresql.Driver",
+        DatabaseTestHelper.getPostgresDBUrl(options),
+        options.getPostgresUsername(),
+        options.getPostgresPassword());
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+    conf.set(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, "2");
+    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id", "name");
+
+    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, TestRowDBWritable.class, Object.class);
+    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, NullWritable.class, Object.class);
+    conf.setClass(HadoopFormatIO.OUTPUTFORMAT_CLASS, DBOutputFormat.class, OutputFormat.class);
+
+    hadoopConfiguration = new SerializableConfiguration(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    executeWithRetry(HadoopFormatIOIT::deleteTable);
+  }
+
+  private static void deleteTable() throws SQLException {
+    DatabaseTestHelper.deleteTable(dataSource, tableName);
+  }
+
+  @Test
+  public void writeUsingHadoopFormatIO() {
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+        .apply("Produce db rows", ParDo.of(new 
TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("Construct rows for DBOutputFormat", ParDo.of(new 
ConstructDBOutputFormatRowFn()))
+        .apply(
+            "Write using HadoopFormatIO",
+            HadoopFormatIO.<TestRowDBWritable, NullWritable>write()
+                .withConfiguration(hadoopConfiguration.get()));
+
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode =
+        readPipeline
+            .apply(
+                "Read using JDBCIO",
+                JdbcIO.<String>read()
+                    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+                    .withQuery(String.format("select name,id from %s;", tableName))
+                    .withRowMapper(
+                        (JdbcIO.RowMapper<String>) resultSet -> resultSet.getString("name"))
+                    .withCoder(StringUtf8Coder.of()))
+            .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Uses the input {@link TestRow} values as seeds to produce new {@link KV}s for {@link
+   * HadoopFormatIO}.
+   */
+  public static class ConstructDBOutputFormatRowFn
+      extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(
+          KV.of(new TestRowDBWritable(c.element().id(), c.element().name()), NullWritable.get()));
+    }
+  }
+}
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java
new file mode 100644
index 000000000000..323ca901ebd7
--- /dev/null
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.hadoop.inputformat.Employee;
+import org.apache.beam.sdk.io.hadoop.inputformat.TestEmployeeDataSet;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link HadoopFormatIO}. */
+@RunWith(JUnit4.class)
+public class HadoopFormatIOTest {
+  private static SerializableConfiguration serConf;
+
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    EmployeeOutputFormat.initWrittenOutput();
+    serConf = loadTestConfiguration(EmployeeOutputFormat.class, Text.class, Employee.class);
+  }
+
+  private static SerializableConfiguration loadTestConfiguration(
+      Class<?> outputFormatClassName, Class<?> keyClass, Class<?> valueClass) {
+    Configuration conf = new Configuration();
+    conf.setClass("mapreduce.job.outputformat.class", outputFormatClassName, 
OutputFormat.class);
+    conf.setClass("mapreduce.job.outputformat.key.class", keyClass, 
Object.class);
+    conf.setClass("mapreduce.job.outputformat.value.class", valueClass, 
Object.class);
+    return new SerializableConfiguration(conf);
+  }
+
+  @Test
+  public void testWriteBuildsCorrectly() {
+    HadoopFormatIO.Write<Text, Employee> write =
+        HadoopFormatIO.<Text, Employee>write().withConfiguration(serConf.get());
+
+    assertEquals(serConf.get(), write.getConfiguration().get());
+    assertEquals(EmployeeOutputFormat.class, write.getOutputFormatClass().getRawType());
+    assertEquals(Text.class, write.getOutputFormatKeyClass().getRawType());
+    assertEquals(Employee.class, write.getOutputFormatValueClass().getRawType());
+  }
+
+  /**
+   * This test validates {@link HadoopFormatIO.Write Write} transform object creation fails with
+   * null configuration. {@link HadoopFormatIO.Write#withConfiguration(Configuration)
+   * withConfiguration(Configuration)} checks whether the configuration is null and throws an
+   * exception if it is.
+   */
+  @Test
+  public void testWriteObjectCreationFailsIfConfigurationIsNull() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Configuration can not be null");
+    HadoopFormatIO.<Text, Employee>write().withConfiguration(null);
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)}
+   * function when Hadoop OutputFormat class is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingOutputFormatInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, Text.class, Object.class);
+    configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, Employee.class, Object.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.class\"");
+    HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration);
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)}
+   * function when key class is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingKeyClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(
+        HadoopFormatIO.OUTPUTFORMAT_CLASS, TextOutputFormat.class, OutputFormat.class);
+    configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS, Employee.class, Object.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.key.class\"");
+    HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration);
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write#withConfiguration(Configuration) withConfiguration(Configuration)}
+   * function when value class is not provided by the user in configuration.
+   */
+  @Test
+  public void testWriteValidationFailsMissingValueClassInConf() {
+    Configuration configuration = new Configuration();
+    configuration.setClass(
+        HadoopFormatIO.OUTPUTFORMAT_CLASS, TextOutputFormat.class, OutputFormat.class);
+    configuration.setClass(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS, Text.class, Object.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.value.class\"");
+    HadoopFormatIO.<Text, Employee>write().withConfiguration(configuration);
+  }
+
+  @Test
+  public void testWritingData() {
+    List<KV<Text, Employee>> data = TestEmployeeDataSet.getEmployeeData();
+    PCollection<KV<Text, Employee>> input = p.apply(Create.of(data));
+    input.apply("Write", HadoopFormatIO.<Text, 
Employee>write().withConfiguration(serConf.get()));
+    p.run();
+
+    List<KV<Text, Employee>> writtenOutput = 
EmployeeOutputFormat.getWrittenOutput();
+    assertEquals(data.size(), writtenOutput.size());
+    assertTrue(data.containsAll(writtenOutput));
+    assertTrue(writtenOutput.containsAll(data));
+  }
+
+  @Test
+  public void testWritingDataFailInvalidKeyType() {
+    List<KV<String, Employee>> data = new ArrayList<>();
+    data.add(KV.of("key", new Employee("name", "address")));
+    PCollection<KV<String, Employee>> input = p.apply(Create.of(data));
+    input.apply("Write", HadoopFormatIO.<String, 
Employee>write().withConfiguration(serConf.get()));
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    p.run();
+  }
+
+  @Test
+  public void testWritingDataFailInvalidValueType() {
+    List<KV<Text, Text>> data = new ArrayList<>();
+    data.add(KV.of(new Text("key"), new Text("value")));
+    PCollection<KV<Text, Text>> input = p.apply(Create.of(data));
+    input.apply("Write", HadoopFormatIO.<Text, 
Text>write().withConfiguration(serConf.get()));
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    p.run();
+  }
+
+  /**
+   * This test validates functionality of {@link
+   * HadoopFormatIO.Write#populateDisplayData(DisplayData.Builder)
+   * populateDisplayData(DisplayData.Builder)}.
+   */
+  @Test
+  public void testWriteDisplayData() {
+    HadoopFormatIO.Write<String, String> write =
+        HadoopFormatIO.<String, String>write().withConfiguration(serConf.get());
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.OUTPUTFORMAT_CLASS,
+            serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_CLASS)));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS,
+            serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_KEY_CLASS)));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS,
+            serConf.get().get(HadoopFormatIO.OUTPUTFORMAT_VALUE_CLASS)));
+  }
+}
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
index ff36d5dbe51b..20c78c1cff69 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -25,7 +25,7 @@
  * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for
  * computing splits.
  */
-class TestEmployeeDataSet {
+public class TestEmployeeDataSet {
   public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;
   public static final long NUMBER_OF_SPLITS = 3L;
   private static final List<KV<String, String>> data = new ArrayList<>();
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
index ea3aeca7a13d..68aaaa8f9ef8 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -35,11 +35,18 @@
  * org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
  */
 @DefaultCoder(AvroCoder.class)
-class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
 
   private Integer id;
   private String name;
 
+  public TestRowDBWritable() {}
+
+  public TestRowDBWritable(Integer id, String name) {
+    this.id = id;
+    this.name = name;
+  }
+
   @Override
   public Integer id() {
     return id;
diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle
index 61565b0a5fe1..e80f2634bdb8 100644
--- a/sdks/java/javadoc/build.gradle
+++ b/sdks/java/javadoc/build.gradle
@@ -59,6 +59,7 @@ def exportedJavadocProjects = [
   ':beam-sdks-java-io-google-cloud-platform',
   ':beam-sdks-java-io-hadoop-common',
   ':beam-sdks-java-io-hadoop-file-system',
+  ':beam-sdks-java-io-hadoop-format',
   ':beam-sdks-java-io-hadoop-input-format',
   ':beam-sdks-java-io-hbase',
   ':beam-sdks-java-io-hcatalog',
diff --git a/settings.gradle b/settings.gradle
index 3ccdd4418c78..f7d2075857a6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -130,6 +130,8 @@ include "beam-sdks-java-io-hadoop-file-system"
 project(":beam-sdks-java-io-hadoop-file-system").dir = 
file("sdks/java/io/hadoop-file-system")
 include "beam-sdks-java-io-hadoop-input-format"
 project(":beam-sdks-java-io-hadoop-input-format").dir = 
file("sdks/java/io/hadoop-input-format")
+include "beam-sdks-java-io-hadoop-format"
+project(":beam-sdks-java-io-hadoop-format").dir = 
file("sdks/java/io/hadoop-format")
 include "beam-sdks-java-io-hbase"
 project(":beam-sdks-java-io-hbase").dir = file("sdks/java/io/hbase")
 include "beam-sdks-java-io-hcatalog"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174952)
    Time Spent: 10h 40m  (was: 10.5h)

> Add batching support for HadoopOutputFormatIO
> ---------------------------------------------
>
>                 Key: BEAM-3912
>                 URL: https://issues.apache.org/jira/browse/BEAM-3912
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-hadoop
>            Reporter: Alexey Romanenko
>            Assignee: Alexey Romanenko
>            Priority: Minor
>             Fix For: 2.10.0
>
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
