This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d785b41f01004897e0d9c2882a58f79827f5d9fe
Author: XuQianJin-Stars <forwar...@apache.com>
AuthorDate: Sun Oct 23 17:09:49 2022 +0800

    fix cherry pick err
---
 .../hudi/metrics/zhiyan/ZhiyanMetricsReporter.java |  6 ------
 .../apache/hudi/configuration/FlinkOptions.java    | 31 ++++++++++++++--------
 .../main/java/org/apache/hudi/DataSourceUtils.java |  5 +++++
 pom.xml                                            | 12 ++++++------
 4 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
index 323fe17106..6b820547a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.MetricRegistry;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metrics.MetricsReporter;
 
-import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
 public class ZhiyanMetricsReporter extends MetricsReporter {
@@ -54,11 +53,6 @@ public class ZhiyanMetricsReporter extends MetricsReporter {
     reporter.report();
   }
 
-  @Override
-  public Closeable getReporter() {
-    return reporter;
-  }
-
   @Override
   public void stop() {
     reporter.stop();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4a298839fb..31c8b554c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -416,7 +416,7 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
+      .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
 
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
@@ -585,7 +585,7 @@ public class FlinkOptions extends HoodieConfig {
       .stringType()
       .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
       .withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
-        + "Default is KEEP_LATEST_COMMITS.");
+          + "Default is KEEP_LATEST_COMMITS.");
 
   public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
       .key("clean.retain_commits")
@@ -594,6 +594,14 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
           + "This also directly translates into how much you can incrementally pull on this table, default 30");
 
+  public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
+      .key("clean.retain_hours")
+      .intType()
+      .defaultValue(24) // default 24 hours
+      .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+          + " compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+          + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
+
   public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
       .key("clean.retain_file_versions")
       .intType()
@@ -657,7 +665,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
       .key("clustering.plan.partition.filter.mode")
       .stringType()
-      .defaultValue("NONE")
+      .defaultValue(ClusteringPlanPartitionFilterMode.NONE.name())
       .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
           + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
           + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
@@ -665,16 +673,16 @@ public class FlinkOptions extends HoodieConfig {
           + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
           + PARTITION_FILTER_END_PARTITION.key() + "'].");
 
-  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
       .key("clustering.plan.strategy.target.file.max.bytes")
-      .intType()
-      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .longType()
+      .defaultValue(1024 * 1024 * 1024L) // default 1 GB
       .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
 
-  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
       .key("clustering.plan.strategy.small.file.limit")
-      .intType()
-      .defaultValue(600) // default 600 MB
+      .longType()
+      .defaultValue(600L) // default 600 MB
       .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
 
   public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
@@ -698,6 +706,7 @@ public class FlinkOptions extends HoodieConfig {
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
+
   public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
       .key("hive_sync.enable")
       .booleanType()
@@ -725,8 +734,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
       .key("hive_sync.mode")
       .stringType()
-      .defaultValue("jdbc")
-      .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
+      .defaultValue(HiveSyncMode.HMS.name())
+      .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'hms'");
 
   public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
       .key("hive_sync.username")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 0df3a0c8da..2b7d8f49b4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -194,6 +194,11 @@ public class DataSourceUtils {
         .withProps(parameters).build();
   }
 
+  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
+                                                       String tblName, Map<String, String> parameters) {
+    return createHoodieClient(jssc, schemaStr, basePath, "default", tblName, parameters);
+  }
+
   public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                        String dbName, String tblName, Map<String, String> parameters) {
     return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, dbName, tblName, parameters));
diff --git a/pom.xml b/pom.xml
index 159ae2a841..60c13c8f07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -375,12 +375,12 @@
           <rules>
             <bannedDependencies>
               <excludes>
-                <exclude>org.slf4j:slf4j-log4j12</exclude>
-                <exclude>org.sl4fj:slf4j-simple</exclude>
-                <exclude>org.sl4fj:slf4j-jdk14</exclude>
-                <exclude>org.sl4fj:slf4j-nop</exclude>
-                <exclude>org.sl4fj:slf4j-jcl</exclude>
-                <exclude>log4j:log4j</exclude>
+<!--                <exclude>org.slf4j:slf4j-log4j12</exclude>-->
+<!--                <exclude>org.sl4fj:slf4j-simple</exclude>-->
+<!--                <exclude>org.sl4fj:slf4j-jdk14</exclude>-->
+<!--                <exclude>org.sl4fj:slf4j-nop</exclude>-->
+<!--                <exclude>org.sl4fj:slf4j-jcl</exclude>-->
+<!--                <exclude>log4j:log4j</exclude>-->
                 <exclude>ch.qos.logback:logback-classic</exclude>
                 <!-- NOTE: We're banning any HBase deps versions other than the approved ${hbase.version},
                           which is aimed at preventing the classpath collisions w/ transitive deps usually) -->
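
Two of the changes above are easiest to evaluate from the caller's side. First, the new "clean.retain_hours" ConfigOption: a minimal sketch of setting it on a Flink job configuration, not part of this commit; the 48-hour value and the class name are made up for illustration.

// Illustrative only -- not from this commit. Sets the option added above
// (key "clean.retain_hours", default 24) via Flink's standard ConfigOption API.
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class CleanRetainHoursSketch {
  public static Configuration buildConf() {
    Configuration conf = new Configuration();
    // Retain commits from the last 48 hours; older files (except the
    // latest in each file group) become candidates for cleaning.
    conf.set(FlinkOptions.CLEAN_RETAIN_HOURS, 48);
    return conf;
  }
}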
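
Second, the new DataSourceUtils.createHoodieClient overload, which fills in "default" as the database name. A minimal sketch assuming a live JavaSparkContext and an Avro schema string; the base path and table name are placeholders.

// Illustrative only -- not from this commit. The 5-argument overload
// delegates to the 6-argument variant with dbName = "default".
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.spark.api.java.JavaSparkContext;

public class CreateClientSketch {
  public static SparkRDDWriteClient newClient(JavaSparkContext jssc, String avroSchemaStr) {
    Map<String, String> params = new HashMap<>(); // extra writer configs, empty here
    // "/tmp/hudi/trips" and "trips" are placeholder base path and table name.
    return DataSourceUtils.createHoodieClient(jssc, avroSchemaStr, "/tmp/hudi/trips", "trips", params);
  }
}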