nsivabalan commented on code in PR #8514: URL: https://github.com/apache/hudi/pull/8514#discussion_r1183180023
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java: ########## @@ -276,7 +276,17 @@ public static class Config implements Serializable { + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before " + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which " + "allows a SQL query templated to be passed as a transformation function). " - + "Pass a comma-separated list of subclass names to chain the transformations.") + + "Pass a comma-separated list of subclass names to chain the transformations. Transformer can also include " + + "an identifier. E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the identifier tr1 " + + "can be used along with property key like `hoodie.deltastreamer.transformer.sql.tr1` to identify properties related " + + "to the transformer. So effective value for `hoodie.deltastreamer.transformer.sql` is determined by key " + + "`hoodie.deltastreamer.transformer.sql.tr1` for this transformer. This is useful when there are two or more " + + "transformers using the same config keys and expect different values for those keys. If identifier is used, it should " + + "be specified for all the transformers. Further the order in which transformer is applied is determined by the occurrence " + + "of transformer irrespective of the identifier used for the transformer. For example: In the configured value below " + + "tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer " + + ", tr2 is applied before tr1 based on order of occurrence." + ) Review Comment: Can we call out that this identifier format is not required unless users need to configure multiple transformers of the same type? 
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java: ########## @@ -19,36 +19,137 @@ package org.apache.hudi.utilities.transform; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially. */ public class ChainedTransformer implements Transformer { - private List<Transformer> transformers; + // Delimiter used to separate class name and the property key suffix. The suffix comes first. + private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":"; - public ChainedTransformer(List<Transformer> transformers) { - this.transformers = transformers; + private final List<TransformerInfo> transformers; + + public ChainedTransformer(List<Transformer> transformersList) { + this.transformers = new ArrayList<>(transformersList.size()); + for (Transformer transformer : transformersList) { + this.transformers.add(new TransformerInfo(transformer)); + } + } + + /** + * Creates a chained transformer using the input transformer class names. Refer {@link HoodieDeltaStreamer.Config#transformerClassNames} + * for more information on how the transformers can be configured. + * + * @param configuredTransformers List of configured transformer class names. + * @param ignore Added for avoiding two methods with same erasure. Ignored. 
+ */ + public ChainedTransformer(List<String> configuredTransformers, int... ignore) { + this.transformers = new ArrayList<>(configuredTransformers.size()); + + Set<String> identifiers = new HashSet<>(); + for (String configuredTransformer : configuredTransformers) { + if (!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) { + transformers.add(new TransformerInfo(ReflectionUtils.loadClass(configuredTransformer))); + } else { + String[] splits = configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER); + if (splits.length > 2) { + throw new IllegalArgumentException("There should only be one colon in a configured transformer"); + } + String id = splits[0]; + validateIdentifier(id, identifiers, configuredTransformer); + Transformer transformer = ReflectionUtils.loadClass(splits[1]); + transformers.add(new TransformerInfo(transformer, id)); + } + } + + ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier) + || transformers.stream().noneMatch(TransformerInfo::hasIdentifier), + "Either all transformers should have identifier or none should"); } public List<String> getTransformersNames() { - return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList()); + return transformers.stream().map(t -> t.getTransformer().getClass().getName()).collect(Collectors.toList()); } @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { Dataset<Row> dataset = rowDataset; - for (Transformer t : transformers) { - dataset = t.apply(jsc, sparkSession, dataset, properties); + for (TransformerInfo transformerInfo : transformers) { + Transformer transformer = transformerInfo.getTransformer(); + dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties)); } return dataset; } + + private void validateIdentifier(String id, Set<String> identifiers, String configuredTransformer) { + 
ValidationUtils.checkArgument(StringUtils.nonEmpty(id), String.format("Transformer identifier is empty for %s", configuredTransformer)); + if (identifiers.contains(id)) { + throw new IllegalArgumentException(String.format("Duplicate identifier %s found for transformer %s", id, configuredTransformer)); + } else { + identifiers.add(id); + } + } + + private static class TransformerInfo { + private final Transformer transformer; + private final Option<String> idOpt; + + private TransformerInfo(Transformer transformer, String idOpt) { + this.transformer = transformer; + this.idOpt = Option.of(idOpt); + } + + private TransformerInfo(Transformer transformer) { + this.transformer = transformer; + this.idOpt = Option.empty(); + } + + private Transformer getTransformer() { + return transformer; + } + + private boolean hasIdentifier() { + return idOpt.isPresent(); + } + + private TypedProperties getProperties(TypedProperties properties) { + TypedProperties transformerProps = properties; + if (idOpt.isPresent()) { + // Transformer specific property keys end with the id associated with the transformer. + // Ex. For id tr1, key `hoodie.deltastreamer.transformer.sql.tr1` would be converted to + // `hoodie.deltastreamer.transformer.sql` and then passed to the transformer. + String id = idOpt.get(); + transformerProps = new TypedProperties(properties); + Map<String, Object> overrideKeysMap = new HashMap<>(); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + String key = (String) entry.getKey(); + if (key.endsWith("." + id)) { Review Comment: don't we need to remove the corresponding props from transformerProps in addition to adding the trimmed ones? 
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java: ########## @@ -19,36 +19,137 @@ package org.apache.hudi.utilities.transform; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially. */ public class ChainedTransformer implements Transformer { - private List<Transformer> transformers; + // Delimiter used to separate class name and the property key suffix. The suffix comes first. + private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":"; - public ChainedTransformer(List<Transformer> transformers) { - this.transformers = transformers; + private final List<TransformerInfo> transformers; + + public ChainedTransformer(List<Transformer> transformersList) { + this.transformers = new ArrayList<>(transformersList.size()); + for (Transformer transformer : transformersList) { + this.transformers.add(new TransformerInfo(transformer)); + } + } + + /** + * Creates a chained transformer using the input transformer class names. Refer {@link HoodieDeltaStreamer.Config#transformerClassNames} + * for more information on how the transformers can be configured. + * + * @param configuredTransformers List of configured transformer class names. + * @param ignore Added for avoiding two methods with same erasure. Ignored. 
+ */ + public ChainedTransformer(List<String> configuredTransformers, int... ignore) { + this.transformers = new ArrayList<>(configuredTransformers.size()); + + Set<String> identifiers = new HashSet<>(); + for (String configuredTransformer : configuredTransformers) { + if (!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) { + transformers.add(new TransformerInfo(ReflectionUtils.loadClass(configuredTransformer))); + } else { + String[] splits = configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER); + if (splits.length > 2) { + throw new IllegalArgumentException("There should only be one colon in a configured transformer"); + } + String id = splits[0]; + validateIdentifier(id, identifiers, configuredTransformer); + Transformer transformer = ReflectionUtils.loadClass(splits[1]); + transformers.add(new TransformerInfo(transformer, id)); + } + } + + ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier) + || transformers.stream().noneMatch(TransformerInfo::hasIdentifier), + "Either all transformers should have identifier or none should"); } public List<String> getTransformersNames() { - return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList()); + return transformers.stream().map(t -> t.getTransformer().getClass().getName()).collect(Collectors.toList()); } @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { Dataset<Row> dataset = rowDataset; - for (Transformer t : transformers) { - dataset = t.apply(jsc, sparkSession, dataset, properties); + for (TransformerInfo transformerInfo : transformers) { + Transformer transformer = transformerInfo.getTransformer(); + dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties)); } return dataset; } + + private void validateIdentifier(String id, Set<String> identifiers, String configuredTransformer) { + 
ValidationUtils.checkArgument(StringUtils.nonEmpty(id), String.format("Transformer identifier is empty for %s", configuredTransformer)); + if (identifiers.contains(id)) { + throw new IllegalArgumentException(String.format("Duplicate identifier %s found for transformer %s", id, configuredTransformer)); + } else { + identifiers.add(id); + } + } + + private static class TransformerInfo { + private final Transformer transformer; + private final Option<String> idOpt; + + private TransformerInfo(Transformer transformer, String idOpt) { Review Comment: String id ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java: ########## @@ -191,15 +190,11 @@ public static SchemaPostProcessor createSchemaPostProcessor( } - public static Option<Transformer> createTransformer(List<String> classNames) throws IOException { + public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt) throws IOException { try { - List<Transformer> transformers = new ArrayList<>(); - for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) { - transformers.add(ReflectionUtils.loadClass(className)); - } - return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers)); + return classNamesOpt.map(classNames -> classNames.isEmpty() ? 
null : new ChainedTransformer(classNames)); } catch (Throwable e) { - throw new IOException("Could not load transformer class(es) " + classNames, e); + throw new IOException("Could not load transformer class(es) " + classNamesOpt, e); Review Comment: classNamesOpt.get() ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java: ########## @@ -19,36 +19,137 @@ package org.apache.hudi.utilities.transform; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially. */ public class ChainedTransformer implements Transformer { - private List<Transformer> transformers; + // Delimiter used to separate class name and the property key suffix. The suffix comes first. + private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":"; Review Comment: ID_TRANSFORMER_CLASS_NAME_DELIMITER -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org