Amar3tto commented on code in PR #22584:
URL: https://github.com/apache/beam/pull/22584#discussion_r990157807


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java:
##########
@@ -17,29 +17,165 @@
  */
 package org.apache.beam.sdk.io.cdap;
 
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.io.cdap.MappingUtils.getOffsetFnForPluginClass;
+import static org.apache.beam.sdk.io.cdap.MappingUtils.getPluginByClass;
+import static org.apache.beam.sdk.io.cdap.MappingUtils.getReceiverBuilderByPluginClass;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
 import io.cdap.cdap.api.plugin.PluginConfig;
+import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
 import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
+import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
- * An unbounded/bounded sources and sinks from <a
- * href="https://github.com/data-integrations">CDAP</a> plugins.
+ * A {@link CdapIO} is a Transform for reading data from the source or writing data to the sink of
+ * a Cdap Plugin. It uses {@link HadoopFormatIO} for Batch and {@link SparkReceiverIO} for
+ * Streaming.
+ *
+ * <h2>Read from Cdap Plugin Bounded Source</h2>
+ *
+ * <p>To configure a {@link CdapIO} source, you must specify the Cdap {@link Plugin}, the Cdap
+ * {@link PluginConfig}, and the key and value classes.
+ *
+ * <p>{@link Plugin} is the wrapper class for a Cdap Plugin. It contains the main information
+ * about the Plugin. An object of the {@link Plugin} class can be created with the {@link
+ * Plugin#createBatch(Class, Class, Class)} method, which requires the following parameters:
+ *
+ * <ul>
+ *   <li>{@link io.cdap.cdap.etl.api.batch.BatchSource} class
+ *   <li>{@link InputFormat} class
+ *   <li>{@link io.cdap.cdap.api.data.batch.InputFormatProvider} class
+ * </ul>
+ *
+ * <p>For more information about the InputFormat and InputFormatProvider, see {@link
+ * HadoopFormatIO}.
+ *
+ * <p>Every Cdap Plugin has its own {@link PluginConfig} class with the fields necessary to
+ * configure the Plugin. You can set the {@link Map} of your parameters with the {@link
+ * ConfigWrapper#withParams(Map)} method, where the key is the field name.
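+ *
+ * <p>As a minimal sketch, the parameters {@link Map} can be built as follows. The parameter names
+ * and values here are hypothetical and depend on the concrete plugin's {@link PluginConfig}
+ * fields:
+ *
+ * <pre>{@code
+ * // Keys must match field names of the plugin's PluginConfig class (hypothetical here)
+ * Map<String, Object> pluginParams = new HashMap<>();
+ * pluginParams.put("objectType", "employee");
+ * pluginParams.put("referenceName", "employeeSource");
+ *
+ * EmployeeConfig pluginConfig =
+ *     new ConfigWrapper<>(EmployeeConfig.class).withParams(pluginParams).build();
+ * }</pre>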
+ *
+ * <p>For example, to create a basic {@link CdapIO#read()} transform:
+ *
+ * <pre>{@code
+ * Pipeline p = ...; // Create pipeline.
+ *
+ * // Create PluginConfig for specific plugin
+ * EmployeeConfig pluginConfig =
+ *         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
+ *
+ * // Read using CDAP batch plugin
+ * p.apply("ReadBatch",
+ * CdapIO.<String, String>read()
+ *             .withCdapPlugin(
+ *                 Plugin.createBatch(
+ *                     EmployeeBatchSource.class,
+ *                     EmployeeInputFormat.class,
+ *                     EmployeeInputFormatProvider.class))
+ *             .withPluginConfig(pluginConfig)
+ *             .withKeyClass(String.class)
+ *             .withValueClass(String.class));
+ * }</pre>
+ *
+ * <h2>Write to Cdap Plugin Bounded Sink</h2>
+ *
+ * <p>To configure a {@link CdapIO} sink, just as for {@link CdapIO#read()}, the Cdap {@link
+ * Plugin}, the Cdap {@link PluginConfig}, and the key and value classes must be specified. In
+ * addition, it's necessary to set the locks directory path with {@link
+ * CdapIO.Write#withLocksDirPath(String)}. It's used for the {@link HDFSSynchronization}
+ * configuration of {@link HadoopFormatIO}. More info can be found in the {@link HadoopFormatIO}
+ * documentation.
+ *
+ * <p>To create an object of the {@link Plugin} class with the {@link Plugin#createBatch(Class,
+ * Class, Class)} method, you need to specify the following parameters:
+ *
+ * <ul>
+ *   <li>{@link io.cdap.cdap.etl.api.batch.BatchSink} class
+ *   <li>{@link OutputFormat} class
+ *   <li>{@link io.cdap.cdap.api.data.batch.OutputFormatProvider} class
+ * </ul>
+ *
+ * <p>For more information about the OutputFormat and OutputFormatProvider, see {@link
+ * HadoopFormatIO}.
+ *
+ * <p>Example of {@link CdapIO#write()} usage:
+ *
+ * <pre>{@code
+ * Pipeline p = ...; // Create pipeline.
+ *
+ * // Get or create data to write
+ * PCollection<KV<String, String>> input = p.apply(Create.of(data));
+ *
+ * // Create PluginConfig for specific plugin
+ * EmployeeConfig pluginConfig =
+ *         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
+ *
+ * // Write using CDAP batch plugin
+ * input.apply(
+ *         "WriteBatch",
+ *         CdapIO.<String, String>write()
+ *             .withCdapPlugin(
+ *                 Plugin.createBatch(
+ *                     EmployeeBatchSink.class,
+ *                     EmployeeOutputFormat.class,
+ *                     EmployeeOutputFormatProvider.class))
+ *             .withPluginConfig(pluginConfig)
+ *             .withKeyClass(String.class)
+ *             .withValueClass(String.class)
+ *             .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
+ * p.run();
+ * }</pre>
+ *
+ * <h2>Read from Cdap Plugin Streaming Source</h2>
+ *
+ * <p>To configure a {@link CdapIO} source, you must specify the Cdap {@link Plugin}, the Cdap
+ * {@link PluginConfig}, and the key and value classes.
+ *
+ * <p>{@link Plugin} is the wrapper class for a Cdap Plugin. It contains the main information
+ * about the Plugin. An object of the {@link Plugin} class can be created with the {@link
+ * Plugin#createStreaming(Class)} method, which requires a {@link
+ * io.cdap.cdap.etl.api.streaming.StreamingSource} class parameter.
+ *
+ * <p>Every Cdap Plugin has its own {@link PluginConfig} class with the fields necessary to
+ * configure the Plugin. You can set the {@link Map} of your parameters with the {@link
+ * ConfigWrapper#withParams(Map)} method, where the key is the field name.
+ *
+ * <p>For example, to create a basic {@link CdapIO#read()} transform:
+ *
+ * <pre>{@code
+ * Pipeline p = ...; // Create pipeline.
+ *
+ * // Create PluginConfig for specific plugin
+ * EmployeeConfig pluginConfig =
+ *         new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
+ *
+ * // Read using CDAP streaming plugin
+ * p.apply("ReadStreaming",
+ * CdapIO.<String, String>read()
+ *             
.withCdapPlugin(Plugin.createStreaming(EmployeeStreamingSource.class))
+ *             .withPluginConfig(pluginConfig)
+ *             .withKeyClass(String.class)
+ *             .withValueClass(String.class));
+ * }</pre>
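+ *
+ * <p>A streaming read produces an unbounded {@link PCollection}, so a windowing strategy is
+ * typically applied before any downstream aggregation. A minimal sketch, assuming the read output
+ * is assigned to a {@code readResult} variable and using an arbitrary one-minute fixed window:
+ *
+ * <pre>{@code
+ * PCollection<KV<String, String>> windowed =
+ *     readResult.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
+ * }</pre>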
  */
 @Experimental(Kind.SOURCE_SINK)
+@SuppressWarnings("nullness")

Review Comment:
   Fixed


