[GitHub] [hudi] shivabansal1046 opened a new issue #2656: HUDI insert operation is working same as upsert
shivabansal1046 opened a new issue #2656: URL: https://github.com/apache/hudi/issues/2656

I am trying to append my Spark dataframe using insert (option(OPERATION_OPT_KEY, "INSERT")), but it is still behaving like upsert, i.e. updating the matching rows.
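For reference, a minimal sketch of an insert-mode datasource write, assuming Hudi's Spark datasource options; the value defined by DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL is the lowercase "insert". The field names, table name, and path below are illustrative, not taken from the issue.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class InsertExample {
  // Append a batch with the insert operation instead of the default upsert.
  static void insertOnly(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.datasource.write.operation", "insert")    // lowercase "insert"
        .option("hoodie.datasource.write.recordkey.field", "id")  // illustrative key field
        .option("hoodie.datasource.write.precombine.field", "ts") // illustrative precombine field
        .option("hoodie.table.name", "my_table")                  // illustrative table name
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```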
[GitHub] [hudi] garyli1019 commented on pull request #2378: [HUDI-1491] Support partition pruning for MOR snapshot query
garyli1019 commented on pull request #2378: URL: https://github.com/apache/hudi/pull/2378#issuecomment-795014068

> @garyli1019: what's the status of the PR in general? When do you think we can get this landed?

@nsivabalan my concern is that `CatalystScan` is not a stable API; not sure if this could be a problem. Need a Spark expert to take a look here. @umehrot2 @zhedoubushishi @pengzhiwei2018 are any of you able to take a look here?
[GitHub] [hudi] nsivabalan opened a new pull request #2655: [WIP] [HUDI-1615] Fixing null schema for delete operation for spark datasource
nsivabalan opened a new pull request #2655: URL: https://github.com/apache/hudi/pull/2655
[GitHub] [hudi] t0il3ts0ap edited a comment on issue #2515: [HUDI-1615] [SUPPORT] ERROR HoodieTimelineArchiveLog: Failed to archive commits
t0il3ts0ap edited a comment on issue #2515: URL: https://github.com/apache/hudi/issues/2515#issuecomment-795002646

@nsivabalan

```java
public class DebeziumTransformer implements Transformer {

  public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
                            Dataset<Row> dataset, TypedProperties typedProperties) {
    // Cast Debezium's __deleted flag to boolean, rename it to Hudi's
    // _hoodie_is_deleted marker, and drop the remaining Debezium metadata columns.
    Dataset<Row> transformedDataset = dataset
        .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
        .withColumnRenamed("__deleted", "_hoodie_is_deleted")
        .drop("__op", "__source_ts_ms");
    log.info("TRANSFORMER SCHEMA STARTS");
    transformedDataset.printSchema();
    transformedDataset.show();
    log.info("TRANSFORMER SCHEMA ENDS");
    return transformedDataset;
  }
}
```

There was an issue with column addition as well (https://github.com/apache/hudi/issues/2589), so I raised this. You will find the related logs there if need be.
[GitHub] [hudi] nsivabalan commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
nsivabalan commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794997822

@vinothchandar: Vlad and stackfun verified that the fix works.
[GitHub] [hudi] nsivabalan commented on a change in pull request #2653: [WIP] [HUDI 1615] Fixing null schema in bulk_insert row writer path
nsivabalan commented on a change in pull request #2653: URL: https://github.com/apache/hudi/pull/2653#discussion_r591131225

## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

```diff
@@ -286,30 +287,38 @@
                        basePath: Path,
                        path: Option[String],
                        instantTime: String): (Boolean, common.util.Option[String]) = {
-    val structName = s"${tblName}_record"
-    val nameSpace = s"hoodie.${tblName}"
-    val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters))
+    val sparkContext = sqlContext.sparkContext
+    // register classes & schemas
+    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+    sparkContext.getConf.registerKryoClasses(
```

Review comment: Not sure if we really need to register the schema, as we are not leveraging it for bulk insert w/ the row writer.
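For context, a minimal sketch of what a Kryo class registration like the one in the diff amounts to; the registered class here is illustrative, not the one Hudi actually registers.

```java
import org.apache.avro.generic.GenericData;
import org.apache.spark.SparkConf;

public class KryoRegistrationExample {
  // Registering classes up front lets Kryo serialize them by a compact id
  // instead of writing the fully qualified class name into every record.
  static SparkConf withKryoClasses() {
    return new SparkConf()
        .registerKryoClasses(new Class<?>[] {GenericData.Record.class});
  }
}
```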
[GitHub] [hudi] nsivabalan commented on a change in pull request #2654: [HUDI-1615] [WIP] diff to fix null target schema in delastreamer
nsivabalan commented on a change in pull request #2654: URL: https://github.com/apache/hudi/pull/2654#discussion_r591130448

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -352,6 +356,10 @@ public void refreshTimeline() throws IOException {
       Option<Dataset<Row>> transformed = dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
+      if (transformed.isPresent()) {
```

Review comment: @vinothchandar: do you think we could do something like this: store the dataframe schema and use it later when we initialize the write client config, so that we use this schema when the target schema is null? This is just a WIP diff; I haven't fully completed it yet, but it should give you an idea.
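A minimal sketch of the fallback being proposed, assuming spark-avro's SchemaConverters is on the classpath; the method, record name, and namespace below are illustrative, not the actual DeltaSync code.

```java
import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.avro.SchemaConverters;

public class TargetSchemaFallback {
  // Prefer the schema provider's target schema; otherwise derive an Avro
  // schema from the transformed dataframe's Spark schema.
  static Schema resolve(Schema providerTargetSchema, Dataset<Row> transformed) {
    if (providerTargetSchema != null) {
      return providerTargetSchema;
    }
    return SchemaConverters.toAvroType(
        transformed.schema(), false, "hoodie_record", "hoodie");
  }
}
```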
[GitHub] [hudi] nsivabalan opened a new pull request #2654: [HUDI-1615] [WIP] diff to fix null target schema in delastreamer
nsivabalan opened a new pull request #2654: URL: https://github.com/apache/hudi/pull/2654
[GitHub] [hudi] codecov-io edited a comment on pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
codecov-io edited a comment on pull request #2651: URL: https://github.com/apache/hudi/pull/2651#issuecomment-794945140

# [Codecov](https://codecov.io/gh/apache/hudi/pull/2651?src=pr&el=h1) Report
> Merging [#2651](https://codecov.io/gh/apache/hudi/pull/2651?src=pr&el=desc) (b52ce56) into [master](https://codecov.io/gh/apache/hudi/commit/d3a451611cc01a65cc0f305c72c04d64af0e7d38?el=desc) (d3a4516) will **increase** coverage by `0.14%`.
> The diff coverage is `73.10%`.

```diff
@@             Coverage Diff              @@
##             master    #2651      +/-   ##
============================================
+ Coverage     51.53%   51.68%   +0.14%
- Complexity     3491     3533      +42
============================================
  Files           462      464       +2
  Lines         21881    22054     +173
  Branches       2327     2364      +37
============================================
+ Hits          11277    11398     +121
- Misses         9624     9642      +18
- Partials        980     1014      +34
```

| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | `37.01% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudicommon | `51.45% <0.00%> (-0.04%)` | `0.00 <0.00> (ø)` |
| hudiflink | `50.34% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudihadoopmr | `33.44% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudisparkdatasource | `70.08% <76.31%> (+0.24%)` | `0.00 <47.00> (ø)` |
| hudisync | `49.62% <ø> (ø)` | `0.00 <ø> (ø)` |
| huditimelineservice | `64.36% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudiutilities | `69.53% <ø> (+0.05%)` | `0.00 <ø> (ø)` |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

Impacted files:

| File | Coverage Δ | Complexity Δ |
|---|---|---|
| ...rg/apache/hudi/common/table/HoodieTableConfig.java | `43.20% <0.00%> (-2.25%)` | `17.00 <0.00> (ø)` |
| ...pache/hudi/common/table/HoodieTableMetaClient.java | `66.66% <0.00%> (-1.65%)` | `43.00 <0.00> (ø)` |
| ...src/main/scala/org/apache/hudi/DefaultSource.scala | `73.03% <62.79%> (-11.12%)` | `28.00 <18.00> (+11.00)` |
| ...c/main/scala/org/apache/hudi/HoodieFileIndex.scala | `73.48% <73.48%> (ø)` | `27.00 <27.00> (?)` |
| .../org/apache/hudi/MergeOnReadSnapshotRelation.scala | `89.47% <85.71%> (+0.34%)` | `17.00 <1.00> (ø)` |
| ...cala/org/apache/hudi/HoodieBootstrapRelation.scala | `89.13% <100.00%> (+1.63%)` | `18.00 <1.00> (+3.00)` |
| ...n/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | `53.42% <100.00%> (+2.05%)` | `0.00 <0.00> (ø)` |
| ...g/apache/hudi/MergeOnReadIncrementalRelation.scala | `84.00% <100.00%> (+0.12%)` | `24.00 <0.00> (ø)` |
| ...ark/sql/execution/datasources/PartitionUtils.scala | … | … |
[GitHub] [hudi] codecov-io commented on pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
codecov-io commented on pull request #2651: URL: https://github.com/apache/hudi/pull/2651#issuecomment-794945140

# [Codecov](https://codecov.io/gh/apache/hudi/pull/2651?src=pr&el=h1) Report
> Merging [#2651](https://codecov.io/gh/apache/hudi/pull/2651?src=pr&el=desc) (b52ce56) into [master](https://codecov.io/gh/apache/hudi/commit/d3a451611cc01a65cc0f305c72c04d64af0e7d38?el=desc) (d3a4516) will **increase** coverage by `0.18%`.
> The diff coverage is `0.00%`.

```diff
@@             Coverage Diff              @@
##             master    #2651      +/-   ##
============================================
+ Coverage     51.53%   51.72%   +0.18%
+ Complexity     3491     2527     -964
============================================
  Files           462      323     -139
  Lines         21881    14188    -7693
  Branches       2327     1401     -926
============================================
- Hits          11277     7339    -3938
+ Misses         9624     6260    -3364
+ Partials        980      589     -391
```

| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | `37.01% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudicommon | `51.45% <0.00%> (-0.04%)` | `0.00 <0.00> (ø)` |
| hudiflink | `?` | `?` |
| hudihadoopmr | `?` | `?` |
| hudisparkdatasource | `?` | `?` |
| hudisync | `?` | `?` |
| huditimelineservice | `?` | `?` |
| hudiutilities | `69.53% <ø> (+0.05%)` | `0.00 <ø> (ø)` |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

Impacted files:

| File | Coverage Δ | Complexity Δ |
|---|---|---|
| ...rg/apache/hudi/common/table/HoodieTableConfig.java | `43.20% <0.00%> (-2.25%)` | `17.00 <0.00> (ø)` |
| ...pache/hudi/common/table/HoodieTableMetaClient.java | `66.66% <0.00%> (-1.65%)` | `43.00 <0.00> (ø)` |
| ...org/apache/hudi/hadoop/BootstrapBaseFileSplit.java | | |
| .../apache/hudi/timeline/service/TimelineService.java | | |
| .../hadoop/utils/HoodieRealtimeRecordReaderUtils.java | | |
| ...g/apache/hudi/hadoop/HoodieParquetInputFormat.java | | |
| .../apache/hudi/operator/InstantGenerateOperator.java | | |
| .../apache/hudi/hive/MultiPartKeysValueExtractor.java | | |
| ...hadoop/LocatedFileStatusWithBootstrapBaseFile.java | | |
| .../org/apache/hudi/hive/NonPartitionedExtractor.java | | |

... and 128 more.
[GitHub] [hudi] nsivabalan edited a comment on issue #2515: [HUDI-1615] [SUPPORT] ERROR HoodieTimelineArchiveLog: Failed to archive commits
nsivabalan edited a comment on issue #2515: URL: https://github.com/apache/hudi/issues/2515#issuecomment-794908330

@t0il3ts0ap: Can you please clarify something? What's the schema of the dataframe output from the transformer in DeltaStreamer that you use in your delete use-case? I assume it's the same schema as the Hudi dataset schema, with the "_hoodie_is_deleted" field set to true. Is my understanding right?
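For reference, a minimal sketch of the convention being discussed, assuming the incoming rows already match the table schema apart from the boolean marker column:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.lit;

public class SoftDeleteExample {
  // Mark every row in this batch as a delete by setting Hudi's
  // _hoodie_is_deleted marker field to true.
  static Dataset<Row> markDeleted(Dataset<Row> batch) {
    return batch.withColumn("_hoodie_is_deleted", lit(true));
  }
}
```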
[GitHub] [hudi] codecov-io edited a comment on pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
codecov-io edited a comment on pull request #2640: URL: https://github.com/apache/hudi/pull/2640#issuecomment-791887408

# [Codecov](https://codecov.io/gh/apache/hudi/pull/2640?src=pr&el=h1) Report
> Merging [#2640](https://codecov.io/gh/apache/hudi/pull/2640?src=pr&el=desc) (fb79363) into [master](https://codecov.io/gh/apache/hudi/commit/02073235c3bbe72db32ed282b3e318e7daa0800a?el=desc) (0207323) will **increase** coverage by `0.33%`.
> The diff coverage is `74.92%`.

```diff
@@             Coverage Diff              @@
##             master    #2640      +/-   ##
============================================
+ Coverage     51.53%   51.87%   +0.33%
- Complexity     3491     3557      +66
============================================
  Files           462      465       +3
  Lines         21881    22164     +283
  Branches       2327     2357      +30
============================================
+ Hits          11277    11498     +221
- Misses         9624     9667      +43
- Partials        980      999      +19
```

| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | `37.01% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudicommon | `51.49% <100.00%> (+<0.01%)` | `0.00 <1.00> (ø)` |
| hudiflink | `52.77% <74.84%> (+2.42%)` | `0.00 <70.00> (ø)` |
| hudihadoopmr | `33.44% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudisparkdatasource | `69.84% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudisync | `49.62% <ø> (ø)` | `0.00 <ø> (ø)` |
| huditimelineservice | `64.36% <ø> (ø)` | `0.00 <ø> (ø)` |
| hudiutilities | `69.53% <ø> (+0.05%)` | `0.00 <ø> (ø)` |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

Impacted files:

| File | Coverage Δ | Complexity Δ |
|---|---|---|
| ...ain/java/org/apache/hudi/sink/HoodieTableSink.java | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` |
| .../org/apache/hudi/streamer/HoodieFlinkStreamer.java | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` |
| ...rg/apache/hudi/streamer/HoodieFlinkStreamerV2.java | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` |
| ...java/org/apache/hudi/source/HoodieTableSource.java | `66.89% <21.87%> (-8.77%)` | `27.00 <3.00> (-7.00)` |
| ...hudi/source/format/mor/MergeOnReadInputFormat.java | `30.58% <50.00%> (+1.07%)` | `9.00 <0.00> (+2.00)` |
| ...rg/apache/hudi/source/format/mor/InstantRange.java | `72.22% <72.22%> (ø)` | `3.00 <3.00> (?)` |
| ...he/hudi/operator/StreamReadMonitoringFunction.java | `76.59% <76.59%> (ø)` | `35.00 <35.00> (?)` |
| ...a/org/apache/hudi/source/format/FilePathUtils.java | `67.12% <77.41%> (+2.77%)` | `42.00 <12.00> (+12.00)` |
| ...a/org/apache/hudi/operator/StreamReadOperator.java | `90.54% <90.54%> (ø)` | `15.00 <15.00> (?)` |
| ...in/java/org/apache/hudi/operator/FlinkOptions.java | … | … |
[GitHub] [hudi] codecov-io commented on pull request #2653: [WIP] [HUDI 1615] Fixing null schema in bulk_insert row writer path
codecov-io commented on pull request #2653: URL: https://github.com/apache/hudi/pull/2653#issuecomment-794938096

# [Codecov](https://codecov.io/gh/apache/hudi/pull/2653?src=pr&el=h1) Report
> Merging [#2653](https://codecov.io/gh/apache/hudi/pull/2653?src=pr&el=desc) (86e7706) into [master](https://codecov.io/gh/apache/hudi/commit/26da4f546275e8ab6496537743efe73510cb723d?el=desc) (26da4f5) will **increase** coverage by `18.60%`.
> The diff coverage is `n/a`.

```diff
@@              Coverage Diff               @@
##             master    #2653       +/-   ##
=============================================
+ Coverage     50.92%   69.53%   +18.60%
+ Complexity     3168      364     -2804
=============================================
  Files           433       53      -380
  Lines         19812     1963    -17849
  Branches       2033      235     -1798
=============================================
- Hits          10090     1365     -8725
+ Misses         8902      465     -8437
+ Partials        820      133      -687
```

| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | `?` | `?` |
| hudiclient | `?` | `?` |
| hudicommon | `?` | `?` |
| hudiflink | `?` | `?` |
| hudihadoopmr | `?` | `?` |
| hudisparkdatasource | `?` | `?` |
| hudisync | `?` | `?` |
| huditimelineservice | `?` | `?` |
| hudiutilities | `69.53% <ø> (+0.01%)` | `0.00 <ø> (ø)` |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

Impacted files:

| File | Coverage Δ | Complexity Δ |
|---|---|---|
| ...hudi/utilities/sources/helpers/KafkaOffsetGen.java | `85.84% <0.00%> (-2.94%)` | `20.00% <0.00%> (+4.00%)` |
| ...apache/hudi/utilities/deltastreamer/DeltaSync.java | `70.34% <0.00%> (-0.52%)` | `53.00% <0.00%> (+2.00%)` |
| ...i/utilities/deltastreamer/HoodieDeltaStreamer.java | `68.72% <0.00%> (-0.26%)` | `18.00% <0.00%> (ø%)` |
| ...s/deltastreamer/HoodieMultiTableDeltaStreamer.java | `78.39% <0.00%> (ø)` | `18.00% <0.00%> (ø%)` |
| ...rg/apache/hudi/cli/commands/SavepointsCommand.java | | |
| .../common/bloom/HoodieDynamicBoundedBloomFilter.java | | |
| ...rg/apache/hudi/common/model/HoodieFileGroupId.java | | |
| ...apache/hudi/common/engine/HoodieEngineContext.java | | |
| ...a/org/apache/hudi/common/model/HoodieBaseFile.java | | |
| ...n/java/org/apache/hudi/common/HoodieCleanStat.java | | |

... and 372 more.
[GitHub] [hudi] vinothchandar commented on issue #2614: Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token
vinothchandar commented on issue #2614: URL: https://github.com/apache/hudi/issues/2614#issuecomment-794926627

@root18039532923 #2622 is now landed. Are you able to try out the master branch and confirm?
[GitHub] [hudi] nsivabalan opened a new pull request #2653: [WIP] [Hudi 1615] Fixing null schema in bulk_insert row writer path
nsivabalan opened a new pull request #2653: URL: https://github.com/apache/hudi/pull/2653
[GitHub] [hudi] vinothchandar commented on issue #2652: [SUPPORT] I have some questions for hudi clustering
vinothchandar commented on issue #2652: URL: https://github.com/apache/hudi/issues/2652#issuecomment-794910331

cc @satishkotha @lw309637554 do you want to take this?
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r591024300

## File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

```diff
@@ -332,7 +331,7 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw
   public static boolean needsScheduleCompaction(Configuration conf) {
     return conf.getString(FlinkOptions.TABLE_TYPE)
         .toUpperCase(Locale.ROOT)
-        .equals(HoodieTableType.MERGE_ON_READ.name())
+        .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
```

Review comment: `HoodieTableType` is an enumeration, but in many cases we need a string constant rather than a method call like `HoodieTableType.name()`, for example in JUnit 5 parameterized tests. BTW, can we avoid blocking this PR, even though these two options are introduced here? We can discuss it in another issue; I think this is not a critical issue.
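A minimal sketch of why the string constant matters there: Java annotation attributes must be compile-time constant expressions, so an enum method call cannot appear in a JUnit 5 parameterized source. The constant value below is assumed to mirror the enum name.

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TableTypeParamTest {
  // A constant String can be used inside an annotation;
  // HoodieTableType.MERGE_ON_READ.name() cannot, since it is a method call.
  static final String TABLE_TYPE_MERGE_ON_READ = "MERGE_ON_READ";

  @ParameterizedTest
  @ValueSource(strings = {TABLE_TYPE_MERGE_ON_READ})
  void schedulesCompactionForTableType(String tableType) {
    // test body elided
  }
}
```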
[GitHub] [hudi] shenbinglife opened a new issue #2652: [SUPPORT] I have some questions for hudi clustering
shenbinglife opened a new issue #2652: URL: https://github.com/apache/hudi/issues/2652

1. Does the mapping of [record key -> fileGroupId] change after clustering? May a record be written to another file group?
2. Clustering sorts records by columns; does it change the physical path of a record to a different location that is not its partition path?
[jira] [Updated] (HUDI-1672) packaging fails w/ scala 12 (due to flink)
[ https://issues.apache.org/jira/browse/HUDI-1672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan updated HUDI-1672: -- Status: In Progress (was: Open) > packaging fails w/ scala 12 (due to flink) > -- > > Key: HUDI-1672 > URL: https://issues.apache.org/jira/browse/HUDI-1672 > Project: Apache Hudi > Issue Type: Bug > Components: Common Core >Affects Versions: 0.8.0 >Reporter: sivabalan narayanan >Assignee: vinoyang >Priority: Major > Labels: sev:critical, user-support-issues > > mvn clean package -DskipTests -Dspark3 -Dscala-2.12 > . > . > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java: > Some input files use or override a deprecated API. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java: > Recompile with -Xlint:deprecation for details. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java: > Some input files use unchecked or unsafe operations. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java: > Recompile with -Xlint:unchecked for details. > [*INFO*] - > [*ERROR*] COMPILATION ERROR : > [*INFO*] - > [*ERROR*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:[334,32] > package HoodieTableType does not exist > [*INFO*] 1 error > [*INFO*] - > [*INFO*] > ** > [*INFO*] *Reactor Summary for Hudi 0.8.0-SNAPSHOT:* > [*INFO*] > [*INFO*] Hudi ... *SUCCESS* [ > 2.736 s] > [*INFO*] hudi-common *SUCCESS* [ > 38.206 s] > [*INFO*] hudi-timeline-service .. *SUCCESS* [ > 2.579 s] > [*INFO*] hudi-client *SUCCESS* [ > 0.083 s] > [*INFO*] hudi-client-common . *SUCCESS* [ > 10.578 s] > [*INFO*] hudi-hadoop-mr . *SUCCESS* [ > 6.155 s] > [*INFO*] hudi-spark-client .. *SUCCESS* [ > 18.130 s] > [*INFO*] hudi-sync-common ... *SUCCESS* [ > 0.549 s] > [*INFO*] hudi-hive-sync . *SUCCESS* [ > 3.376 s] > [*INFO*] hudi-spark-datasource .. *SUCCESS* [ > 0.077 s] > [*INFO*] hudi-spark-common .. *SUCCESS* [ > 10.523 s] > [*INFO*] hudi-spark2_2.12 ... *SUCCESS* [ > 9.411 s] > [*INFO*] hudi-spark3_2.12 ... *SUCCESS* [ > 12.286 s] > [*INFO*] hudi-spark_2.12 *SUCCESS* [ > 40.109 s] > [*INFO*] hudi-utilities_2.12 *SUCCESS* [ > 7.229 s] > [*INFO*] hudi-utilities-bundle_2.12 . *SUCCESS* [ > 36.710 s] > [*INFO*] hudi-cli ... *SUCCESS* [ > 13.804 s] > [*INFO*] hudi-java-client ... *SUCCESS* [ > 1.869 s] > [*INFO*] hudi-flink-client .. *SUCCESS* [ > 7.289 s] > [*INFO*] hudi-dla-sync .. *SUCCESS* [ > 1.627 s] > [*INFO*] hudi-sync .. *SUCCESS* [ > 0.058 s] > [*INFO*] hudi-hadoop-mr-bundle .. *SUCCESS* [ > 4.328 s] > [*INFO*] hudi-hive-sync-bundle .. *SUCCESS* [ > 1.307 s] > [*INFO*] hudi-spark-bundle_2.12 . *SUCCESS* [ > 8.829 s] > [*INFO*] hudi-presto-bundle . *SUCCESS* [ > 12.235 s] > [*INFO*] hudi-timeline-server-bundle *SUCCESS* [ > 4.582 s] > [*INFO*] hudi-hadoop-docker . *SUCCESS* [ > 0.543 s] > [*INFO*] hudi-hadoop-base-docker *SUCCESS* [ > 0.080 s] > [*INFO*] hudi-hadoop-namenode-docker *SUCCESS* [ > 0.071 s] > [*INFO*] hudi-hadoop-datanode-docker *SUCCESS* [ > 0.072 s] > [*INFO*] hudi-hadoop-history-docker
[jira] [Resolved] (HUDI-1672) packaging fails w/ scala 12 (due to flink)
[ https://issues.apache.org/jira/browse/HUDI-1672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sivabalan narayanan resolved HUDI-1672. --- Fix Version/s: 0.8.0 Resolution: Fixed > packaging fails w/ scala 12 (due to flink) > -- > > Key: HUDI-1672 > URL: https://issues.apache.org/jira/browse/HUDI-1672 > Project: Apache Hudi > Issue Type: Bug > Components: Common Core >Affects Versions: 0.8.0 >Reporter: sivabalan narayanan >Assignee: vinoyang >Priority: Major > Labels: sev:critical, user-support-issues > Fix For: 0.8.0 > > > mvn clean package -DskipTests -Dspark3 -Dscala-2.12 > . > . > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java: > Some input files use or override a deprecated API. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java: > Recompile with -Xlint:deprecation for details. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java: > Some input files use unchecked or unsafe operations. > [*INFO*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java: > Recompile with -Xlint:unchecked for details. > [*INFO*] - > [*ERROR*] COMPILATION ERROR : > [*INFO*] - > [*ERROR*] > /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:[334,32] > package HoodieTableType does not exist > [*INFO*] 1 error > [*INFO*] - > [*INFO*] > ** > [*INFO*] *Reactor Summary for Hudi 0.8.0-SNAPSHOT:* > [*INFO*] > [*INFO*] Hudi ... *SUCCESS* [ > 2.736 s] > [*INFO*] hudi-common *SUCCESS* [ > 38.206 s] > [*INFO*] hudi-timeline-service .. *SUCCESS* [ > 2.579 s] > [*INFO*] hudi-client *SUCCESS* [ > 0.083 s] > [*INFO*] hudi-client-common . *SUCCESS* [ > 10.578 s] > [*INFO*] hudi-hadoop-mr . *SUCCESS* [ > 6.155 s] > [*INFO*] hudi-spark-client .. *SUCCESS* [ > 18.130 s] > [*INFO*] hudi-sync-common ... *SUCCESS* [ > 0.549 s] > [*INFO*] hudi-hive-sync . *SUCCESS* [ > 3.376 s] > [*INFO*] hudi-spark-datasource .. *SUCCESS* [ > 0.077 s] > [*INFO*] hudi-spark-common .. *SUCCESS* [ > 10.523 s] > [*INFO*] hudi-spark2_2.12 ... *SUCCESS* [ > 9.411 s] > [*INFO*] hudi-spark3_2.12 ... *SUCCESS* [ > 12.286 s] > [*INFO*] hudi-spark_2.12 *SUCCESS* [ > 40.109 s] > [*INFO*] hudi-utilities_2.12 *SUCCESS* [ > 7.229 s] > [*INFO*] hudi-utilities-bundle_2.12 . *SUCCESS* [ > 36.710 s] > [*INFO*] hudi-cli ... *SUCCESS* [ > 13.804 s] > [*INFO*] hudi-java-client ... *SUCCESS* [ > 1.869 s] > [*INFO*] hudi-flink-client .. *SUCCESS* [ > 7.289 s] > [*INFO*] hudi-dla-sync .. *SUCCESS* [ > 1.627 s] > [*INFO*] hudi-sync .. *SUCCESS* [ > 0.058 s] > [*INFO*] hudi-hadoop-mr-bundle .. *SUCCESS* [ > 4.328 s] > [*INFO*] hudi-hive-sync-bundle .. *SUCCESS* [ > 1.307 s] > [*INFO*] hudi-spark-bundle_2.12 . *SUCCESS* [ > 8.829 s] > [*INFO*] hudi-presto-bundle . *SUCCESS* [ > 12.235 s] > [*INFO*] hudi-timeline-server-bundle *SUCCESS* [ > 4.582 s] > [*INFO*] hudi-hadoop-docker . *SUCCESS* [ > 0.543 s] > [*INFO*] hudi-hadoop-base-docker *SUCCESS* [ > 0.080 s] > [*INFO*] hudi-hadoop-namenode-docker *SUCCESS* [ > 0.071 s] > [*INFO*] hudi-hadoop-datanode-docker *SUCCESS* [ > 0
[GitHub] [hudi] nsivabalan commented on pull request #2378: [HUDI-1491] Support partition pruning for MOR snapshot query
nsivabalan commented on pull request #2378: URL: https://github.com/apache/hudi/pull/2378#issuecomment-794784065

@garyli1019: what's the status of the PR in general? When do you think we can get this landed?
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590944560

## File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java

```diff
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit},
+ * it is responsible for:
+ *
+ * Monitoring a user-provided hoodie table path.
+ * Deciding which files (or split) should be further read and processed.
+ * Creating the {@link MergeOnReadInputSplit splits} corresponding to those files.
+ * Assigning them to downstream tasks for further processing.
+ *
+ * The splits to be read are forwarded to the downstream {@link StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * IMPORTANT NOTE: Splits are forwarded downstream for reading in ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements CheckpointedFunction {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaC
```
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590940541

## File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java

```diff
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit},
+ * it is responsible for:
+ *
+ * Monitoring a user-provided hoodie table path.
+ * Deciding which files (or split) should be further read and processed.
+ * Creating the {@link MergeOnReadInputSplit splits} corresponding to those files.
+ * Assigning them to downstream tasks for further processing.
+ *
+ * The splits to be read are forwarded to the downstream {@link StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * IMPORTANT NOTE: Splits are forwarded downstream for reading in ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
```

Review comment: `ContinuousFileMonitoringFunction` works based on `FileInputSplit`, but this function works based on `MergeOnReadInputSplit`; `ContinuousFileMonitoringFunction` needs the input format to have the ability to book-keep the consuming offset for each file, which `MergeOnReadInputFormat` does not have. In a word, `StreamReadMonitoringFunction` is simpler code (it has no complex state machine) and also works well.
[GitHub] [hudi] jackiehff commented on issue #2641: [SUPPORT]Build Hudi source with spark 3.1.1 compile error
jackiehff commented on issue #2641: URL: https://github.com/apache/hudi/issues/2641#issuecomment-794739819

@garyli1019 the spark-avro version is the same as the Spark version, 3.1.1.
[GitHub] [hudi] stackfun commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
stackfun commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794736308

Thanks @nsivabalan - no crash on the same test.
[jira] [Commented] (HUDI-797) Improve performance of rewriting AVRO records in HoodieAvroUtils::rewriteRecord
[ https://issues.apache.org/jira/browse/HUDI-797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298461#comment-17298461 ]

Prashant Wason commented on HUDI-797:
-------------------------------------

This change did not work and I do not have any alternative.

> Improve performance of rewriting AVRO records in HoodieAvroUtils::rewriteRecord
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-797
>                 URL: https://issues.apache.org/jira/browse/HUDI-797
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Prashant Wason
>            Assignee: Prashant Wason
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Data is ingested into a [HUDI|https://hudi.apache.org/] dataset as AVRO encoded records. These records have a [schema|https://avro.apache.org/docs/current/spec.html] which is determined by the dataset user and provided to HUDI during the writing process (as part of HoodieWriteConfig). The records are finally saved in [parquet|https://parquet.apache.org/] files, which include the schema (in parquet format) in the footer of individual files.
>
> HUDI's design requires the addition of some metadata fields to all incoming records to aid in book-keeping and indexing. To achieve this, the incoming schema is modified by adding the HUDI metadata fields; the result is called the HUDI schema for the dataset. Each incoming record is then re-written to translate it from the incoming schema into the HUDI schema. Re-writing an incoming record to a new schema is reasonably fast, as it looks up all fields in the incoming record and adds them to a new record, but this takes place for each and every incoming record.
>
> When ingesting large datasets (billions of records) or a large number of datasets, even small improvements in this CPU-bound conversion can translate into notable improvements in compute efficiency.
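For context, a simplified sketch of the per-record rewrite the issue describes (not the actual HoodieAvroUtils implementation): each incoming record is copied field by field into a new record carrying the extended HUDI schema.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RewriteSketch {
  // Copy every field of the incoming record into a record with the HUDI schema;
  // the added metadata fields are left unset for the writer to fill in later.
  static GenericRecord rewriteRecord(GenericRecord incoming, Schema hudiSchema) {
    GenericRecord rewritten = new GenericData.Record(hudiSchema);
    for (Schema.Field field : incoming.getSchema().getFields()) {
      rewritten.put(field.name(), incoming.get(field.name()));
    }
    return rewritten;
  }
}
```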
[GitHub] [hudi] codecov-io edited a comment on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
codecov-io edited a comment on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-776932935

# [Codecov](https://codecov.io/gh/apache/hudi/pull/2500?src=pr&el=h1) Report
> Merging [#2500](https://codecov.io/gh/apache/hudi/pull/2500?src=pr&el=desc) (2da574f) into [master](https://codecov.io/gh/apache/hudi/commit/a2f85d90de73a58e924b4de757d09d6133b046a4?el=desc) (a2f85d9) will **decrease** coverage by `41.38%`.
> The diff coverage is `n/a`.

```diff
@@              Coverage Diff               @@
##             master    #2500       +/-   ##
=============================================
- Coverage     50.90%    9.52%   -41.39%
+ Complexity     3167       48     -3119
=============================================
  Files           433       53      -380
  Lines         19806     1963    -17843
  Branches       2032      235     -1797
=============================================
- Hits          10083      187     -9896
+ Misses         8904     1763     -7141
+ Partials        819       13      -806
```

| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | `?` | `?` |
| hudiclient | `?` | `?` |
| hudicommon | `?` | `?` |
| hudiflink | `?` | `?` |
| hudihadoopmr | `?` | `?` |
| hudisparkdatasource | `?` | `?` |
| hudisync | `?` | `?` |
| huditimelineservice | `?` | `?` |
| hudiutilities | `9.52% <ø> (-59.94%)` | `0.00 <ø> (ø)` |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.

Impacted files:

| File | Coverage Δ | Complexity Δ |
|---|---|---|
| ...va/org/apache/hudi/utilities/IdentitySplitter.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` |
| ...va/org/apache/hudi/utilities/schema/SchemaSet.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` |
| ...a/org/apache/hudi/utilities/sources/RowSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` |
| .../org/apache/hudi/utilities/sources/AvroSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` |
| .../org/apache/hudi/utilities/sources/JsonSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` |
| ...rg/apache/hudi/utilities/sources/CsvDFSSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-10.00%)` |
| ...g/apache/hudi/utilities/sources/JsonDFSSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` |
| ...apache/hudi/utilities/sources/JsonKafkaSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-6.00%)` |
| ...pache/hudi/utilities/sources/ParquetDFSSource.java | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-5.00%)` |
| ...lities/schema/SchemaProviderWithPostProcessor.java | … | … |
[GitHub] [hudi] nsivabalan commented on a change in pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
nsivabalan commented on a change in pull request #2619: URL: https://github.com/apache/hudi/pull/2619#discussion_r590845061 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java ## @@ -42,18 +43,31 @@ */ public class AvroKafkaSource extends AvroSource { + private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"; Review comment: @n3nash thnx @vburenin : let's move this config to DataSourceOptions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
nsivabalan commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794627589 @stackfun : sorry, I missed assigning the logfile to an instance variable. That is fixed now; can you try it out? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[hudi] branch master updated: [HUDI-1651] Fix archival of requested replacecommit (#2622)
This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new c4a6632  [HUDI-1651] Fix archival of requested replacecommit (#2622)
c4a6632 is described below

commit c4a66324cdd3e289e0bf18bdd150b95ee6e4c66c
Author: satishkotha
AuthorDate: Tue Mar 9 15:56:44 2021 -0800

    [HUDI-1651] Fix archival of requested replacecommit (#2622)
---
 .../hudi/table/HoodieTimelineArchiveLog.java       | 11 +--
 .../hudi/io/TestHoodieTimelineArchiveLog.java      |  8 -
 .../java/org/apache/hudi/table/TestCleaner.java    | 37 ++
 hudi-common/pom.xml                                |  2 +-
 .../src/main/avro/HoodieArchivedMetaEntry.avsc     |  8 +
 .../hudi/common/testutils/FileCreateUtils.java     |  7 ++--
 .../hudi/common/testutils/HoodieTestTable.java     |  5 +--
 7 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index cae5dbb..8efd3a2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -400,9 +400,14 @@ public class HoodieTimelineArchiveLog {
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
-        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        if (hoodieInstant.isRequested()) {
+          archivedMetaWrapper.setHoodieRequestedReplaceMetadata(
+              TimelineMetadataUtils.deserializeRequestedReplaceMetadata(commitTimeline.getInstantDetails(hoodieInstant).get()));
+        } else if (hoodieInstant.isCompleted()) {
+          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+          archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        }
         archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
         break;
       }

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 60f605c..042013d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -495,11 +496,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     String fileId2 = "file-" + instantTime + "-2";

     // create replace instant to mark fileId1 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+        .setVersion(1)
+        .setExtraMetadata(Collections.emptyMap())
+        .build();
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, replaceMetadata)
+        .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 1119f26..fd578bd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/
[GitHub] [hudi] n3nash merged pull request #2622: [HUDI-1651] Fix archival of requested replacecommit
n3nash merged pull request #2622: URL: https://github.com/apache/hudi/pull/2622 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] n3nash commented on a change in pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
n3nash commented on a change in pull request #2619: URL: https://github.com/apache/hudi/pull/2619#discussion_r590815407 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java ## @@ -42,18 +43,31 @@ */ public class AvroKafkaSource extends AvroSource { + private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"; Review comment: @nsivabalan https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala#L127 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
nsivabalan commented on a change in pull request #2619: URL: https://github.com/apache/hudi/pull/2619#discussion_r590423544 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java ## @@ -42,18 +43,31 @@ */ public class AvroKafkaSource extends AvroSource { + private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"; private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class); - private final KafkaOffsetGen offsetGen; - private final HoodieDeltaStreamerMetrics metrics; public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) { super(props, sparkContext, sparkSession, schemaProvider); -this.metrics = metrics; + props.put("key.deserializer", StringDeserializer.class); -props.put("value.deserializer", KafkaAvroDeserializer.class); +String deserializerClassName = props.getString(KAFKA_AVRO_VALUE_DESERIALIZER, ""); Review comment: Maybe we should add a default value to this config, similar to other configs in HoodieWriteClientConfig. I mean, wherever we move this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
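To make the suggestion concrete, here is a hedged sketch of the pattern under discussion, with the stock Confluent deserializer as the default value (illustrative only; the final config name and location may change as the review settles):

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializerConfigSketch {
  private static final String KAFKA_AVRO_VALUE_DESERIALIZER =
      "hoodie.deltastreamer.source.kafka.value.deserializer.class";

  // Wire the Kafka consumer deserializers from Hudi's typed properties,
  // falling back to KafkaAvroDeserializer when no custom class is configured.
  static void configure(TypedProperties props) throws ClassNotFoundException {
    props.put("key.deserializer", StringDeserializer.class);
    String clazz = props.getString(KAFKA_AVRO_VALUE_DESERIALIZER,
        KafkaAvroDeserializer.class.getName());
    props.put("value.deserializer", Class.forName(clazz));
  }
}
```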
[GitHub] [hudi] stackfun edited a comment on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
stackfun edited a comment on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794515867

I tested this pull request and it crashed - here's the stack trace. Edit - I pulled the latest code and updated the stack trace.

```
21/03/09 22:28:08 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 28.0 in stage 14.0 (TID 686, dataproc-w-22.us-east1-b.c.my-project.internal, executor 44): org.apache.hudi.exception.HoodieException: Exception when reading log file
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:327)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:209)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:199)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:74)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStreamForGCSFs(HoodieLogFileReader.java:137)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:109)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:134)
	... 42 more
21/03/09 22:28:15 ERROR org.apache.spark.scheduler.TaskSetManager: Task 17 in stage 14.0 failed 4 times; aborting job
21/03/09 22:28:15 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job f426c7a0-b351-4bac-bc87-170784
```
[GitHub] [hudi] vburenin commented on a change in pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
vburenin commented on a change in pull request #2500: URL: https://github.com/apache/hudi/pull/2500#discussion_r590769097 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java ## @@ -107,6 +96,60 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false); } + /** + * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. + * @param fsDataInputStream original instance of {@link FSDataInputStream}. + * @param fs instance of {@link FileSystem} in use. + * @param bufferSize buffer size to be used. + * @return the right {@link FSDataInputStream} as required. + */ + private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) { Review comment: I would rather rewrite it like this reducing cyclomatic complexity, but I am also fine with what is here originally: ```java private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) { if (FSUtils.isGCSFileSystem(fs)) { return new SchemeAwareFSDataInputStream( getFSDataInputStreamForGCSFs(fsDataInputStream, fs, bufferSize), true); } if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); } return fsDataInputStream; } private FSDataInputStream getFSDataInputStreamForGCSFs(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) { if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); } if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) { FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream(); return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( new BufferedFSInputStream(inputStream, bufferSize))); } return fsDataInputSt ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] stackfun edited a comment on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
stackfun edited a comment on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794515867

I tested this pull request and it crashed - here's the stack trace.

```
21/03/09 21:21:38 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 15.0 (TID 6741, dataproc-w-19.us-east1-b.c.my-project.internal, executor 37): org.apache.hudi.exception.HoodieException: Exception when reading log file
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:331)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:113)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
	... 42 more
21/03/09 21:21:41 ERROR org.apache.spark.scheduler.TaskSetManager: Task 2 in stage 15.0 failed 4 times; aborting job
21/03/09 21:21:41 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job f2d52d91-29da-4a49-a648-e49596e808c4.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 15.0 failed 4 times, most recent failure: Lost task 2.3 in stage 15.0 (TID 6983, dataproc-w-9
```
[GitHub] [hudi] stackfun commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
stackfun commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794515867

I tested this pull request and it crashed - here's the stack trace.

```
21/03/09 21:21:38 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 15.0 (TID 6741, dataproc-w-19.us-east1-b.c.my-project.internal, executor 37): org.apache.hudi.exception.HoodieException: Exception when reading log file
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:331)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:113)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
	... 42 more
21/03/09 21:21:41 ERROR org.apache.spark.scheduler.TaskSetManager: Task 2 in stage 15.0 failed 4 times; aborting job
21/03/09 21:21:41 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job f2d52d91-29da-4a49-a648-e49596e808c4.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 15.0 failed 4 times, most recent failure: Lost task 2.3 in stage 15.0 (TID 6983, dataproc-yo-w-9.us-east1-
```
[GitHub] [hudi] satishkotha commented on a change in pull request #2622: [HUDI-1651] Fix archival of requested replacecommit
satishkotha commented on a change in pull request #2622: URL: https://github.com/apache/hudi/pull/2622#discussion_r590748375 ## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java ## @@ -495,11 +496,16 @@ private void createReplaceMetadata(String instantTime) throws Exception { String fileId2 = "file-" + instantTime + "-2"; // create replace instant to mark fileId1 as deleted +HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() +.setOperationType(WriteOperationType.INSERT_OVERWRITE.toString()) +.setVersion(1) Review comment: @n3nash This is not related to the timeline layout version. This tracks the version of HoodieRequestedReplaceMetadata (in case we add new fields to the replace metadata avro structure, we can increase the version). In the test, we were actually already configured to use the default timeline layout version 1, but the test was only creating the commit file and was not creating requested files. I modified it to add requested files, as happens with timeline layout version 1. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] n3nash commented on a change in pull request #2622: [HUDI-1651] Fix archival of requested replacecommit
n3nash commented on a change in pull request #2622: URL: https://github.com/apache/hudi/pull/2622#discussion_r590734308 ## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java ## @@ -495,11 +496,16 @@ private void createReplaceMetadata(String instantTime) throws Exception { String fileId2 = "file-" + instantTime + "-2"; // create replace instant to mark fileId1 as deleted +HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() +.setOperationType(WriteOperationType.INSERT_OVERWRITE.toString()) +.setVersion(1) Review comment: Should this be the CURRENT_VERSION from timeline layout ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[hudi] branch master updated (d3a4516 -> d8af24d)
This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git.

    from d3a4516  [MINOR] HoodieClientTestHarness close resources in AfterAll phase (#2646)
     add d8af24d  [HUDI-1635] Improvements to Hudi Test Suite (#2628)

No new revisions were added by this update.

Summary of changes:
 hudi-integ-test/README.md                          |  18 ++-
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |  61 -
 .../integ/testsuite/configuration/DeltaConfig.java |  22 +++-
 .../apache/hudi/integ/testsuite/dag/DagUtils.java  | 144 -
 .../integ/testsuite/dag/nodes/RollbackNode.java    |  26 ++--
 .../testsuite/dag/scheduler/DagScheduler.java      |   3 +-
 .../dag/scheduler/SaferSchemaDagScheduler.java     |  53
 .../integ/testsuite/generator/DeltaGenerator.java  |   2 +-
 .../FlexibleSchemaRecordGenerationIterator.java    |   8 +-
 .../GenericRecordFullPayloadGenerator.java         |  15 ++-
 .../testsuite/dag/HiveSyncDagGeneratorMOR.java     |   4 +-
 .../hudi/integ/testsuite/dag/TestDagUtils.java     |  34 +
 .../TestGenericRecordPayloadGenerator.java         |   8 +-
 13 files changed, 360 insertions(+), 38 deletions(-)
 create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/SaferSchemaDagScheduler.java
[GitHub] [hudi] n3nash merged pull request #2628: [HUDI-1635] Improvements to Hudi Test Suite
n3nash merged pull request #2628: URL: https://github.com/apache/hudi/pull/2628 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] satishkotha commented on issue #2614: Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token
satishkotha commented on issue #2614: URL: https://github.com/apache/hudi/issues/2614#issuecomment-794434981 Got it. I'm asking other folks to review and merge the patch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] satishkotha edited a comment on pull request #2622: [HUDI-1651] Fix archival of requested replacecommit
satishkotha edited a comment on pull request #2622: URL: https://github.com/apache/hudi/pull/2622#issuecomment-794433500 > @satishkotha just add some minor comment @lw309637554 I don't follow this. Did you review and add a comment? I don't see one. @n3nash since this is assigned to you, can you please take a look? This is a blocking support issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] satishkotha commented on pull request #2622: [HUDI-1651] Fix archival of requested replacecommit
satishkotha commented on pull request #2622: URL: https://github.com/apache/hudi/pull/2622#issuecomment-794433500 > @satishkotha just add some minor comment I don't follow this. Did you review and add a comment? I don't see one. @n3nash since this is assigned to you, can you please take a look? This is a blocking support issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
nsivabalan commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-794426256 @vburenin : yeah, but we have not seen this reported by anyone else in the community so far, so we would like to assume it happens only for the GCS FS for now. We don't want to over-optimize based on an assumption. Also, I have done some minor refactoring to move the code to a separate method, and kept the GCS handling in a separate method as well, so that general users don't need to wrap their heads around the special handling for GCS. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
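For readers following the thread, the isolation being described boils down to a scheme check that keeps the GCS-only stream wrapping out of the common path. A hedged sketch of what such a check can look like (assumed shape; the actual FSUtils helper in the PR may differ):

```java
import org.apache.hadoop.fs.FileSystem;

public class GcsDetectSketch {
  // "gs" is the URI scheme used by the Google Cloud Storage Hadoop connector;
  // detecting it up front lets the GCS-specific handling live in its own method.
  public static boolean isGCSFileSystem(FileSystem fs) {
    return "gs".equals(fs.getUri().getScheme());
  }
}
```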
[GitHub] [hudi] modi95 commented on pull request #2628: [HUDI-1635] Improvements to Hudi Test Suite
modi95 commented on pull request #2628: URL: https://github.com/apache/hudi/pull/2628#issuecomment-794358663 Thanks for making those changes. Looks good to me :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on pull request #2494: [HUDI-1552] Improve performance of key lookups from base file in Metadata Table.
prashantwason commented on pull request #2494: URL: https://github.com/apache/hudi/pull/2494#issuecomment-794269612

@vinothchandar and I discussed simplifying this PR. The following changes are to be implemented:

1. Remove the "reuse" configuration, as it does not make sense for performance reasons.
   - When the timeline server is used, reuse should be on.
   - When the timeline server is not used, each executor has its own instance of the Metadata Reader and reuse is implicit.
2. Simplify the above code to use the instance variables.
3. Locking is not required because of the usage pattern in #1. Locking will still be required in HFileReader because KeyScanner is not thread-safe.

I am working on updating this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
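As a reference for point 1, here is a tiny sketch (hypothetical names, not the PR's code) of the per-executor implicit-reuse pattern being described: one lazily created reader cached per executor JVM, so no "reuse" flag is needed and only construction is synchronized.

```java
import java.util.function.Supplier;

final class LazyHolder<T> {
  private T instance; // one cached instance per executor JVM

  // First caller pays the construction cost; later callers reuse the instance.
  synchronized T get(Supplier<T> factory) {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }
}
```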
[GitHub] [hudi] guanziyue commented on issue #2648: [SUPPORT] a NPE error when reading MOR table in spark datasource
guanziyue commented on issue #2648: URL: https://github.com/apache/hudi/issues/2648#issuecomment-794236129 Hi, happy to see your reply. I'd like to share more information about this. After I posted the issue, I discovered more with help from my colleague. To reproduce this, we need an index that returns true when canIndexLogFiles() is called, such as HBaseIndex. In that case, DeltaCommitActionExecutor will try to append insert records to a log file rather than create a parquet base file, as the code shows: https://github.com/apache/hudi/blob/release-0.6.0/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java#L94 This is how a fileGroup without a parquet base file is produced. However, with my limited knowledge of the datasource, it seems that the datasource assumes every fileGroup has a parquet base file and that all log files are appended to the base file. I guess this may be the root of the error. I plan to try whether making canIndexLogFiles() return false can avoid this problem temporarily; the other way I can come up with for now is to generate a parquet file when inserting records. Could you please correct me if I made some mistake? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
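To restate the mechanism in the comment above, here is a toy sketch (hypothetical simplified types, not the actual Hudi executor code): when the index can index log files, inserts are appended to log files, so a file group can exist with log files but no parquet base file, which the Spark datasource then mishandles.

```java
interface Index {
  boolean canIndexLogFiles();
}

final class InsertRouter {
  static String route(Index index) {
    return index.canIndexLogFiles()
        ? "append inserts to log file (file group may have no base file)"
        : "write inserts to a new parquet base file";
  }

  public static void main(String[] args) {
    System.out.println(route(() -> true));   // HBase-like index
    System.out.println(route(() -> false));  // default bloom-style index
  }
}
```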
[GitHub] [hudi] vinothchandar edited a comment on issue #2515: [HUDI-1615] [SUPPORT] ERROR HoodieTimelineArchiveLog: Failed to archive commits
vinothchandar edited a comment on issue #2515: URL: https://github.com/apache/hudi/issues/2515#issuecomment-794204243 yes, if you are using the spark datasource, just `hoodie.datasource.write.row.writer.enable=false` may be sufficient. It will slow down bulk_inserts; upsert/insert/delete won't be impacted. This config was missed, I think there was a JIRA to update it. Will check it out or add it myself. Thanks for flagging again! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
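For anyone landing on this thread, a hedged example of applying that workaround on a Spark datasource write (the option key comes from the comment above; the remaining Hudi write options such as table name and record keys are elided for brevity):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class RowWriterWorkaroundSketch {
  // Disables the bulk_insert row-writer path; other operations are unaffected.
  static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "false")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```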
[GitHub] [hudi] vinothchandar commented on issue #2515: [HUDI-1615] [SUPPORT] ERROR HoodieTimelineArchiveLog: Failed to archive commits
vinothchandar commented on issue #2515: URL: https://github.com/apache/hudi/issues/2515#issuecomment-794204243 yes, if you are using the spark datasource, just `hoodie.datasource.write.row.writer.enable=false` may be sufficient. It will slow down bulk_inserts; upsert/insert/delete won't be impacted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] bvaradar commented on issue #2648: [SUPPORT] a NPE error when reading MOR table in spark datasource
bvaradar commented on issue #2648: URL: https://github.com/apache/hudi/issues/2648#issuecomment-794176036 @umehrot2 : Can you take a look at this Spark DataSource-related issue? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] bvaradar commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table
bvaradar commented on issue #2637: URL: https://github.com/apache/hudi/issues/2637#issuecomment-794174161 @n3nash : Can you please take a look at this request when you get a chance? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on pull request #2520: [HUDI-1446] Support skip bootstrapIndex's init in abstract fs view init
nsivabalan commented on pull request #2520: URL: https://github.com/apache/hudi/pull/2520#issuecomment-794052784 @Xoln : A few minor comments. Once addressed, we can land this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #2520: [HUDI-1446] Support skip bootstrapIndex's init in abstract fs view init
nsivabalan commented on a change in pull request #2520: URL: https://github.com/apache/hudi/pull/2520#discussion_r590462996 ## File path: hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/NoOpBootstrapIndex.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bootstrap.index; + +import org.apache.hudi.common.model.BootstrapFileMapping; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import java.util.List; + +/** + * No Op Bootstrap Index , which is a emtpy implement and not do anything. Review comment: minor. typo for "empty" ## File path: hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/NoOpBootstrapIndex.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bootstrap.index; + +import org.apache.hudi.common.model.BootstrapFileMapping; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import java.util.List; + +/** + * No Op Bootstrap Index , which is a emtpy implement and not do anything. + */ +public class NoOpBootstrapIndex extends BootstrapIndex { + + public NoOpBootstrapIndex(HoodieTableMetaClient metaClient) { +super(metaClient); + } + + @Override + public IndexReader createReader() { +return null; Review comment: should we consider throwing exception for all methods which should never be invoked. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
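A small self-contained illustration of the second review suggestion above (a hypothetical stand-alone analog, not the PR's classes): methods of a no-op index that must never be invoked can fail fast instead of returning null, so accidental use surfaces immediately rather than as a later NullPointerException.

```java
final class NoOpIndexSketch {
  // Fail fast on operations a no-op index can never meaningfully serve.
  Object createReader() {
    throw new UnsupportedOperationException(
        "no-op bootstrap index: createReader() should never be invoked");
  }
}
```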
[GitHub] [hudi] vburenin commented on a change in pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
vburenin commented on a change in pull request #2619: URL: https://github.com/apache/hudi/pull/2619#discussion_r590472165 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deser; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +/** + * Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization. + */ +public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer { + private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class"; + protected Schema sourceSchema; + + public KafkaAvroSchemaDeserializer() {} + + @Override + public void configure(Map configs, boolean isKey) { +super.configure(configs, isKey); +try { + TypedProperties props = getConvertToTypedProperties(configs); + String className = props.getString(SCHEMA_PROVIDER_CLASS_PROP); + SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props); Review comment: yes, and it fails if I pass "null" for jsc, so here is a custom one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan closed pull request #2380: [Hudi 73] Adding support for vanilla AvroKafkaSource
nsivabalan closed pull request #2380: URL: https://github.com/apache/hudi/pull/2380 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] yanghua commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
yanghua commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590458549 ## File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.core.fs.Path; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Common test utils. + */ +public class TestUtils { + + public static String getLatestCommit(String basePath) { +final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() +.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); +return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + } + + public static String getFirstCommit(String basePath) { +final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() +.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); +return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp(); + } + + public static String getSplitPartitionPath(MergeOnReadInputSplit split) { +assertTrue(split.getLogPaths().isPresent()); +final String logPath = split.getLogPaths().get().get(0); +String[] paths = logPath.split(Path.SEPARATOR); Review comment: OK, keep it for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] yanghua commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
yanghua commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590457864 ## File path: hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.StreamReadOperator; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; +import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor; +import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadOperator}. 
+ */ +public class TestStreamReadOperator { + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { +final String basePath = tempFile.getAbsolutePath(); +conf = TestConfigurations.getDefaultConf(basePath); +conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + +StreamerUtil.initTableIfNotExists(conf); + } + + @Test + void testWriteRecords() throws Exception { +TestData.writeData(TestData.DATA_SET_ONE, conf); +try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = getMonitorFunc(); + + List splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + for (MergeOnReadInputSplit split : splits) { +// Process this element to enqueue to mail-box. +harness.processElement(split, -1); + +// Run the mail-box once to read all records from the given split. +assertThat("Should process 1 split", processor.runMailboxStep()); + } + // Assert the output has expected elements. + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + + TestData.writeData(TestData.DATA_SET_TWO, conf); + final List splits2 = generateSplits(func); + assertThat("Should have 4 splits", splits2.size(), is(4)); + for (MergeOnReadInputSplit split : splits2) { +// Process this element to enqueue to mail-box. +ha
[GitHub] [hudi] nsivabalan edited a comment on pull request #2625: [1568] Fixing spark3 bundles
nsivabalan edited a comment on pull request #2625: URL: https://github.com/apache/hudi/pull/2625#issuecomment-789746642 CC @vinothchandar @garyli1019 @bvaradar @n3nash This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
nsivabalan commented on pull request #2619: URL: https://github.com/apache/hudi/pull/2619#issuecomment-794025650 @vburenin : I have added a commit with some test setup for AvroKafkaSource. Please feel free to take it up and enhance the tests as needed. Make sure you pull in the latest commit before you start working on the feedback. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] yanghua commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
yanghua commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590455667 ## File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java ## @@ -332,7 +331,7 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw public static boolean needsScheduleCompaction(Configuration conf) { return conf.getString(FlinkOptions.TABLE_TYPE) .toUpperCase(Locale.ROOT) -.equals(HoodieTableType.MERGE_ON_READ.name()) +.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) Review comment: IMO, it's different. Spark references `HoodieTableType.COPY_ON_WRITE.name`, but you used a string literal. What I mean by the word `define` is using a literal directly. In the future, if we change the value of `HoodieTableType`, we would not need to search for the string literal in `DataSourceOptions`, and we can control all the change points. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
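To make the tradeoff in this review thread concrete, here is a minimal sketch of the two definition styles being debated; the class and field names below are illustrative, not the actual Hudi code:

```java
import org.apache.hudi.common.model.HoodieTableType;

public final class TableTypeOptionsSketch {

  // Style 1: derive the option value from the enum, so a future change to
  // HoodieTableType propagates automatically to the option value.
  public static final String MOR_FROM_ENUM = HoodieTableType.MERGE_ON_READ.name();

  // Style 2: define an independent string literal, so the user-facing option
  // value stays stable even if the enum is ever renamed; all change points
  // are then controlled in this one class.
  public static final String MOR_LITERAL = "MERGE_ON_READ";

  private TableTypeOptionsSketch() {}
}
```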
[GitHub] [hudi] nsivabalan commented on a change in pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
nsivabalan commented on a change in pull request #2619: URL: https://github.com/apache/hudi/pull/2619#discussion_r590421462 ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.deser; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +/** + * Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization. + */ +public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer { + private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class"; + protected Schema sourceSchema; + + public KafkaAvroSchemaDeserializer() {} + + @Override + public void configure(Map configs, boolean isKey) { +super.configure(configs, isKey); +try { + TypedProperties props = getConvertToTypedProperties(configs); + String className = props.getString(SCHEMA_PROVIDER_CLASS_PROP); + SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props); Review comment: may I know if this will work for any schemaProvider? or are we making any assumptions? Bcoz, as of now, guess we instantiate schemaProvider for DeltaStreamer in UtilHelpers.createSchemaProvider(className, props, jsc). ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.deser; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +/** + * Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to inject reader schema during deserialization. + */ +public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer { + private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class"; Review comment: May be we should try to re-use HoodieDeltaStreamer.Config.schemaProviderClassName. ## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java ## @@ -42,18 +43,31 @@ */ public class AvroKafkaSource extends AvroSource { + private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"; private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class); - private final KafkaOffsetGen offsetGen; - private final HoodieDeltaStreamerMetrics metrics
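To make the concern in these review comments concrete, here is a sketch contrasting the two instantiation paths under discussion. The wrapper class is illustrative; the two calls themselves are quoted from the diff and the comment above:

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.apache.spark.api.java.JavaSparkContext;

public final class SchemaProviderPathsSketch {

  // Path used inside the deserializer: plain reflection with only the
  // properties. A provider whose constructor also needs a JavaSparkContext
  // cannot be built this way -- that is the assumption being questioned.
  public static SchemaProvider viaReflection(String className, TypedProperties props) {
    return (SchemaProvider) ReflectionUtils.loadClass(className, props);
  }

  // Path used by DeltaStreamer itself, which additionally passes the JSC.
  public static SchemaProvider viaUtilHelpers(String className, TypedProperties props,
      JavaSparkContext jsc) throws Exception {
    return UtilHelpers.createSchemaProvider(className, props, jsc);
  }

  private SchemaProviderPathsSketch() {}
}
```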
[GitHub] [hudi] garyli1019 commented on a change in pull request #2378: [HUDI-1491] Support partition pruning for MOR snapshot query
garyli1019 commented on a change in pull request #2378: URL: https://github.com/apache/hudi/pull/2378#discussion_r590443628 ## File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala ## @@ -504,6 +506,42 @@ class TestMORDataSource extends HoodieClientTestBase { hudiSnapshotDF2.show(1) } + @Test + def testPrunePartitions() { +// First Operation: +// Producing parquet files to three hive style partitions like /partition=20150316/. +// SNAPSHOT view on MOR table with parquet files only. +dataGen.setPartitionPaths(Array("20150316","20150317","20160315")); +val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList +val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) +inputDF1.write.format("org.apache.hudi") Review comment: Hi @yui2010 , thanks for providing this detailed explanation. I will look into this in the next few days. Regarding DataSource V2: yes, the community is planning to rewrite the data source API in V2 for better Spark 3 support. If you are interested, contributions are always welcome! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] codecov-io edited a comment on pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
codecov-io edited a comment on pull request #2619: URL: https://github.com/apache/hudi/pull/2619#issuecomment-789253784 # [Codecov](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=h1) Report > Merging [#2619](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=desc) (de50b68) into [master](https://codecov.io/gh/apache/hudi/commit/9437e0ddef9821f728a4edf24ab90506532b6d0d?el=desc) (9437e0d) will **decrease** coverage by `5.42%`. > The diff coverage is `0.00%`. [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2619/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2619      +/-   ##
============================================
- Coverage     51.54%   46.11%    -5.43%
+ Complexity     3491     3177      -314
============================================
  Files           462      463        +1
  Lines         21879    21909       +30
  Branches       2326     2329        +3
============================================
- Hits          11277    10103     -1174
- Misses         9623    10947     +1324
+ Partials        979      859      -120
```
| Flag | Coverage Δ | Complexity Δ | | |---|---|---|---| | hudicli | `37.01% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | | | hudiflink | `50.34% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudihadoopmr | `33.44% <ø> (-0.04%)` | `0.00 <ø> (ø)` | | | hudisparkdatasource | `69.84% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudisync | `49.62% <ø> (ø)` | `0.00 <ø> (ø)` | | | huditimelineservice | `64.36% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudiutilities | `9.39% <0.00%> (-60.10%)` | `0.00 <0.00> (ø)` | | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more. | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=tree) | Coverage Δ | Complexity Δ | | |---|---|---|---| | [...i/utilities/deser/KafkaAvroSchemaDeserializer.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2Rlc2VyL0thZmthQXZyb1NjaGVtYURlc2VyaWFsaXplci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | | | [...ities/schema/NullTargetSchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9OdWxsVGFyZ2V0U2NoZW1hUmVnaXN0cnlQcm92aWRlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [.../hudi/utilities/schema/SchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFSZWdpc3RyeVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | | | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%>
(-3.00%)` | | | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | | | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | | | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | | | [...rg/apache/hud
[GitHub] [hudi] codecov-io edited a comment on pull request #2619: [HUDI-1650] Custom avro kafka deserializer.
codecov-io edited a comment on pull request #2619: URL: https://github.com/apache/hudi/pull/2619#issuecomment-789253784 # [Codecov](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=h1) Report > Merging [#2619](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=desc) (de50b68) into [master](https://codecov.io/gh/apache/hudi/commit/9437e0ddef9821f728a4edf24ab90506532b6d0d?el=desc) (9437e0d) will **decrease** coverage by `6.99%`. > The diff coverage is `0.00%`. [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2619/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master    #2619      +/-   ##
============================================
- Coverage     51.54%   44.54%    -7.00%
+ Complexity     3491     2595      -896
============================================
  Files           462      372       -90
  Lines         21879    17078     -4801
  Branches       2326     1743      -583
============================================
- Hits          11277     7608     -3669
+ Misses         9623     8848      -775
+ Partials        979      622      -357
```
| Flag | Coverage Δ | Complexity Δ | | |---|---|---|---| | hudicli | `37.01% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | | | hudiflink | `50.34% <ø> (ø)` | `0.00 <ø> (ø)` | | | hudihadoopmr | `?` | `?` | | | hudisparkdatasource | `?` | `?` | | | hudisync | `?` | `?` | | | huditimelineservice | `?` | `?` | | | hudiutilities | `9.39% <0.00%> (-60.10%)` | `0.00 <0.00> (ø)` | | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more. | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2619?src=pr&el=tree) | Coverage Δ | Complexity Δ | | |---|---|---|---| | [...i/utilities/deser/KafkaAvroSchemaDeserializer.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2Rlc2VyL0thZmthQXZyb1NjaGVtYURlc2VyaWFsaXplci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | | | [...ities/schema/NullTargetSchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9OdWxsVGFyZ2V0U2NoZW1hUmVnaXN0cnlQcm92aWRlci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [.../hudi/utilities/schema/SchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFSZWdpc3RyeVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | | | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | | | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
[...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | | | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | | | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | | | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2619/diff?src=pr&el=tre
[GitHub] [hudi] vburenin commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
vburenin commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-793935990 @nsivabalan Well, I am fine with that little change for us, but it makes things questionable for other file systems that run into the same case. If I were you, I would not add the isGCSFileSystem check, since it is specific to GCS; if something similar comes up in the future, we will hit this again. Other than that, LGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] pengzhiwei2018 commented on pull request #2283: [HUDI-1415] Read Hoodie Table As Spark DataSource Table
pengzhiwei2018 commented on pull request #2283: URL: https://github.com/apache/hudi/pull/2283#issuecomment-793875706 This PR is currently blocked by the issue in #2651, which adds support for the no-star query for Hudi. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan edited a comment on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
nsivabalan edited a comment on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-793874571 I made one minor change to your proposal, @vburenin: in the else condition for a wrapped-over-wrapped FileSystem, I added the isGCSFileSystem check as well. I don't want to change the flow for other FileSystems. I have updated the PR; please check it out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on pull request #2500: [HUDI-1496] Fixing detection of GCS FileSystem
nsivabalan commented on pull request #2500: URL: https://github.com/apache/hudi/pull/2500#issuecomment-793874571 I made one minor change to your proposal, @vburenin: in the else condition for a wrapped-over-wrapped FileSystem, I added the isGCSFileSystem check as well. I don't want to change the flow for other FileSystems. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
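For context on the check being debated in this thread: the GCS connector serves URIs under the `gs` scheme, and the fix needs to recognize it even when the `FileSystem` handed to Hudi is a wrapper. A hedged sketch of such a scheme-based test follows; the helper name matches the thread, but the body is an assumption, not the actual patch:

```java
import org.apache.hadoop.fs.FileSystem;

public final class GcsFsCheckSketch {

  // The GCS connector (GoogleHadoopFileSystem) serves URIs like
  // gs://bucket/path, so the URI scheme identifies it regardless of the
  // concrete (possibly wrapped) FileSystem class.
  public static boolean isGCSFileSystem(FileSystem fs) {
    return "gs".equals(fs.getUri().getScheme());
  }

  private GcsFsCheckSketch() {}
}
```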
[jira] [Updated] (HUDI-1591) Improve Hoodie Table Query Performance And Ease Of Use For Spark
[ https://issues.apache.org/jira/browse/HUDI-1591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated HUDI-1591: - Labels: pull-request-available (was: ) > Improve Hoodie Table Query Performance And Ease Of Use For Spark > > > Key: HUDI-1591 > URL: https://issues.apache.org/jira/browse/HUDI-1591 > Project: Apache Hudi > Issue Type: Improvement > Components: Spark Integration > Reporter: pengzhiwei > Assignee: pengzhiwei > Priority: Major > Labels: pull-request-available > Fix For: 0.8.0 > > > We have found some problems when querying Hudi tables on Spark: > 1. Users must specify "*" to tell Spark the partition level for the query. > 2. Partition pruning is not supported for COW tables. > This issue aims to achieve the following goals: > 1. Support no-star queries for Hudi tables. > 2. Support partition pruning for COW tables. > Refer to the documentation for more details: [Optimization For Hudi COW Query|https://docs.google.com/document/d/1qG014M3VZg3lMswsZv7cYB9Tb0vz8yXgqvlI_Jlnnsc/edit#heading=h.k6ro6dhgwh8y] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [hudi] pengzhiwei2018 opened a new pull request #2651: [HUDI-1591] Improve Hoodie Table Query Performance And Ease Of Use Fo…
pengzhiwei2018 opened a new pull request #2651: URL: https://github.com/apache/hudi/pull/2651 …r Spark ## What is the purpose of the pull request Support partition pruning and the no-star query for Hudi on the Spark engine. The details of this problem are in [RFC-26](https://cwiki.apache.org/confluence/display/HUDI/RFC-26+Optimization+For+Hudi+Table+Query) ## Brief change log - Add a `HoodieFileIndex` for Hudi to list files. - Store the partition columns in `hoodie.properties` - Refactor some code in DefaultSource to use the `HoodieFileIndex` ## Verify this pull request - Add `TestHoodieFileIndex` to test `HoodieFileIndex` - Add some test cases in `TestCOWDataSource` and `TestMORDataSource` to test the no-star query. ## Committer checklist - [ ] Has a corresponding JIRA in PR title & commit - [ ] Commit message is descriptive of the change - [ ] CI is green - [ ] Necessary doc changes done or have another open PR - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
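For readers unfamiliar with the "no-star" term used in this PR and in HUDI-1591: before this change, reading a partitioned Hudi table through the Spark datasource required appending one `*` per partition level to the load path; the PR's `HoodieFileIndex` is meant to make the plain base path work and still prune partitions. An illustrative sketch follows (the partition depth and filter column are made up):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class NoStarQuerySketch {

  public static void query(SparkSession spark, String basePath) {
    // Old style: one "*" per partition level (here assumed: year/month/day),
    // otherwise Spark cannot discover the partition columns.
    Dataset<Row> withStars = spark.read().format("hudi")
        .load(basePath + "/*/*/*");

    // Goal of this PR: load the base path directly; HoodieFileIndex lists
    // the files and enables partition pruning on filter predicates.
    Dataset<Row> noStars = spark.read().format("hudi")
        .load(basePath);

    noStars.filter("year = '2021'").show();
  }

  private NoStarQuerySketch() {}
}
```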
[jira] [Updated] (HUDI-1591) Improve Hoodie Table Query Performance And Ease Of Use For Spark
[ https://issues.apache.org/jira/browse/HUDI-1591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] pengzhiwei updated HUDI-1591: - Description: We have found some problems when querying Hudi tables on Spark: 1. Users must specify "*" to tell Spark the partition level for the query. 2. Partition pruning is not supported for COW tables. This issue aims to achieve the following goals: 1. Support no-star queries for Hudi tables. 2. Support partition pruning for COW tables. Refer to the documentation for more details: [Optimization For Hudi COW Query|https://docs.google.com/document/d/1qG014M3VZg3lMswsZv7cYB9Tb0vz8yXgqvlI_Jlnnsc/edit#heading=h.k6ro6dhgwh8y] was: We have found some problems when querying Hudi tables on Spark: 1. Users must specify "*" to tell Spark the partition level for the query. 2. Partition pruning is not supported for COW tables. 3. Performance issue in the *HoodieROTablePathFilter* for COW tables. This issue aims to achieve the following goals: 1. Support no-star queries for Hudi tables. 2. Support partition pruning for COW tables. 3. Solve the performance issue with *HoodieROTablePathFilter.* Refer to the documentation for more details: [Optimization For Hudi COW Query|https://docs.google.com/document/d/1qG014M3VZg3lMswsZv7cYB9Tb0vz8yXgqvlI_Jlnnsc/edit#heading=h.k6ro6dhgwh8y] > Improve Hoodie Table Query Performance And Ease Of Use For Spark > > > Key: HUDI-1591 > URL: https://issues.apache.org/jira/browse/HUDI-1591 > Project: Apache Hudi > Issue Type: Improvement > Components: Spark Integration > Reporter: pengzhiwei > Assignee: pengzhiwei > Priority: Major > Fix For: 0.8.0 > > > We have found some problems when querying Hudi tables on Spark: > 1. Users must specify "*" to tell Spark the partition level for the query. > 2. Partition pruning is not supported for COW tables. > This issue aims to achieve the following goals: > 1. Support no-star queries for Hudi tables. > 2. Support partition pruning for COW tables. > Refer to the documentation for more details: [Optimization For Hudi COW Query|https://docs.google.com/document/d/1qG014M3VZg3lMswsZv7cYB9Tb0vz8yXgqvlI_Jlnnsc/edit#heading=h.k6ro6dhgwh8y] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [hudi] yanghua commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
yanghua commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590077931 ## File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java ## @@ -112,6 +118,25 @@ private FlinkOptions() { + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" + " use UTC timezone, by default true"); + public static final ConfigOption READ_AS_STREAMING = ConfigOptions + .key("read.streaming.enabled") + .booleanType() + .defaultValue(false)// default read as batch + .withDescription("Whether to read as streaming source, default false"); + + public static final ConfigOption STREAMING_CHECK_INTERVAL = ConfigOptions + .key("read.streaming.check-interval") + .intType() + .defaultValue(60)// default 1 minute + .withDescription("Check interval for streaming read of SECOND, default 1 minute"); + + public static final ConfigOption STREAMING_START_COMMIT = ConfigOptions Review comment: ditto ## File path: hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.operator; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.mor.InstantRange; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; + +/** + * This is the single (non-parallel) monitoring task which takes a {@link MergeOnReadInputSplit} + * , it is responsible for: + * + * + * Monitoring a user-provided hoodie table path. + * Deciding which files(or split) should be further read and processed. + * Creating the {@link MergeOnReadInputSplit splits} corresponding to those files. + * Assigning them to downstream tasks for further processing. + * + * + * The splits to be read are forwarded to the downstream {@link StreamReadOperator} + * which can have parallelism greater than one. + * + * IMPORTANT NOTE: Splits are forwarded downstream for reading in ascending instant commits time order, + * in each downstream task, the splits are also read in receiving sequence. We do not ensure split consuming sequence + * among the downstream tasks. + */ +public class StreamReadMonitoringFunction Review comment: This is very like the `ContinuousFileMonitoringFunction` in flink project. And it seems `ContinuousFileMonitoringFunction` sounds more reasonable. Bec
[GitHub] [hudi] liujinhui1994 commented on pull request #1929: [HUDI-1160] Support update partial fields for CoW table
liujinhui1994 commented on pull request #1929: URL: https://github.com/apache/hudi/pull/1929#issuecomment-793798733 @nsivabalan I'm sorry for the late reply; I have not been able to access git recently due to network issues. I will address this PR as suggested in the comments. I hope you will have time to review it later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on issue #2641: [SUPPORT]Build Hudi source with spark 3.1.1 compile error
garyli1019 commented on issue #2641: URL: https://github.com/apache/hudi/issues/2641#issuecomment-793783530 @jackiehff Hi, did you change the `spark-avro` version as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590150162 ## File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java ## @@ -332,7 +331,7 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw public static boolean needsScheduleCompaction(Configuration conf) { return conf.getString(FlinkOptions.TABLE_TYPE) .toUpperCase(Locale.ROOT) -.equals(HoodieTableType.MERGE_ON_READ.name()) +.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ) Review comment: Spark did have a definition, see `DataSourceOptions.COW_TABLE_TYPE_OPT_VAL` and `DataSourceOptions.MOR_TABLE_TYPE_OPT_VAL`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590142759 ## File path: hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadMonitoringFunction}. 
+ */ +public class TestStreamReadMonitoringFunction { + private static final long WAIT_TIME_MILLIS = 5 * 1000L; + + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { +final String basePath = tempFile.getAbsolutePath(); +conf = TestConfigurations.getDefaultConf(basePath); +conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); +conf.setInteger(FlinkOptions.STREAMING_CHECK_INTERVAL, 2); // check every 2 seconds + +StreamerUtil.initTableIfNotExists(conf); + } + + @Test + public void testConsumeFromLatestCommit() throws Exception { +TestData.writeData(TestData.DATA_SET_ONE, conf); +StreamReadMonitoringFunction function = getMonitorFunc(); +try (AbstractStreamOperatorTestHarness harness = createHarness(function)) { + harness.setup(); + harness.open(); + + CountDownLatch latch = new CountDownLatch(4); + TestSourceContext sourceContext = new TestSourceContext(latch); + + runAsync(sourceContext, function); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()), + "No instants should have range limit"); + + Thread.sleep(1000L); + + // reset the source context + latch = new CountDownLatch(4); + sourceContext.reset(latch); + + // write another instant and validate + TestData.writeData(TestData.DATA_SET_TWO, conf); + + assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation"); + assertThat("Should produce the expected splits", + sourceContext.getPartitionPaths(), is("par1,par2,par3,par4")); + assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()), + "All the instants should have range limit"); + + // Stop the stream task. + function.close(); +} + } + + @Test + public void testConsumeFromSpecifiedCommit() throws Exception { +// write 2 commits first, use the second commit time as the specified start instant, +// all the splits should come from the second commit. +
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590141827 ## File path: hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java ## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.StreamReadOperator; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; +import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor; +import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadOperator}. 
+ */ +public class TestStreamReadOperator { + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { +final String basePath = tempFile.getAbsolutePath(); +conf = TestConfigurations.getDefaultConf(basePath); +conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + +StreamerUtil.initTableIfNotExists(conf); + } + + @Test + void testWriteRecords() throws Exception { +TestData.writeData(TestData.DATA_SET_ONE, conf); +try (OneInputStreamOperatorTestHarness harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = getMonitorFunc(); + + List splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + for (MergeOnReadInputSplit split : splits) { +// Process this element to enqueue to mail-box. +harness.processElement(split, -1); + +// Run the mail-box once to read all records from the given split. +assertThat("Should process 1 split", processor.runMailboxStep()); + } + // Assert the output has expected elements. + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + + TestData.writeData(TestData.DATA_SET_TWO, conf); + final List splits2 = generateSplits(func); + assertThat("Should have 4 splits", splits2.size(), is(4)); + for (MergeOnReadInputSplit split : splits2) { +// Process this element to enqueue to mail-box. +
[GitHub] [hudi] danny0405 commented on a change in pull request #2640: [HUDI-1663] Streaming read for Flink MOR table
danny0405 commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r590140920 ## File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.core.fs.Path; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Common test utils. + */ +public class TestUtils { + + public static String getLatestCommit(String basePath) { +final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() +.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); +return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + } + + public static String getFirstCommit(String basePath) { +final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() +.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build(); +return metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp(); + } + + public static String getSplitPartitionPath(MergeOnReadInputSplit split) { +assertTrue(split.getLogPaths().isPresent()); +final String logPath = split.getLogPaths().get().get(0); +String[] paths = logPath.split(Path.SEPARATOR); Review comment: I still think it is not necessary, we build the `HoodieTableMetaClient` only with the configured table path, which is only suitable for tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (HUDI-1676) Support SQL with spark3
[ https://issues.apache.org/jira/browse/HUDI-1676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tao meng updated HUDI-1676: --- Summary: Support SQL with spark3 (was: Support sql with spark3) > Support SQL with spark3 > --- > > Key: HUDI-1676 > URL: https://issues.apache.org/jira/browse/HUDI-1676 > Project: Apache Hudi > Issue Type: Sub-task > Components: Spark Integration > Reporter: tao meng > Priority: Major > Fix For: 0.8.0 > > > 1. Support CTAS for Spark 3 > 3. Support INSERT for Spark 3 > 4. Support MERGE, UPDATE, and DELETE without the RowKey constraint for Spark 3 > 5. Support DataSource V2 for Spark 3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (HUDI-1677) Support Clustering and Metatable for SQL performance
tao meng created HUDI-1677: -- Summary: Support Clustering and Metatable for SQL performance Key: HUDI-1677 URL: https://issues.apache.org/jira/browse/HUDI-1677 Project: Apache Hudi Issue Type: Sub-task Components: Spark Integration Reporter: tao meng Fix For: 0.8.0 1. Support Metatable to improve SQL write performance 2. Support Clustering to improve SQL read performance -- This message was sent by Atlassian Jira (v8.3.4#803005)
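Both features already exist behind write-path configs in the 0.7.x line, so a plausible reading of this sub-task is wiring them into the SQL layer. A hedged sketch of the datasource options involved (option keys as of Hudi 0.7.x; defaults and exact keys may differ by release, and this is not the SQL integration itself):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class PerfOptionsSketch {

  public static void write(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        // Metadata table: avoids expensive file listings on the read path.
        .option("hoodie.metadata.enable", "true")
        // Inline clustering: periodically rewrites small files into larger,
        // sorted ones to speed up queries.
        .option("hoodie.clustering.inline", "true")
        .option("hoodie.clustering.inline.max.commits", "4")
        .mode(SaveMode.Append)
        .save(basePath);
  }

  private PerfOptionsSketch() {}
}
```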
[jira] [Updated] (HUDI-1676) Support sql with spark3
[ https://issues.apache.org/jira/browse/HUDI-1676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tao meng updated HUDI-1676: --- Summary: Support sql with spark3 (was: support sql with spark3) > Support sql with spark3 > --- > > Key: HUDI-1676 > URL: https://issues.apache.org/jira/browse/HUDI-1676 > Project: Apache Hudi > Issue Type: Sub-task > Components: Spark Integration > Reporter: tao meng > Priority: Major > Fix For: 0.8.0 > > > 1. Support CTAS for Spark 3 > 3. Support INSERT for Spark 3 > 4. Support MERGE, UPDATE, and DELETE without the RowKey constraint for Spark 3 > 5. Support DataSource V2 for Spark 3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (HUDI-1676) support sql with spark3
tao meng created HUDI-1676: -- Summary: support sql with spark3 Key: HUDI-1676 URL: https://issues.apache.org/jira/browse/HUDI-1676 Project: Apache Hudi Issue Type: Sub-task Components: Spark Integration Reporter: tao meng Fix For: 0.8.0 1. Support CTAS for Spark 3 3. Support INSERT for Spark 3 4. Support MERGE, UPDATE, and DELETE without the RowKey constraint for Spark 3 5. Support DataSource V2 for Spark 3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (HUDI-1675) Externalize all Hudi configurations
tao meng created HUDI-1675: -- Summary: Externalize all Hudi configurations Key: HUDI-1675 URL: https://issues.apache.org/jira/browse/HUDI-1675 Project: Apache Hudi Issue Type: Sub-task Components: Spark Integration Reporter: tao meng Fix For: 0.8.0 # Externalize all Hudi configurations (separate configuration file) # Save table-related properties into the hoodie.properties file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [hudi] sbernauer commented on pull request #2649: Fixed invalid avro schemas in hudi-common/src/main/avro/
sbernauer commented on pull request #2649: URL: https://github.com/apache/hudi/pull/2649#issuecomment-793567734 Closed in favor of #2650 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] sbernauer closed pull request #2649: Fixed invalid avro schemas in hudi-common/src/main/avro/
sbernauer closed pull request #2649: URL: https://github.com/apache/hudi/pull/2649 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] sbernauer opened a new pull request #2650: Preparation for Avro update
sbernauer opened a new pull request #2650: URL: https://github.com/apache/hudi/pull/2650 Hi Hudi team! ## What is the purpose of the pull request We need to upgrade to at least Avro 1.9.x in production, so I tried upgrading the Avro version in the pom.xml of Hudi. Doing so, I noticed some problems: Upgrade to Avro 1.9.2: - Renamed method defaultValue to defaultVal - Moved NullNode.getInstance() to JsonProperties.NULL_VALUE - Avro complains about invalid schemas/default values in hudi-common/src/main/avro/ - The shaded guava libs from Avro have been removed Upgrade to Avro 1.10.1: - Some more stuff Spark 3.2.0 (we currently use 3.1.1) will contain Avro 1.10.1 (https://issues.apache.org/jira/browse/SPARK-27733). In order to reduce the effort of switching to a newer Avro version in the future, I provided patches that fix the above-mentioned issues. ## Brief change log Fix the above-mentioned issues ## Verify this pull request Run unit tests Kind regards Sebastian This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
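A small before/after sketch of the two Avro API changes called out in this PR description; the helper method around them is illustrative, while the renamed accessor and the moved null sentinel are as described above:

```java
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;

public final class AvroMigrationSketch {

  // Avro 1.8.x: Schema.Field#defaultValue() returned a Jackson JsonNode,
  // with null defaults represented by NullNode.getInstance().
  // Avro 1.9.x: the accessor is defaultVal(), returning a plain Object,
  // and null defaults are the JsonProperties.NULL_VALUE sentinel.
  public static Object defaultOf(Schema.Field field) {
    Object def = field.defaultVal();
    return def == JsonProperties.NULL_VALUE ? null : def;
  }

  private AvroMigrationSketch() {}
}
```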
[GitHub] [hudi] sbernauer opened a new pull request #2649: Fixed invalid avro schemas in hudi-common/src/main/avro/
sbernauer opened a new pull request #2649: URL: https://github.com/apache/hudi/pull/2649 Hi Hudi team! ## What is the purpose of the pull request We need to upgrade to at least Avro 1.9.x in production, so I tried upgrading the Avro version in the pom.xml of Hudi. Doing so, I noticed some problems: Upgrade to Avro 1.9.2: - Renamed method defaultValue to defaultVal ==> #TODO - Avro complains about invalid schemas/default values in hudi-common/src/main/avro/ ==> #TODO - The shaded guava libs from Avro have been removed ==> #TODO Upgrade to Avro 1.10.1: - Some more stuff Spark 3.2.0 (we currently use 3.1.1) will contain Avro 1.10.1 (https://issues.apache.org/jira/browse/SPARK-27733). In order to reduce the effort of switching to a newer Avro version in the future, I provided some patches that fix the above-mentioned issues. ## Brief change log This PR addresses the following issues: - Avro complains about invalid schemas/default values in hudi-common/src/main/avro/ ==> #TODO ## Verify this pull request Run unit tests Kind regards Sebastian This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] guanziyue opened a new issue #2648: [SUPPORT]
guanziyue opened a new issue #2648: URL: https://github.com/apache/hudi/issues/2648 **Describe the problem you faced** Hello guys, I hit an NPE when I use the Spark datasource API to read a MOR table. The stacktrace is attached at the end of this post. I then tried to find the suspicious code by debugging; the observed behavior is described below. Let's start from the method buildFileIndex in MergeOnReadSnapshotRelation. [https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala#L136](url) First, the file status of all **parquet** files is fetched from InMemoryFileIndex at line 137. Then all **parquet** files are fetched as base files from HoodieTableFileSystemView at line 145 as latestFiles. After that, the logic goes into groupLogsByBaseFile in HoodieRealtimeInputFormatUtils. [https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java#L139](url) At line 158, **all fileSlices** are returned, some having a parquet base file while others do not (not yet compacted). At line 166, for every file slice, Hudi tries to get its base file by looking up a map that only contains parquet base file ids. When a file slice does not yet have a parquet base file, this lookup results in an NPE. Could anyone please point out which step has an unexpected result? **To Reproduce** The code I used to query is quite simple:
```java
SparkSession spark = SparkSession.builder()
    .appName("hudi-read_guanziyue")
    .enableHiveSupport()
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.driver.allowMultipleContexts", true)
    .config("spark.dynamicAllocation.enabled", true)
    .config("spark.executor.memory", "30g")
    .config("spark.executor.cores", "4")
    .getOrCreate();
Dataset<Row> queryDF = spark
    .read()
    .format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
    .load(warehousePath + "/*");
queryDF.createOrReplaceTempView("table");
queryDF.show();
```
**Environment Description** * Hudi version : 0.6.0 * Spark version : 3.0.1 **Stacktrace**
```
org.apache.hudi.exception.HoodieException: Error obtaining data file/log file grouping: hdfs://mytablePath/20210308
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$groupLogsByBaseFile$16(HoodieRealtimeInputFormatUtils.java:162)
  at java.util.HashMap$KeySet.forEach(HashMap.java:932)
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(HoodieRealtimeInputFormatUtils.java:131)
  at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:139)
  at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:73)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:98)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:342)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
  at hudiReadExample.main(hudiReadExample.java:32)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: java.lang.NullPointerException
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$null$15(HoodieRealtimeInputFormatUtils.java:151)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.Re
```
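The failure mode described in this report reduces to an unguarded map lookup: the grouping map is keyed by parquet base-file ids, but a merge-on-read file slice that has not been compacted yet has only log files. A self-contained sketch of the pattern follows; generic types stand in for the Hudi classes, and this is not the actual HoodieRealtimeInputFormatUtils code:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class BaseFileLookupSketch {

  // The map is built from parquet files only, keyed by file id. A log-only
  // (not yet compacted) file slice has no entry here, so a direct get(...)
  // followed by a dereference throws the reported NPE.
  public static void group(Map<String, String> baseFileById, List<String> sliceFileIds) {
    for (String fileId : sliceFileIds) {
      Optional<String> baseFile = Optional.ofNullable(baseFileById.get(fileId));
      if (!baseFile.isPresent()) {
        // Log-only slice: must be handled explicitly instead of assuming
        // a parquet base file exists.
        continue;
      }
      // ... safe to use baseFile.get() here ...
    }
  }

  private BaseFileLookupSketch() {}
}
```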