Amar3tto commented on code in PR #22584: URL: https://github.com/apache/beam/pull/22584#discussion_r990157807
########## sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/CdapIO.java: ########## @@ -17,29 +17,165 @@ */ package org.apache.beam.sdk.io.cdap; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.io.cdap.MappingUtils.getOffsetFnForPluginClass; +import static org.apache.beam.sdk.io.cdap.MappingUtils.getPluginByClass; +import static org.apache.beam.sdk.io.cdap.MappingUtils.getReceiverBuilderByPluginClass; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; import io.cdap.cdap.api.plugin.PluginConfig; +import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization; import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO; +import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.OutputFormat; import org.checkerframework.checker.nullness.qual.Nullable; /** - * An unbounded/bounded sources and sinks from <a - * href="https://github.com/data-integrations">CDAP</a> plugins. + * A {@link CdapIO} is a Transform for reading data from source or writing data to sink of a Cdap + * Plugin. 
It uses {@link HadoopFormatIO} for Batch and {@link SparkReceiverIO} for Streaming. + * + * <h2>Read from Cdap Plugin Bounded Source</h2> + * + * <p>To configure {@link CdapIO} source, you must specify Cdap {@link Plugin}, Cdap {@link + * PluginConfig}, key and value classes. + * + * <p>{@link Plugin} is the Wrapper class for the Cdap Plugin. It contains main information about + * the Plugin. The object of the {@link Plugin} class can be created with the {@link + * Plugin#createBatch(Class, Class, Class)} method. The method requires the following parameters: + * + * <ul> + * <li>{@link io.cdap.cdap.etl.api.batch.BatchSource} class + * <li>{@link InputFormat} class + * <li>{@link io.cdap.cdap.api.data.batch.InputFormatProvider} class + * </ul> + * + * <p>For more information about the InputFormat and InputFormatProvider, see {@link + * HadoopFormatIO}. + * + * <p>Every Cdap Plugin has its {@link PluginConfig} class with necessary fields to configure the + * Plugin. You can set the {@link Map} of your parameters with the {@link + * ConfigWrapper#withParams(Map)} method where the key is the field name. + * + * <p>For example, to create a basic {@link CdapIO#read()} transform: + * + * <pre>{@code + * Pipeline p = ...; // Create pipeline. + * + * // Create PluginConfig for specific plugin + * EmployeeConfig pluginConfig = + * new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build(); + * + * // Read using CDAP batch plugin + * p.apply("ReadBatch", + * CdapIO.<String, String>read() + * .withCdapPlugin( + * Plugin.createBatch( + * EmployeeBatchSource.class, + * EmployeeInputFormat.class, + * EmployeeInputFormatProvider.class)) + * .withPluginConfig(pluginConfig) + * .withKeyClass(String.class) + * .withValueClass(String.class)); + * }</pre> + * + * <h2>Write to Cdap Plugin Bounded Sink</h2> + * + * <p>To configure {@link CdapIO} sink, just as for {@link CdapIO#read()}, Cdap {@link Plugin}, Cdap + * {@link PluginConfig}, key and value classes must be specified. 
In addition, it's necessary to + * determine the locks directory path {@link CdapIO.Write#withLocksDirPath(String)}. It's used for + * {@link HDFSSynchronization} configuration for {@link HadoopFormatIO}. More info can be found in + * the {@link HadoopFormatIO} documentation. + * + * <p>To create the object of the {@link Plugin} class with the {@link Plugin#createBatch(Class, + * Class, Class)} method, you need to specify the following parameters: + * + * <ul> + * <li>{@link io.cdap.cdap.etl.api.batch.BatchSink} class + * <li>{@link OutputFormat} class + * <li>{@link io.cdap.cdap.api.data.batch.OutputFormatProvider} class + * </ul> + * + * <p>For more information about the OutputFormat and OutputFormatProvider, see {@link + * HadoopFormatIO}. + * + * <p>Example of {@link CdapIO#write()} usage: + * + * <pre>{@code + * Pipeline p = ...; // Create pipeline. + * + * // Get or create data to write + * PCollection<KV<String, String>> input = p.apply(Create.of(data)); + * + * // Create PluginConfig for specific plugin + * EmployeeConfig pluginConfig = + * new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build(); + * + * // Write using CDAP batch plugin + * input.apply( + * "WriteBatch", + * CdapIO.<String, String>write() + * .withCdapPlugin( + * Plugin.createBatch( + * EmployeeBatchSink.class, + * EmployeeOutputFormat.class, + * EmployeeOutputFormatProvider.class)) + * .withPluginConfig(pluginConfig) + * .withKeyClass(String.class) + * .withValueClass(String.class) + * .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath())); + * p.run(); + * }</pre> + * + * <h2>Read from Cdap Plugin Streaming Source</h2> + * + * <p>To configure {@link CdapIO} source, you must specify Cdap {@link Plugin}, Cdap {@link + * PluginConfig}, key and value classes. + * + * <p>{@link Plugin} is the Wrapper class for the Cdap Plugin. It contains main information about + * the Plugin. 
The object of the {@link Plugin} class can be created with the {@link + * Plugin#createStreaming(Class)} method. The method requires a {@link + * io.cdap.cdap.etl.api.streaming.StreamingSource} class parameter. + * + * <p>Every Cdap Plugin has its {@link PluginConfig} class with necessary fields to configure the + * Plugin. You can set the {@link Map} of your parameters with the {@link + * ConfigWrapper#withParams(Map)} method where the key is the field name. + * + * <p>For example, to create a basic {@link CdapIO#read()} transform: + * + * <pre>{@code + * Pipeline p = ...; // Create pipeline. + * + * // Create PluginConfig for specific plugin + * EmployeeConfig pluginConfig = + * new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build(); + * + * // Read using CDAP streaming plugin + * p.apply("ReadStreaming", + * CdapIO.<String, String>read() + * .withCdapPlugin(Plugin.createStreaming(EmployeeStreamingSource.class)) + * .withPluginConfig(pluginConfig) + * .withKeyClass(String.class) + * .withValueClass(String.class)); + * }</pre> */ @Experimental(Kind.SOURCE_SINK) +@SuppressWarnings("nullness") Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
