Amar3tto commented on code in PR #22584:
URL: https://github.com/apache/beam/pull/22584#discussion_r993325610


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/MappingUtils.java:
##########
@@ -33,23 +38,118 @@
 import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
 import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormat;
 import io.cdap.plugin.zendesk.source.batch.ZendeskInputFormatProvider;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/** Util class for mapping plugins. */
 public class MappingUtils {
 
-  public static Plugin getPluginByClass(Class<?> pluginClass) {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MappingUtils.class);
+  private static final String HUBSPOT_ID_FIELD = "vid";
+  private static final Gson GSON = new Gson();
+
+  private static final Map<
+          Class<?>, Pair<SerializableFunction<?, Long>, ReceiverBuilder<?, ? 
extends Receiver<?>>>>
+      REGISTERED_PLUGINS;
+
+  static {
+    REGISTERED_PLUGINS = new HashMap<>();
+  }
+
+  /** Gets a {@link Plugin} by its class. */
+  static Plugin getPluginByClass(Class<?> pluginClass) {
     checkArgument(pluginClass != null, "Plugin class can not be null!");
     if (pluginClass.equals(SalesforceBatchSource.class)) {
-      return Plugin.create(
+      return Plugin.createBatch(
           pluginClass, SalesforceInputFormat.class, 
SalesforceInputFormatProvider.class);
     } else if (pluginClass.equals(HubspotBatchSource.class)) {
-      return Plugin.create(pluginClass, HubspotInputFormat.class, 
HubspotInputFormatProvider.class);
+      return Plugin.createBatch(
+          pluginClass, HubspotInputFormat.class, 
HubspotInputFormatProvider.class);
     } else if (pluginClass.equals(ZendeskBatchSource.class)) {
-      return Plugin.create(pluginClass, ZendeskInputFormat.class, 
ZendeskInputFormatProvider.class);
+      return Plugin.createBatch(
+          pluginClass, ZendeskInputFormat.class, 
ZendeskInputFormatProvider.class);
     } else if (pluginClass.equals(HubspotBatchSink.class)) {
-      return Plugin.create(pluginClass, HubspotOutputFormat.class, 
SourceInputFormatProvider.class);
+      return Plugin.createBatch(
+          pluginClass, HubspotOutputFormat.class, 
SourceInputFormatProvider.class);
     } else if (pluginClass.equals(ServiceNowSource.class)) {
-      return Plugin.create(
+      return Plugin.createBatch(
           pluginClass, ServiceNowInputFormat.class, 
SourceInputFormatProvider.class);
+    } else if (pluginClass.equals(HubspotStreamingSource.class)) {
+      return Plugin.createStreaming(pluginClass);
+    }
+    throw new UnsupportedOperationException(

Review Comment:
   Yes, we have already added this ability. There are two methods in `CdapIO`: 
   
   - `withCdapPluginClass()` - it tries to look up the `Plugin` by class in the 
predefined list in `MappingUtils`, as we can see here.
   - `withCdapPlugin()` - we can manually pass a `Plugin.create(...)` with all 
the needed parameters, without using the `MappingUtils` class; it is used, for 
example, in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to