bkosuru opened a new issue #3892:
URL: https://github.com/apache/hudi/issues/3892


   Hello,
   
   I am trying to copy 2.4 TB of data stored in hudi to another location in 
hdfs because we lost commit history due to default commit settings. 
   
   The source had 3411 files where as the copy is creating 44764 small 
files(~50MB each). How do I keep the file count about same as the original? It 
takes longer to load data from the copy when you run queries.  ("Listing leaf 
files and directories " job takes more time). The data was inserted into the 
source over a period of time.
   
   Source data has 151 partitions. Data is skewed with 21 partitions having 
data greater than 50G, 8 partitions with ~2GB and rest of them less than 1G. 
The destination also the same 151 partitions. 
   
   I tried different values for INSERT_PARALLELISM 2000, 5000, 9000. I get the 
same 44764 files in the output.
   
   I also tried setting
   .option(PARQUET_SMALL_FILE_LIMIT_BYTES, 128 * 1024 * 1024)
   This does not seem to impact INSERT
    
   **Environment Description**
   
   * Hudi version : 0.8.0
   
   * Spark version : 2.4.4
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : No
   
   * Table type: COW
   
   Spark settings:
   new SparkConf()
             .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
             .set("spark.ui.enabled", "false")
             .set("spark.sql.parquet.mergeSchema", "false")
             .set("spark.sql.files.ignoreCorruptFiles", "true")
             .set("spark.sql.hive.convertMetastoreParquet", "false")
             
   spark-submit:
   spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --name kg-copy \
   --driver-memory 24G \
   --executor-memory 50G \
   --executor-cores 6 \
   --num-executors 500 \
   --conf spark.dynamicAllocation.enabled=False \
   --conf spark.network.timeout=240s \
   --conf spark.shuffle.sasl.timeout=60000 \
   --conf spark.driver.maxResultSize=20g \
   --conf spark.port.maxRetries=60 \
   --conf spark.shuffle.service.enabled=True \
   --conf spark.sql.shuffle.partitions=3000 \
   --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC 
-XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf spark.driver.memoryOverhead=1024 \
   --conf spark.executor.memoryOverhead=3072 \
   --conf spark.yarn.max.executor.failures=100 \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.task.maxFailures=4 \
   --conf spark.rdd.compress=True
   
   private val AVG_RECORD_SIZE: Int =
       256 // approx bytes of our average record, contra Hudi default 
assumption of 1024
     private val ONE_GIGABYTE: Int =
       1024 * 1024 * 1024 // used for Parquet file size & block size
     private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
     
   df.write
         .format("hudi")
         // DataSourceWriteOptions
         .option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
         .option( KEYGENERATOR_CLASS_OPT_KEY,"com.xyz.SpoKeyGenerator")
         .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) 
         .option(INSERT_DROP_DUPS_OPT_KEY, value = false)
         .option(INSERT_PARALLELISM, 2000)
         .option(PARTITIONPATH_FIELD_OPT_KEY, "g,p")
         .option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted")
         .option(RECORDKEY_FIELD_OPT_KEY, "s,o")
         .option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true)
         // HoodieIndexConfig
         .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, 
BLOOM_MAX_ENTRIES)
         .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
         // HoodieStorageConfig
         .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
         .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
         .option(PARQUET_FILE_MAX_BYTES,ONE_GIGABYTE) 
         // Commit history
         .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
         .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
         .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
         // HoodieWriteConfig
         .option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false")
         .option(TABLE_NAME, "spog")
         .mode(SaveMode.Append)
             
            
   class SpoKeyGenerator(props: TypedProperties)
       extends ComplexKeyGenerator(props) {
   
     def hash128(s: String): String = {
       val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
       h(0).toString + h(1).toString
     }
   
     override def getRecordKey(record: GenericRecord): String = {
       val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false)
       val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false)
       genKey(s, o)
     }
   
     private def genKey(s: String, o: String): String = hash128(s + o)
   
     override def getRecordKey(row: Row): String = {
       val s = row.getAs(0).toString
       val o = row.getAs(1).toString
       genKey(s, o)
     }
   
   }
   
   Thanks,
   Bindu
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to