bradleyhurley opened a new issue #2068:
URL: https://github.com/apache/hudi/issues/2068


   **Describe the problem you faced**
   
   We are using the HoodieDeltaStreamer in COPY_ON_WRITE (COW) mode to process 
data out of Kafka. We are currently working through what is essentially a bulk-load 
scenario, but we can't guarantee that updates won't exist as the data is 
processed out of Kafka. For this example, let's assume the table we are 
processing out of Kafka is 1 billion rows and 40 columns wide.
   
   The 100 million row figure comes from 
`hoodie.deltastreamer.kafka.source.maxEvents=100000000`
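   
   As I understand the intended behavior (a sketch of my understanding, not 
verified against our logs): each run ingests at most `maxEvents` records beyond 
the last committed Kafka checkpoint, so consecutive runs should walk the topic 
in roughly 100M-row batches:
   
   ```
   # Sketch: cap each DeltaStreamer run at 100M Kafka events.
   # Run 1 ingests roughly the first 100M offsets and commits that checkpoint;
   # run 2 resumes from the checkpoint and picks up the next (up to) 100M.
   hoodie.deltastreamer.kafka.source.maxEvents=100000000
   ```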
   
   The first execution of the deltastreamer took 23 minutes to process 100 
million rows. Running the exact same job again should result in 200 million 
rows being written to S3 and accessible via Athena, but the job(s) fail to 
complete.
   
   Most of the tables I am processing are under 100M rows, but another scenario 
is a table with 101,500,000 rows in Kafka. The initial DeltaStreamer run took 23 
minutes, and the second execution, going from 100,000,000 to 101,500,000 rows 
(+1.5M), took 1 hour and 23 minutes.
   
   Upserts on relatively large datasets are slow and sometimes fail to complete.
   
   **Full Spark Submit**
   ```
   # note: the hudi.properties file passed via --props is currently an empty file
   spark-submit \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --jars s3://hudi-ext-1.0-SNAPSHOT.jar \
     --master yarn \
     --deploy-mode client \
     --num-executors 25 \
     --executor-cores 3 \
     --executor-memory 4G \
     --driver-memory 6g \
     --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
     --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
     --conf spark.task.maxFailures=10 \
     --conf spark.memory.fraction=0.4 \
     --conf spark.rdd.compress=true \
     --conf spark.kryoserializer.buffer.max=200m \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.dynamicAllocation.enabled=false \
     --conf spark.port.maxRetries=100 \
     /usr/lib/hudi/hudi-utilities-bundle.jar \
     --table-type COPY_ON_WRITE \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --source-ordering-field MODIFIEDDATE \
     --target-base-path {{ s3_path }} \
     --target-table {{ table_name }} \
     --props s3://{{ s3 bucket}}/hudi/config/hudi.properties \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --hoodie-conf hoodie.embed.timeline.server=true \
     --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE \
     --hoodie-conf hoodie.compact.inline=false \
     --hoodie-conf hoodie.datasource.write.recordkey.field=ID \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=ID \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexNonPartKeyGenDecimalSupport \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url={{ url }} \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl={{ url }} \
     --hoodie-conf schema.registry.url={{ schema_registry_url }} \
     --hoodie-conf hoodie.deltastreamer.source.kafka.topic={{ kafka_topic }} \
     --hoodie-conf group.id={{ group_id }} \
     --hoodie-conf enable.auto.commit=false \
     --hoodie-conf bootstrap.servers={{ bootstrap_servers_url }} \
     --hoodie-conf auto.offset.reset=earliest \
     --hoodie-conf hoodie.consistency.check.enabled=true \
     --hoodie-conf security.protocol=SSL \
     --hoodie-conf ssl.keystore.location=/usr/lib/jvm/jre/lib/security/cacerts \
     --hoodie-conf ssl.keystore.password={{ password }} \
     --hoodie-conf hoodie.deltastreamer.kafka.source.maxEvents=100000000 \
     --enable-hive-sync \
     --hoodie-conf hoodie.datasource.hive_sync.table={{ table_name }} \
     --hoodie-conf hoodie.datasource.hive_sync.database={{ db_name }} \
     --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   ```
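   
   Since the hudi.properties file passed via `--props` is currently empty, the 
`--hoodie-conf` values could equally live there. A sketch of what that file 
might contain, including the shuffle-parallelism knobs from the tuning guide 
that we have not set anywhere (so defaults apply); the 500 values below are 
illustrative guesses, not something we have tested:
   
   ```
   # Illustrative hudi.properties (same settings currently passed via --hoodie-conf):
   hoodie.embed.timeline.server=true
   hoodie.filesystem.view.type=EMBEDDED_KV_STORE
   hoodie.compact.inline=false
   hoodie.datasource.write.recordkey.field=ID
   hoodie.datasource.write.partitionpath.field=ID
   hoodie.deltastreamer.kafka.source.maxEvents=100000000
   # ... remaining hoodie.* and Kafka settings as in the spark-submit above ...
   # Tuning-guide parallelism knobs we have NOT set; values here are guesses:
   hoodie.upsert.shuffle.parallelism=500
   hoodie.insert.shuffle.parallelism=500
   hoodie.bulkinsert.shuffle.parallelism=500
   ```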
   
   From the tuning guide, I am curious whether we should really be using 
`spark.executor.cores=1` instead of `--executor-cores 3`, and whether our 
`--num-executors 25` is too low; a sketch of that alternative sizing follows.
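   
   ```
   # Sketch of the tuning-guide-style sizing (all other flags unchanged from
   # the full spark-submit above). Keeps the total core count at 25 * 3 = 75;
   # the per-executor memory is an untested guess.
   --num-executors 75 \
   --executor-cores 1 \
   --executor-memory 4G \
   ```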
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Load Kafka with 100M+ rows
   2. Use the provided spark-submit command
   3. Run the DeltaStreamer to ingest the first 100M rows
   4. Run the DeltaStreamer again to go from 100M to 200M rows (or to the total 
number of unique rows in Kafka)
   
   **Expected behavior**
   
   I am unsure what is considered a reasonable amount of time, or whether a 
maxEvents value of 100M is too large. But given that even tables with just over 
100M rows in Kafka take a significant amount of time on the second execution, I 
feel like my configuration is fundamentally wrong, or something else is going on.
   
   **Environment Description**
   
   AWS EMR 5.30. Spark Jobs are submitted via the EMR Step API.
   
   * Hudi version : 0.5.2-incubating
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.6
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   Please let me know which additional Spark logs would be helpful, and I can 
provide them.
   

