chamikaramj commented on a change in pull request #17051:
URL: https://github.com/apache/beam/pull/17051#discussion_r831240517



##########
File path: sdks/java/io/cdap/build.gradle
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+    id 'java'
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        exportJavadoc: false,
+        automaticModuleName: 'org.apache.beam.sdk.io.cdap',
+)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: CDAP :: Java"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for integration with CDAP plugins."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+
+allprojects {
+    repositories {
+        maven { url 'https://jitpack.io' }

Review comment:
       Why was this needed?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class for building {@link PluginConfig} object of the specific class 
{@param <T>}. */
+public class ConfigWrapper<T extends PluginConfig> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConfigWrapper.class);
+
+  @Nullable private Map<String, Object> paramsMap = null;
+  private final Class<T> configClass;
+
+  public ConfigWrapper(Class<T> configClass) {
+    this.configClass = configClass;
+  }
+
+  public ConfigWrapper<T> fromJsonString(String jsonString) {
+    TypeReference<HashMap<String, Object>> typeRef =
+        new TypeReference<HashMap<String, Object>>() {};
+    try {
+      paramsMap = new ObjectMapper().readValue(jsonString, typeRef);
+    } catch (IOException e) {
+      LOG.error("Can not read json string to params map", e);

Review comment:
       Let's re-raise this so that the caller gets a hard notification?

##########
File path: sdks/java/io/cdap/build.gradle
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+    id 'java'
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        exportJavadoc: false,
+        automaticModuleName: 'org.apache.beam.sdk.io.cdap',
+)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: CDAP :: Java"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for integration with CDAP plugins."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+
+allprojects {
+    repositories {
+        maven { url 'https://jitpack.io' }
+    }
+}
+
+dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")

Review comment:
       We should define version numbers in the top-level config [1] and just use 
the dependency identifiers here.
   
   [1] 
https://github.com/apache/beam/blob/f7de08eb86b77794195e74f90763703ce7cb88d7/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L448

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class for building {@link PluginConfig} object of the specific class 
{@param <T>}. */
+public class ConfigWrapper<T extends PluginConfig> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConfigWrapper.class);
+
+  @Nullable private Map<String, Object> paramsMap = null;
+  private final Class<T> configClass;
+
+  public ConfigWrapper(Class<T> configClass) {
+    this.configClass = configClass;
+  }
+
+  public ConfigWrapper<T> fromJsonString(String jsonString) {
+    TypeReference<HashMap<String, Object>> typeRef =
+        new TypeReference<HashMap<String, Object>>() {};
+    try {
+      paramsMap = new ObjectMapper().readValue(jsonString, typeRef);
+    } catch (IOException e) {
+      LOG.error("Can not read json string to params map", e);
+    }
+    return this;
+  }
+
+  public ConfigWrapper<T> fromJsonFile(File jsonFile) {
+    TypeReference<HashMap<String, Object>> typeRef =
+        new TypeReference<HashMap<String, Object>>() {};
+    try {
+      paramsMap = new ObjectMapper().readValue(jsonFile, typeRef);
+    } catch (IOException e) {
+      LOG.error("Can not read json file to params map", e);

Review comment:
       Ditto: let's re-raise this exception as well.

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or

Review comment:
       Please add a description.

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);
+          }
+        }
+      }
+    }
+    return config;
+  }
+
+  /** @return empty {@link Object} of {@param tClass} */

Review comment:
       Probably should be renamed to "getEmptyObjectFromDefaultValues".

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);
+          }
+        }
+      }
+    }
+    return config;
+  }
+
+  /** @return empty {@link Object} of {@param tClass} */
+  private static @Nullable <T> T getEmptyObjectOf(Class<T> tClass) {
+    for (Constructor<?> constructor : tClass.getDeclaredConstructors()) {
+      constructor.setAccessible(true);
+      Class<?>[] parameterTypes = constructor.getParameterTypes();
+      Object[] parameters = new Object[parameterTypes.length];
+      for (int i = 0; i < parameterTypes.length; i++) {
+        parameters[i] = getDefaultValue(parameterTypes[i]);
+      }
+      try {
+        return (T) constructor.newInstance(parameters);
+      } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+        LOG.error("Can not instantiate an empty object", e);
+      }
+    }
+    return null;
+  }
+
+  /** @return default value for given {@param tClass} */
+  private static @Nullable Object getDefaultValue(@Nullable Class<?> tClass) {

Review comment:
       Seems like this is unused?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);
+          }
+        }
+      }
+    }
+    return config;
+  }
+
+  /** @return empty {@link Object} of {@param tClass} */
+  private static @Nullable <T> T getEmptyObjectOf(Class<T> tClass) {
+    for (Constructor<?> constructor : tClass.getDeclaredConstructors()) {
+      constructor.setAccessible(true);

Review comment:
       Do we expect to use non-public constructors?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");

Review comment:
       Can we be more specific in the error message?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);
+          }
+        }
+      }
+    }
+    return config;
+  }
+
+  /** @return empty {@link Object} of {@param tClass} */
+  private static @Nullable <T> T getEmptyObjectOf(Class<T> tClass) {
+    for (Constructor<?> constructor : tClass.getDeclaredConstructors()) {
+      constructor.setAccessible(true);
+      Class<?>[] parameterTypes = constructor.getParameterTypes();
+      Object[] parameters = new Object[parameterTypes.length];
+      for (int i = 0; i < parameterTypes.length; i++) {
+        parameters[i] = getDefaultValue(parameterTypes[i]);
+      }
+      try {
+        return (T) constructor.newInstance(parameters);
+      } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+        LOG.error("Can not instantiate an empty object", e);
+      }
+    }
+    return null;
+  }
+
+  /** @return default value for given {@param tClass} */

Review comment:
       And mention that we return null for unknown types.

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(

Review comment:
       Can these just be package-private?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);
+          }
+        }
+      }
+    }
+    return config;
+  }
+
+  /** @return empty {@link Object} of {@param tClass} */
+  private static @Nullable <T> T getEmptyObjectOf(Class<T> tClass) {
+    for (Constructor<?> constructor : tClass.getDeclaredConstructors()) {

Review comment:
       How do we know which constructor to use? It seems like we always use
the first one currently.

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null || configClass.isPrimitive() || 
configClass.isArray()) {
+      throw new IllegalArgumentException("Config class must be correct!");
+    }
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(
+                  f -> !Modifier.isStatic(f.getModifiers()) && 
f.isAnnotationPresent(Name.class))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    T config = getEmptyObjectOf(configClass);
+
+    if (config != null) {
+      for (Field field : allFields) {
+        field.setAccessible(true);
+
+        Class<?> fieldType = field.getType();
+
+        Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+        Object fieldValue =
+            declaredAnnotation != null ? 
params.get(declaredAnnotation.value()) : null;
+
+        if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+          try {
+            field.set(config, fieldValue);
+          } catch (IllegalAccessException e) {
+            LOG.error("Can not set a field", e);

Review comment:
       Include the value of the field?

##########
File path: 
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for getting any filled {@link io.cdap.cdap.api.plugin.PluginConfig} 
configuration object.
+ */
+@SuppressWarnings({"unchecked", "assignment.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+
+  /**
+   * @param params map of config fields, where key is the name of the field, 
value must be String or
+   *     boxed primitive
+   * @return Config object for given map of arguments and configuration class
+   */
+  public static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {

Review comment:
       Please make sure that any non-private utils are tested.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to