[ 
https://issues.apache.org/jira/browse/HUDI-3242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479631#comment-17479631
 ] 

Harsha Teja Kanna commented on HUDI-3242:
-----------------------------------------

Hi,

Initially I saw the behavior of not picking the files from the partition, but 
that dataset was differently produced. meaning source dataset is unloaded from 
Warehouse and files in different partitions do not have linear timestamps. So I 
thought that be might be cause


Now I am seeing this in all the tables, When passing checkpoint 0 to reprocess 
a partition, it is just skipping the partition. Only processing the current 
partition. So I found that checkpoint is ignored. I see it Deltastreamer logs 
and the job ends in 20 seconds

2) No clustering commits are pending, only one Deltastreamer is running and it 
successfully completed.. I can see the cluster commit on the timeline.

3) I can reproduce it consistently, not being able to backfill the tables 
currently.

4) Contents of replace commit

{
  "partitionToWriteStats" : {
    "date=2022/01/19" : [ {
      "fileId" : "c8b06d0b-1c8a-434e-b54a-15b6525b738a-0",
      "path" : 
"date=2022/01/19/c8b06d0b-1c8a-434e-b54a-15b6525b738a-0_0-78-996_20220120085344674.parquet",
      "prevCommit" : "null",
      "numWrites" : 297106,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 297106,
      "totalWriteBytes" : 12468914,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "date=2022/01/19",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 12468914,
      "minEventTime" : null,
      "maxEventTime" : null
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : 
"{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[REDACTED]"
  },
  "operationType" : "CLUSTER",
  "partitionToReplaceFileIds" : {
    "date=2022/01/19" : [ "4c5a04e9-9288-4000-9909-4c2640c5b779-0" ]
  },
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesSize" : 0,
  "totalScanTime" : 0,
  "totalCreateTime" : 9815,
  "totalUpsertTime" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : {
      "val" : null,
      "present" : false
    }
  },
  "writePartitionPaths" : [ "date=2022/01/19" ],
  "fileIdAndRelativePaths" : {
    "c8b06d0b-1c8a-434e-b54a-15b6525b738a-0" : 
"date=2022/01/19/c8b06d0b-1c8a-434e-b54a-15b6525b738a-0_0-78-996_20220120085344674.parquet"
  }
}

> Checkpoint 0 is ignored -Partial parquet file discovery after the first commit
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-3242
>                 URL: https://issues.apache.org/jira/browse/HUDI-3242
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, writer-core
>    Affects Versions: 0.10.1
>         Environment: AWS
> EMR 6.4.0
> Spark 3.1.2
> Hudi - 0.10.1-rc
>            Reporter: Harsha Teja Kanna
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: hudi-on-call, sev:critical, user-support-issues
>         Attachments: Screen Shot 2022-01-13 at 2.40.55 AM.png, Screen Shot 
> 2022-01-13 at 2.55.35 AM.png
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> Hi, I am testing release branch 0.10.1 as I needed few bug fixes from it.
> However, I see for a certain table. Only partial discovery of files happening 
> after the initial commit of the table.
> But if the second partition is given as input for the first commit, all the 
> files are getting discovered.
> First partition : 2021/01 has 744 files and all of them are discovered
> Second partition: 2021/02 has 762 files but only 72 are discovered.
> Checkpoint is set to 0. 
> No errors in the logs.
> {code:java}
> spark-submit \
> --master yarn \
> --deploy-mode cluster \
> --driver-cores 30 \
> --driver-memory 32g \
> --executor-cores 5 \
> --executor-memory 32g \
> --num-executors 120 \
> --jars 
> s3://bucket/apps/datalake/jars/unused-1.0.0.jar,s3://bucket/apps/datalake/jars/spark-avro_2.12-3.1.2.jar
>  \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
> s3://bucket/apps/datalake/jars/hudi-0.10.0/hudi-utilities-bundle_2.12-0.10.0.jar
>  \
> --table-type COPY_ON_WRITE \
> --source-ordering-field timestamp \
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
> --target-base-path s3a://datalake-hudi/datastream/v1/sessions_by_date \
> --target-table sessions_by_date \
> --transformer-class 
> org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
> --op INSERT \
> --checkpoint 0 \
> --hoodie-conf hoodie.clean.automatic=true \
> --hoodie-conf hoodie.cleaner.commits.retained=1 \
> --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
> --hoodie-conf hoodie.clustering.inline=false \
> --hoodie-conf hoodie.clustering.inline.max.commits=1 \
> --hoodie-conf 
> hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy
>  \
> --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=250000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=sid,id \
> --hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=268435456 
> \
> --hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
> --hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
> --hoodie-conf hoodie.datasource.hive_sync.enable=false \
> --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
> --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
> --hoodie-conf 
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor
>  \
> --hoodie-conf hoodie.datasource.hive_sync.table=sessions_by_date \
> --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
> --hoodie-conf 
> hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
>  \
> --hoodie-conf hoodie.datasource.write.operation=insert \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
> --hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
> --hoodie-conf hoodie.datasource.write.recordkey.field=id,qid,aid \
> --hoodie-conf 
> hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
> --hoodie-conf 
> hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
> --hoodie-conf 
> hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
> --hoodie-conf 
> hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/history/datastream/v1/sessions/2021/02
>  \
> --hoodie-conf 
> hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector
>  \
> --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT id, qid, aid, 
> to_timestamp(timestamp) as timestamp, sid, 
> date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
> --hoodie-conf hoodie.file.listing.parallelism=256 \
> --hoodie-conf hoodie.finalize.write.parallelism=256 \
> --hoodie-conf 
> hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
> --hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
> --hoodie-conf hoodie.metadata.enable=true \
> --hoodie-conf hoodie.metadata.metrics.enable=true \
> --hoodie-conf 
> hoodie.metrics.cloudwatch.metric.prefix=emr.datalake-service.prd.insert.sessions_by_date
>  \
> --hoodie-conf hoodie.metrics.on=true \
> --hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
> --hoodie-conf hoodie.parquet.block.size=268435456 \
> --hoodie-conf hoodie.parquet.compression.codec=snappy \
> --hoodie-conf hoodie.parquet.max.file.size=268435456 \
> --hoodie-conf hoodie.parquet.small.file.limit=250000000 {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to