bkosuru opened a new issue #3892: URL: https://github.com/apache/hudi/issues/3892
Hello, I am trying to copy 2.4 TB of data stored in Hudi to another location in HDFS because we lost commit history due to default commit settings. The source had 3411 files, whereas the copy is creating 44764 small files (~50MB each). How do I keep the file count about the same as the original? It takes longer to load data from the copy when you run queries (the "Listing leaf files and directories" job takes more time). The data was inserted into the source over a period of time. Source data has 151 partitions. Data is skewed, with 21 partitions having data greater than 50G, 8 partitions with ~2GB, and the rest of them less than 1G. The destination also has the same 151 partitions. I tried different values for INSERT_PARALLELISM: 2000, 5000, 9000. I get the same 44764 files in the output. I also tried setting .option(PARQUET_SMALL_FILE_LIMIT_BYTES, 128 * 1024 * 1024) This does not seem to impact INSERT **Environment Description** * Hudi version : 0.8.0 * Spark version : 2.4.4 * Storage (HDFS/S3/GCS..) : HDFS * Running on Docker? 
(yes/no) : No * Table type: COW Spark settings: new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.ui.enabled", "false") .set("spark.sql.parquet.mergeSchema", "false") .set("spark.sql.files.ignoreCorruptFiles", "true") .set("spark.sql.hive.convertMetastoreParquet", "false") spark-submit: spark-submit \ --master yarn \ --deploy-mode cluster \ --name kg-copy \ --driver-memory 24G \ --executor-memory 50G \ --executor-cores 6 \ --num-executors 500 \ --conf spark.dynamicAllocation.enabled=False \ --conf spark.network.timeout=240s \ --conf spark.shuffle.sasl.timeout=60000 \ --conf spark.driver.maxResultSize=20g \ --conf spark.port.maxRetries=60 \ --conf spark.shuffle.service.enabled=True \ --conf spark.sql.shuffle.partitions=3000 \ --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \ --conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \ --conf spark.driver.memoryOverhead=1024 \ --conf spark.executor.memoryOverhead=3072 \ --conf spark.yarn.max.executor.failures=100 \ --conf spark.kryoserializer.buffer.max=512m \ --conf spark.task.maxFailures=4 \ --conf spark.rdd.compress=True private val AVG_RECORD_SIZE: Int = 256 // approx bytes of our average record, contra Hudi default assumption of 1024 private val ONE_GIGABYTE: Int = 1024 * 1024 * 1024 // used for Parquet file size & block size private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE) df.write .format("hudi") // DataSourceWriteOptions .option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true") .option( KEYGENERATOR_CLASS_OPT_KEY,"com.xyz.SpoKeyGenerator") .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) .option(INSERT_DROP_DUPS_OPT_KEY, value = false) .option(INSERT_PARALLELISM, 2000) 
.option(PARTITIONPATH_FIELD_OPT_KEY, "g,p") .option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted") .option(RECORDKEY_FIELD_OPT_KEY, "s,o") .option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true) // HoodieIndexConfig .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES) .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name) // HoodieStorageConfig .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35) .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE) .option(PARQUET_FILE_MAX_BYTES,ONE_GIGABYTE) // Commit history .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2) .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1) .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE) // HoodieWriteConfig .option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false") .option(TABLE_NAME, "spog") .mode(SaveMode.Append) class SpoKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) { def hash128(s: String): String = { val h: Array[Long] = MurmurHash3.hash128(s.getBytes) h(0).toString + h(1).toString } override def getRecordKey(record: GenericRecord): String = { val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false) val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false) genKey(s, o) } private def genKey(s: String, o: String): String = hash128(s + o) override def getRecordKey(row: Row): String = { val s = row.getAs(0).toString val o = row.getAs(1).toString genKey(s, o) } } Thanks, Bindu -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org