[ 
https://issues.apache.org/jira/browse/HUDI-3242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538175#comment-17538175
 ] 

Charlie Briggs commented on HUDI-3242:
--------------------------------------

We've also been confused by this behaviour (seen on Hudi 0.10.1, but it also applies to 0.11.0).

* The checkpointing mechanism, and "deltastreamer.checkpoint.reset_key" in 
particular, is not well described in the documentation.
* We assumed that to load a given input using Deltastreamer, we could run with 
`--checkpoint 0` (I think this is intuitive, but maybe only to me?).
* When `--checkpoint 0` is set and `deltastreamer.checkpoint.reset_key = 0`, 
the `--checkpoint` flag is actually ignored; in Hudi 0.10.1, at least, nothing 
is logged to indicate that this behaviour was chosen.
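
To make the last point concrete, here is a minimal sketch of the decision as it 
appears to behave from the outside. It assumes the choice reduces to a string 
comparison between the `--checkpoint` value and the reset key stored in the 
last commit. The function name, its arguments, and the sample checkpoint value 
are hypothetical; this is an illustration, not Hudi's actual code.

```shell
#!/bin/sh
# Hypothetical model of the checkpoint choice described above; not Hudi's code.
# $1: value passed via --checkpoint (empty if the flag is absent)
# $2: deltastreamer.checkpoint.reset_key stored in the last commit (may be empty)
# $3: checkpoint recorded by the last commit (sample value below is made up)
resolve_checkpoint() {
  cli_checkpoint="$1"
  stored_reset_key="$2"
  last_checkpoint="$3"
  if [ -n "$cli_checkpoint" ] && [ "$cli_checkpoint" != "$stored_reset_key" ]; then
    # The flag differs from the stored reset key, so it is honoured.
    printf '%s\n' "$cli_checkpoint"
  else
    # Flag absent, or equal to the stored reset key: resume from the last commit.
    printf '%s\n' "$last_checkpoint"
  fi
}

resolve_checkpoint 0 "" 1642041655000   # no reset key stored yet: prints 0
resolve_checkpoint 0 0 1642041655000    # reset key already 0: prints 1642041655000
```

Under this model, the second call is the surprising case: the flag is passed 
explicitly, yet the run resumes from the previous commit's checkpoint.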

I would expect that, regardless of past commits or epoch times, if I explicitly 
specify a checkpoint, it should be used by Deltastreamer.

For reference our use case is this:

* We have two Deltastreamer jobs, one which loads at an interval from S3, one 
which loads daily from a separate S3 bucket (both parquet inputs)
* We use Deltastreamer for both jobs because schema validation fails when it 
is enabled and Deltastreamer and the Spark writer are used concurrently 
(https://issues.apache.org/jira/browse/HUDI-2755). We do not currently wish to 
run without schema validation, as we've had issues with breaking tables in the 
past when it was disabled.
* We can't disable checkpointing on either of the two jobs or copy information 
from the previous commit (we previously relied on the support described in 
https://issues.apache.org/jira/browse/HUDI-2719 for this).

As such, we instead try to write first to an ephemeral location, and then load 
from there using `--checkpoint 0` to force all data in the input to be loaded, 
regardless of write times.
_Fortunately, the `--checkpoint 0` override described above is not currently 
necessary for us, as we only allow one writer at a time, so the ephemeral data 
is guaranteed to be newer than the previous checkpoint, but the behaviour is 
confusing._

> Checkpoint 0 is ignored -Partial parquet file discovery after the first commit
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-3242
>                 URL: https://issues.apache.org/jira/browse/HUDI-3242
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, writer-core
>    Affects Versions: 0.10.1
>         Environment: AWS
> EMR 6.4.0
> Spark 3.1.2
> Hudi - 0.10.1-rc
>            Reporter: Harsha Teja Kanna
>            Assignee: sivabalan narayanan
>            Priority: Minor
>              Labels: hudi-on-call, sev:critical, user-support-issues
>         Attachments: Screen Shot 2022-01-13 at 2.40.55 AM.png, Screen Shot 
> 2022-01-13 at 2.55.35 AM.png, Screen Shot 2022-01-20 at 1.36.48 PM.png
>
>   Original Estimate: 3h
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Hi, I am testing release branch 0.10.1 as I needed a few bug fixes from it.
> However, for a certain table I see only partial discovery of files after the 
> initial commit of the table.
> But if the second partition is given as input for the first commit, all the 
> files are getting discovered.
> First partition : 2021/01 has 744 files and all of them are discovered
> Second partition: 2021/02 has 762 files but only 72 are discovered.
> Checkpoint is set to 0. 
> No errors in the logs.
> {code:java}
> spark-submit \
> --master yarn \
> --deploy-mode cluster \
> --driver-cores 30 \
> --driver-memory 32g \
> --executor-cores 5 \
> --executor-memory 32g \
> --num-executors 120 \
> --jars s3://bucket/apps/datalake/jars/unused-1.0.0.jar,s3://bucket/apps/datalake/jars/spark-avro_2.12-3.1.2.jar \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> s3://bucket/apps/datalake/jars/hudi-0.10.0/hudi-utilities-bundle_2.12-0.10.0.jar \
> --table-type COPY_ON_WRITE \
> --source-ordering-field timestamp \
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
> --target-base-path s3a://datalake-hudi/datastream/v1/sessions_by_date \
> --target-table sessions_by_date \
> --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
> --op INSERT \
> --checkpoint 0 \
> --hoodie-conf hoodie.clean.automatic=true \
> --hoodie-conf hoodie.cleaner.commits.retained=1 \
> --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
> --hoodie-conf hoodie.clustering.inline=false \
> --hoodie-conf hoodie.clustering.inline.max.commits=1 \
> --hoodie-conf hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy \
> --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=250000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=sid,id \
> --hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=268435456 \
> --hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
> --hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
> --hoodie-conf hoodie.datasource.hive_sync.enable=false \
> --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
> --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
> --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \
> --hoodie-conf hoodie.datasource.hive_sync.table=sessions_by_date \
> --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
> --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
> --hoodie-conf hoodie.datasource.write.operation=insert \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
> --hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
> --hoodie-conf hoodie.datasource.write.recordkey.field=id,qid,aid \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
> --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/history/datastream/v1/sessions/2021/02 \
> --hoodie-conf hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector \
> --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT id, qid, aid, to_timestamp(timestamp) as timestamp, sid, date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
> --hoodie-conf hoodie.file.listing.parallelism=256 \
> --hoodie-conf hoodie.finalize.write.parallelism=256 \
> --hoodie-conf hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
> --hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
> --hoodie-conf hoodie.metadata.enable=true \
> --hoodie-conf hoodie.metadata.metrics.enable=true \
> --hoodie-conf hoodie.metrics.cloudwatch.metric.prefix=emr.datalake-service.prd.insert.sessions_by_date \
> --hoodie-conf hoodie.metrics.on=true \
> --hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
> --hoodie-conf hoodie.parquet.block.size=268435456 \
> --hoodie-conf hoodie.parquet.compression.codec=snappy \
> --hoodie-conf hoodie.parquet.max.file.size=268435456 \
> --hoodie-conf hoodie.parquet.small.file.limit=250000000 {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
