This is an automated email from the ASF dual-hosted git repository. nagarwal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new ea983ff [HUDI-1137] Add option to configure different path selector ea983ff is described below commit ea983ff912dab8604eec3085bc1c041cb6e60bc8 Author: Satish Kotha <satishko...@uber.com> AuthorDate: Mon Aug 24 11:11:10 2020 -0700 [HUDI-1137] Add option to configure different path selector --- .../integ/testsuite/helpers/DFSTestSuitePathSelector.java | 14 ++++++++++++-- .../main/java/org/apache/hudi/utilities/UtilHelpers.java | 9 +++++++-- .../org/apache/hudi/utilities/sources/AvroDFSSource.java | 3 +-- .../hudi/utilities/sources/helpers/DFSPathSelector.java | 1 + 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java index b67e21f..bfc8368 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java @@ -32,12 +32,17 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline. */ public class DFSTestSuitePathSelector extends DFSPathSelector { + private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class); public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) { super(props, hadoopConf); @@ -54,9 +59,12 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { lastBatchId = Integer.parseInt(lastCheckpointStr.get()); nextBatchId = lastBatchId + 1; } else { - lastBatchId = -1; - nextBatchId = 0; + lastBatchId = 0; + nextBatchId = 1; } + + log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit + + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId); // obtain all eligible files for the batch List<FileStatus> eligibleFiles = new ArrayList<>(); FileStatus[] fileStatuses = fs.globStatus( @@ -73,6 +81,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { } } } + + log.info("Reading " + eligibleFiles.size() + " files. "); // no data to readAvro if (eligibleFiles.size() == 0) { return new ImmutablePair<>(Option.empty(), diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 0531196..14e16ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -352,12 +352,17 @@ public class UtilHelpers { } } - public static DFSPathSelector createSourceSelector(String sourceSelectorClass, TypedProperties props, + public static DFSPathSelector createSourceSelector(TypedProperties props, Configuration conf) throws IOException { + String sourceSelectorClass = + props.getString(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, DFSPathSelector.class.getName()); try { - return (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, + DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(sourceSelectorClass, new Class<?>[]{TypedProperties.class, Configuration.class}, props, conf); + + LOG.info("Using path selector " + selector.getClass().getName()); + return selector; } catch (Throwable e) { throw new IOException("Could not load source selector class " + sourceSelectorClass, e); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java index b5ce96f..b8f29e8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java @@ -47,8 +47,7 @@ public class AvroDFSSource extends AvroSource { SchemaProvider schemaProvider) throws IOException { super(props, sparkContext, sparkSession, schemaProvider); this.pathSelector = UtilHelpers - .createSourceSelector(DFSPathSelector.class.getName(), props, sparkContext - .hadoopConfiguration()); + .createSourceSelector(props, sparkContext.hadoopConfiguration()); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java index 59263e4..5d56f2a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java @@ -52,6 +52,7 @@ public class DFSPathSelector { public static class Config { public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root"; + public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector"; } protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");