nsivabalan commented on code in PR #8514:
URL: https://github.com/apache/hudi/pull/8514#discussion_r1183180023


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,7 +276,17 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
             + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation function). "
-            + "Pass a comma-separated list of subclass names to chain the transformations.")
+            + "Pass a comma-separated list of subclass names to chain the transformations. Transformer can also include "
+            + "an identifier. E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the identifier tr1 "
+            + "can be used along with property key like `hoodie.deltastreamer.transformer.sql.tr1` to identify properties related "
+            + "to the transformer. So effective value for `hoodie.deltastreamer.transformer.sql` is determined by key "
+            + "`hoodie.deltastreamer.transformer.sql.tr1` for this transformer. This is useful when there are two or more "
+            + "transformers using the same config keys and expect different values for those keys. If identifier is used, it should "
+            + "be specified for all the transformers. Further the order in which transformer is applied is determined by the occurrence "
+            + "of transformer irrespective of the identifier used for the transformer. For example: In the configured value below "
+            + "tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer "
+            + ", tr2 is applied before tr1 based on order of occurrence."
+    )

Review Comment:
   Can we call out that this identifier format is not strictly required unless users need multiple transformers of the same type?
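   For reference, a rough usage sketch of the identifier format described above (illustrative only, not code from this PR; the driver class name and the sample SQL are made up, and the SQL assumes SqlQueryBasedTransformer's usual <SRC> placeholder):

       import java.util.Arrays;

       import org.apache.hudi.common.config.TypedProperties;
       import org.apache.hudi.utilities.transform.ChainedTransformer;

       public class TransformerIdExample {
         public static void main(String[] args) {
           // Chain two SQL transformers; tr2 runs before tr1 because it appears
           // first in the list, irrespective of the identifiers chosen.
           ChainedTransformer chained = new ChainedTransformer(Arrays.asList(
               "tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
               "tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer"));

           TypedProperties props = new TypedProperties();
           // Suffixed keys: each transformer sees its own value for
           // hoodie.deltastreamer.transformer.sql once the ".<id>" suffix is stripped.
           props.setProperty("hoodie.deltastreamer.transformer.sql.tr2",
               "SELECT * FROM <SRC> WHERE amount > 0");
           props.setProperty("hoodie.deltastreamer.transformer.sql.tr1",
               "SELECT *, current_timestamp() AS load_ts FROM <SRC>");
           // props would then flow into chained.apply(jsc, spark, dataset, props).
         }
       }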



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
 
-  public ChainedTransformer(List<Transformer> transformers) {
-    this.transformers = transformers;
+  private final List<TransformerInfo> transformers;
+
+  public ChainedTransformer(List<Transformer> transformersList) {
+    this.transformers = new ArrayList<>(transformersList.size());
+    for (Transformer transformer : transformersList) {
+      this.transformers.add(new TransformerInfo(transformer));
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+   * for more information on how the transformers can be configured.
+   *
+   * @param configuredTransformers List of configured transformer class names.
+   * @param ignore Added for avoiding two methods with same erasure. Ignored.
+   */
+  public ChainedTransformer(List<String> configuredTransformers, int... ignore) {
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+
+    Set<String> identifiers = new HashSet<>();
+    for (String configuredTransformer : configuredTransformers) {
+      if (!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+        transformers.add(new TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+      } else {
+        String[] splits = configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon in a configured transformer");
+        }
+        String id = splits[0];
+        validateIdentifier(id, identifiers, configuredTransformer);
+        Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+        transformers.add(new TransformerInfo(transformer, id));
+      }
+    }
+
+    ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+        "Either all transformers should have identifier or none should");
   }
 
   public List<String> getTransformersNames() {
-    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+    return transformers.stream().map(t -> t.getTransformer().getClass().getName()).collect(Collectors.toList());
   }
 
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
-    for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties));
     }
     return dataset;
   }
+
+  private void validateIdentifier(String id, Set<String> identifiers, String configuredTransformer) {
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (identifiers.contains(id)) {
+      throw new IllegalArgumentException(String.format("Duplicate identifier %s found for transformer %s", id, configuredTransformer));
+    } else {
+      identifiers.add(id);
+    }
+  }
+
+  private static class TransformerInfo {
+    private final Transformer transformer;
+    private final Option<String> idOpt;
+
+    private TransformerInfo(Transformer transformer, String idOpt) {
+      this.transformer = transformer;
+      this.idOpt = Option.of(idOpt);
+    }
+
+    private TransformerInfo(Transformer transformer) {
+      this.transformer = transformer;
+      this.idOpt = Option.empty();
+    }
+
+    private Transformer getTransformer() {
+      return transformer;
+    }
+
+    private boolean hasIdentifier() {
+      return idOpt.isPresent();
+    }
+
+    private TypedProperties getProperties(TypedProperties properties) {
+      TypedProperties transformerProps = properties;
+      if (idOpt.isPresent()) {
+        // Transformer specific property keys end with the id associated with the transformer.
+        // Ex. For id tr1, key `hoodie.deltastreamer.transformer.sql.tr1` would be converted to
+        // `hoodie.deltastreamer.transformer.sql` and then passed to the transformer.
+        String id = idOpt.get();
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + id)) {

Review Comment:
   don't we need to remove the corresponding props from transformerProps in addition to adding the trimmed ones?
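   For concreteness, one way the trim-and-remove could look (a sketch only, not code from the PR; the helper name scopedProps is made up, and the overrideKeysMap from the diff is dropped here for brevity):

       // Sketch: copy the props, add the trimmed key, and drop the original
       // ".<id>"-suffixed key so the transformer only sees the un-suffixed property.
       private static TypedProperties scopedProps(TypedProperties properties, String id) {
         TypedProperties transformerProps = new TypedProperties(properties);
         for (Map.Entry<Object, Object> entry : properties.entrySet()) {
           String key = (String) entry.getKey();
           if (key.endsWith("." + id)) {
             String trimmedKey = key.substring(0, key.length() - (id.length() + 1));
             transformerProps.put(trimmedKey, entry.getValue());
             transformerProps.remove(key);
           }
         }
         return transformerProps;
       }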



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
 
-  public ChainedTransformer(List<Transformer> transformers) {
-    this.transformers = transformers;
+  private final List<TransformerInfo> transformers;
+
+  public ChainedTransformer(List<Transformer> transformersList) {
+    this.transformers = new ArrayList<>(transformersList.size());
+    for (Transformer transformer : transformersList) {
+      this.transformers.add(new TransformerInfo(transformer));
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+   * for more information on how the transformers can be configured.
+   *
+   * @param configuredTransformers List of configured transformer class names.
+   * @param ignore Added for avoiding two methods with same erasure. Ignored.
+   */
+  public ChainedTransformer(List<String> configuredTransformers, int... ignore) {
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+
+    Set<String> identifiers = new HashSet<>();
+    for (String configuredTransformer : configuredTransformers) {
+      if (!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+        transformers.add(new TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+      } else {
+        String[] splits = configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon in a configured transformer");
+        }
+        String id = splits[0];
+        validateIdentifier(id, identifiers, configuredTransformer);
+        Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+        transformers.add(new TransformerInfo(transformer, id));
+      }
+    }
+
+    ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+        "Either all transformers should have identifier or none should");
   }
 
   public List<String> getTransformersNames() {
-    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+    return transformers.stream().map(t -> t.getTransformer().getClass().getName()).collect(Collectors.toList());
   }
 
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
-    for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties));
     }
     return dataset;
   }
+
+  private void validateIdentifier(String id, Set<String> identifiers, String configuredTransformer) {
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (identifiers.contains(id)) {
+      throw new IllegalArgumentException(String.format("Duplicate identifier %s found for transformer %s", id, configuredTransformer));
+    } else {
+      identifiers.add(id);
+    }
+  }
+
+  private static class TransformerInfo {
+    private final Transformer transformer;
+    private final Option<String> idOpt;
+
+    private TransformerInfo(Transformer transformer, String idOpt) {

Review Comment:
   String id



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java:
##########
@@ -191,15 +190,11 @@ public static SchemaPostProcessor createSchemaPostProcessor(
 
   }
 
-  public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
+  public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt) throws IOException {
     try {
-      List<Transformer> transformers = new ArrayList<>();
-      for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
-        transformers.add(ReflectionUtils.loadClass(className));
-      }
-      return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
+      return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : new ChainedTransformer(classNames));
     } catch (Throwable e) {
-      throw new IOException("Could not load transformer class(es) " + classNames, e);
+      throw new IOException("Could not load transformer class(es) " + classNamesOpt, e);

Review Comment:
   classNamesOpt.get() 
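   To make that suggestion concrete, a possible shape for the catch block (sketch only; if classNamesOpt can be empty at this point, .get() would throw, so mirroring the removed code's orElse pattern with java.util.Collections may be safer):

       } catch (Throwable e) {
         // Surface the actual class names in the error instead of the Option wrapper.
         throw new IOException("Could not load transformer class(es) "
             + classNamesOpt.orElse(Collections.emptyList()), e);
       }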



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";

Review Comment:
   ID_TRANSFORMER_CLASS_NAME_DELIMITER


