[ 
https://issues.apache.org/jira/browse/BEAM-5309?focusedWorklogId=154253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154253
 ]

ASF GitHub Bot logged work on BEAM-5309:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Oct/18 11:01
            Start Date: 15/Oct/18 11:01
    Worklog Time Spent: 10m 
      Work Description: dmvk commented on a change in pull request #6691: 
WIP:[BEAM-5309] Add streaming support for HadoopFormatIO
URL: https://github.com/apache/beam/pull/6691#discussion_r225123003
 
 

 ##########
 File path: 
sdks/java/io/hadoop-output-format/src/main/java/org/apache/beam/sdk/io/hadoop/outputformat/HadoopOutputFormatIO.java
 ##########
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.hadoop.outputformat;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link HadoopOutputFormatIO} is a Transform for writing data to any sink 
which implements
+ * Hadoop {@link OutputFormat}. For example - Cassandra, Elasticsearch, HBase, 
Redis, Postgres etc.
+ * {@link HadoopOutputFormatIO} has to make several performance trade-offs in 
connecting to {@link
+ * OutputFormat}, so if there is another Beam IO Transform specifically for 
connecting to your data
+ * sink of choice, we would recommend using that one, but this IO Transform 
allows you to connect to
+ * many data sinks that do not yet have a Beam IO Transform.
+ *
+ * <p>You will need to pass a Hadoop {@link Configuration} with parameters 
specifying how the write
+ * will occur. Many properties of the Configuration are optional, and some are 
required for certain
+ * {@link OutputFormat} classes, but the following properties must be set for 
all OutputFormats:
+ *
+ * <ul>
+ *   <li>{@code mapreduce.job.outputformat.class}: The {@link OutputFormat} 
class used to connect to
+ *       your data sink of choice.
+ *   <li>{@code mapreduce.job.outputformat.key.class}: The key class passed to 
the {@link
+ *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
+ *   <li>{@code mapreduce.job.outputformat.value.class}: The value class 
passed to the {@link
+ *       OutputFormat} in {@code mapreduce.job.outputformat.class}.
+ * </ul>
+ *
+ * For example:
+ *
+ * <pre>{@code
+ * Configuration myHadoopConfiguration = new Configuration(false);
+ * // Set Hadoop OutputFormat, key and value class in configuration
+ * myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.class&quot;,
+ *    MyDbOutputFormatClass, OutputFormat.class);
+ * 
myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.key.class&quot;,
+ *    MyDbOutputFormatKeyClass, Object.class);
+ * 
myHadoopConfiguration.setClass(&quot;mapreduce.job.outputformat.value.class&quot;,
+ *    MyDbOutputFormatValueClass, Object.class);
+ * }</pre>
+ *
+ * <p>You will need to set appropriate OutputFormat key and value class (i.e.
+ * "mapreduce.job.outputformat.key.class" and 
"mapreduce.job.outputformat.value.class") in Hadoop
+ * {@link Configuration}. If you set different OutputFormat key or value class 
than OutputFormat's
+ * actual key or value class then, it may result in an error like "unexpected 
extra bytes after
+ * decoding" while the decoding process of key/value object happens. Hence, it 
is important to set
+ * appropriate OutputFormat key and value class.
+ *
+ * <h3>Writing using {@link HadoopOutputFormatIO}</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...; // Create pipeline.
+ * // Write data only with Hadoop configuration.
+ * p.apply("write",
+ *     HadoopOutputFormatIO.<OutputFormatKeyClass, OutputFormatValueClass>write()
+ *              .withConfiguration(myHadoopConfiguration);
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class HadoopOutputFormatIO {
 
 Review comment:
   obsolete?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154253)
    Time Spent: 0.5h  (was: 20m)

> Add streaming support for HadoopOutputFormatIO
> ----------------------------------------------
>
>                 Key: BEAM-5309
>                 URL: https://issues.apache.org/jira/browse/BEAM-5309
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-hadoop
>            Reporter: Alexey Romanenko
>            Assignee: David Hrbacek
>            Priority: Minor
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> design doc: https://s.apache.org/beam-streaming-hofio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to