pontisa95 opened a new issue, #11011:
URL: https://github.com/apache/hudi/issues/11011
Hello, we are facing a problem where some PySpark jobs that rely on Hudi appear to be blocked. Looking at the Spark console we see the following situation:

![Screenshot_201](https://github.com/apache/hudi/assets/166909082/3117f307-37c1-4295-8cbc-cccdf8f4fdfd)

There are 71 completed jobs, but these are CDC processes that should read from the Kafka topic continuously, and we verified that there are still messages queued on the topic. If we kill the application and restart it, in some cases the job behaves normally again, while in other cases it remains stuck.
Our deployment setup is the following: we read INSERT, UPDATE, and DELETE operations from a Kafka topic and replicate them into a target Hudi table synced to Hive, via a PySpark job running 24/7.
**Expected behavior**

We would like to know whether there is a way to reduce, or at least keep constant, the write latency on the Hudi table, and whether there is anything we can improve in the deployment setup or in the other configuration described below.
**Environment Description**

* Hudi version : 0.12.1-amzn-0
* Spark version : 3.3.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3 amz
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no (EMR 6.9.0)

**Additional context**
HOODIE TABLE PROPERTIES:

```python
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.index.type': 'GLOBAL_BLOOM',
'hoodie.simple.index.update.partition.path': 'true',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.copyonwrite.record.size.estimate': 285,
'hoodie.parquet.small.file.limit': 104857600,
'hoodie.parquet.max.file.size': 12000,
'hoodie.cleaner.commits.retained': 1
```
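For reference, this is roughly how the properties above are assembled into the `hudiOptions_table` dict used by the writers further down. The table name, record key, partition path, precombine field and output path are placeholders, since the real values are not part of this issue:

```python
S3_OUTPUT_PATH = 's3://my-bucket/hudi/my_target_table/'  # placeholder

hudiOptions_table = {
    # The next four entries are placeholders, not our real column/table names.
    'hoodie.table.name': 'my_target_table',
    'hoodie.datasource.write.recordkey.field': 'id,sub_id',   # ComplexKeyGenerator takes a comma-separated list
    'hoodie.datasource.write.partitionpath.field': 'dt',
    'hoodie.datasource.write.precombine.field': 'event_ts',
    # Properties as listed above.
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.index.type': 'GLOBAL_BLOOM',
    'hoodie.simple.index.update.partition.path': 'true',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.copyonwrite.record.size.estimate': 285,
    'hoodie.parquet.small.file.limit': 104857600,
    'hoodie.parquet.max.file.size': 12000,
    'hoodie.cleaner.commits.retained': 1,
}
```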
KAFKA READ CONFIG:

```python
.readStream
.format("kafka")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1")
.option("kafka.ssl.protocol", "TLS")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "true")
.option("maxOffsetsPerTrigger", 2000)
.option("kafka.group.id", CG_NAME)
.load()
```
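For completeness, a minimal sketch of the full read; the bootstrap servers, topic and consumer group below are placeholders for values not shown in this issue:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cdc-to-hudi").getOrCreate()

KAFKA_BOOTSTRAP = "broker1:9093"   # placeholder, not our real brokers
TOPIC_NAME = "cdc_topic"           # placeholder
CG_NAME = "cdc_consumer_group"     # placeholder

df_source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
    .option("subscribe", TOPIC_NAME)
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2, TLSv1.1, TLSv1")
    .option("kafka.ssl.protocol", "TLS")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "true")
    .option("maxOffsetsPerTrigger", 2000)
    .option("kafka.group.id", CG_NAME)
    .load()
)
```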
PYSPARK WRITE:

```python
df_source.writeStream.foreachBatch(foreach_batch_write_function)
```
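The complete streaming write looks roughly like this; the checkpoint location and trigger interval below are illustrative assumptions, not the values we actually use:

```python
query = (
    df_source.writeStream
    .foreachBatch(foreach_batch_write_function)
    .option("checkpointLocation", "s3://my-bucket/checkpoints/my_target_table/")  # placeholder
    .trigger(processingTime="60 seconds")                                         # assumption
    .start()
)
query.awaitTermination()
```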
FOR EACH BATCH FUNCTION:

```python
# management of delete messages
batchDF_deletes.write.format('hudi') \
    .option('hoodie.datasource.write.operation', 'delete') \
    .options(**hudiOptions_table) \
    .mode('append') \
    .save(S3_OUTPUT_PATH)

# management of update and insert messages
batchDF_upserts.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions_table) \
    .mode('append') \
    .save(S3_OUTPUT_PATH)
```
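Put together, the batch function looks roughly as follows; the `op` column and the `'D'` marker used to split deletes from upserts are assumptions about our CDC payload, not something shown above:

```python
from pyspark.sql import DataFrame

# hudiOptions_table and S3_OUTPUT_PATH are the dict and path shown earlier in this issue.

def foreach_batch_write_function(batchDF: DataFrame, batch_id: int) -> None:
    # Split the micro-batch into deletes and inserts/updates.
    batchDF.persist()
    batchDF_deletes = batchDF.filter(batchDF["op"] == "D")
    batchDF_upserts = batchDF.filter(batchDF["op"] != "D")

    # management of delete messages
    if batchDF_deletes.take(1):
        batchDF_deletes.write.format('hudi') \
            .option('hoodie.datasource.write.operation', 'delete') \
            .options(**hudiOptions_table) \
            .mode('append') \
            .save(S3_OUTPUT_PATH)

    # management of update and insert messages
    if batchDF_upserts.take(1):
        batchDF_upserts.write.format('org.apache.hudi') \
            .option('hoodie.datasource.write.operation', 'upsert') \
            .options(**hudiOptions_table) \
            .mode('append') \
            .save(S3_OUTPUT_PATH)

    batchDF.unpersist()
```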
SPARK SUBMIT:

```
spark-submit --master yarn --deploy-mode cluster \
  --num-executors 1 --executor-memory 1G --executor-cores 2 \
  --conf spark.dynamicAllocation.enabled=false \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar
```