[ 
https://issues.apache.org/jira/browse/HUDI-3242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479616#comment-17479616
 ] 

sivabalan narayanan commented on HUDI-3242:
-------------------------------------------

hey [~h7kanna]: let me try to understand the problem.

I see you have reported 2 issues:
 # some files are missed while reading/querying (from the 2nd partition in your example)
 # backfilling is not working: the checkpoint you set is not being picked up

Let's go one by one. 

(2): I am wondering how the checkpoint in your last commit metadata can be 0. If you have made some commits, the checkpoint should have progressed to some non-zero value in my understanding. We need to identify the root cause of this. But if the checkpoint in your latest commit metadata is non-zero and you set the checkpoint value to 0 when you restart your deltastreamer, it should be picked up.
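
To double check what actually got committed, something like the following from a spark-shell (with the hudi utilities bundle on the classpath) should print the checkpoint stored in the latest commit. This is just a rough sketch: the base path is taken from your spark-submit command below, and it assumes the checkpoint is recorded under the usual deltastreamer.checkpoint.key entry in the commit's extraMetadata.

{code:scala}
// rough sketch: print the checkpoint recorded in the latest completed commit
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.model.HoodieCommitMetadata

val basePath = "s3a://datalake-hudi/datastream/v1/sessions_by_date"
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath(basePath)
  .build()

// latest completed commit on the active timeline
val timeline = metaClient.getActiveTimeline
val lastInstant = timeline.getCommitsTimeline.filterCompletedInstants.lastInstant.get

// deltastreamer keeps its checkpoint in the commit's extraMetadata
val commitMeta = HoodieCommitMetadata.fromBytes(
  timeline.getInstantDetails(lastInstant).get, classOf[HoodieCommitMetadata])
val ckpt = commitMeta.getMetadata("deltastreamer.checkpoint.key")
println(s"instant=${lastInstant.getTimestamp} checkpoint=$ckpt")
{code}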

 

(1): Can you reproduce this consistently? I see you have clustering enabled. We had a [bug|https://github.com/apache/hudi/pull/4172] some time back where, if there was a pending clustering operation, some files were missed while querying. Can you wait until clustering completes and check whether you can see all the data?
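
To see whether a clustering is still pending, you could list the replacecommit instants on the timeline, for instance like this (again just a sketch, reusing the same base path). A pending clustering shows up as a <ts>.replacecommit.requested / .inflight without a completed <ts>.replacecommit file yet.

{code:scala}
// rough sketch: list replacecommit instants under .hoodie
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val basePath = "s3a://datalake-hudi/datastream/v1/sessions_by_date"
val fs = FileSystem.get(new URI(basePath), spark.sparkContext.hadoopConfiguration)

fs.listStatus(new Path(basePath + "/.hoodie"))
  .map(_.getPath.getName)
  .filter(_.contains("replacecommit"))
  .sorted
  .foreach(println)
{code}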

btw, how did you confirm that some files are missed/not discovered? Did you check the SQL tab in the Spark UI, or is it based on the query results returned by your read query?
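
For instance, one quick cross-check (again just a sketch with your paths plugged in) is to compare the row count of a snapshot query against the raw source partition, and to look at how many distinct data files actually back the query via the _hoodie_file_name meta column; the scan node in the Spark UI SQL tab should report similar file/row metrics.

{code:scala}
// rough sketch: compare row counts and the number of data files backing a snapshot query
val hudiDf = spark.read.format("hudi")
  .load("s3a://datalake-hudi/datastream/v1/sessions_by_date")
val srcDf = spark.read.parquet("s3://datalake-hudi/history/datastream/v1/sessions/2021/02")

// distinct base files contributing rows to the snapshot
val filesRead = hudiDf.select("_hoodie_file_name").distinct().count()
println(s"hudi rows=${hudiDf.count()} source rows=${srcDf.count()} data files read=$filesRead")
{code}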

 

 

> Checkpoint 0 is ignored -Partial parquet file discovery after the first commit
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-3242
>                 URL: https://issues.apache.org/jira/browse/HUDI-3242
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, writer-core
>    Affects Versions: 0.10.1
>         Environment: AWS
> EMR 6.4.0
> Spark 3.1.2
> Hudi - 0.10.1-rc
>            Reporter: Harsha Teja Kanna
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: hudi-on-call, sev:critical, user-support-issues
>         Attachments: Screen Shot 2022-01-13 at 2.40.55 AM.png, Screen Shot 
> 2022-01-13 at 2.55.35 AM.png
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> Hi, I am testing release branch 0.10.1 as I needed a few bug fixes from it.
> However, for a certain table I see only partial discovery of files happening
> after the initial commit of the table.
> But if the second partition is given as input for the first commit, all the
> files are getting discovered.
> First partition: 2021/01 has 744 files and all of them are discovered.
> Second partition: 2021/02 has 762 files but only 72 are discovered.
> Checkpoint is set to 0.
> No errors in the logs.
> {code:java}
> spark-submit \
> --master yarn \
> --deploy-mode cluster \
> --driver-cores 30 \
> --driver-memory 32g \
> --executor-cores 5 \
> --executor-memory 32g \
> --num-executors 120 \
> --jars s3://bucket/apps/datalake/jars/unused-1.0.0.jar,s3://bucket/apps/datalake/jars/spark-avro_2.12-3.1.2.jar \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer s3://bucket/apps/datalake/jars/hudi-0.10.0/hudi-utilities-bundle_2.12-0.10.0.jar \
> --table-type COPY_ON_WRITE \
> --source-ordering-field timestamp \
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
> --target-base-path s3a://datalake-hudi/datastream/v1/sessions_by_date \
> --target-table sessions_by_date \
> --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
> --op INSERT \
> --checkpoint 0 \
> --hoodie-conf hoodie.clean.automatic=true \
> --hoodie-conf hoodie.cleaner.commits.retained=1 \
> --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
> --hoodie-conf hoodie.clustering.inline=false \
> --hoodie-conf hoodie.clustering.inline.max.commits=1 \
> --hoodie-conf hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy \
> --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=250000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=sid,id \
> --hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=268435456 \
> --hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
> --hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
> --hoodie-conf hoodie.datasource.hive_sync.enable=false \
> --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
> --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
> --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \
> --hoodie-conf hoodie.datasource.hive_sync.table=sessions_by_date \
> --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
> --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
> --hoodie-conf hoodie.datasource.write.operation=insert \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
> --hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
> --hoodie-conf hoodie.datasource.write.recordkey.field=id,qid,aid \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
> --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/history/datastream/v1/sessions/2021/02 \
> --hoodie-conf hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector \
> --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT id, qid, aid, to_timestamp(timestamp) as timestamp, sid, date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
> --hoodie-conf hoodie.file.listing.parallelism=256 \
> --hoodie-conf hoodie.finalize.write.parallelism=256 \
> --hoodie-conf hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
> --hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
> --hoodie-conf hoodie.metadata.enable=true \
> --hoodie-conf hoodie.metadata.metrics.enable=true \
> --hoodie-conf hoodie.metrics.cloudwatch.metric.prefix=emr.datalake-service.prd.insert.sessions_by_date \
> --hoodie-conf hoodie.metrics.on=true \
> --hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
> --hoodie-conf hoodie.parquet.block.size=268435456 \
> --hoodie-conf hoodie.parquet.compression.codec=snappy \
> --hoodie-conf hoodie.parquet.max.file.size=268435456 \
> --hoodie-conf hoodie.parquet.small.file.limit=250000000
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
