[jira] [Created] (HUDI-1795) allow ExternalSpillMap use accurate payload size rather than estimated
ZiyueGuan created HUDI-1795: --- Summary: allow ExternalSpillMap use accurate payload size rather than estimated Key: HUDI-1795 URL: https://issues.apache.org/jira/browse/HUDI-1795 Project: Apache Hudi Issue Type: Improvement Components: Compaction Reporter: ZiyueGuan Situation: In ExternalSpillMap, we need to control the amount of data in the in-memory map to avoid OOM. Currently, we do this by estimating the average size of a payload twice, and computing total memory use by multiplying the average payload size by the payload count. The first estimate is taken when the first payload is inserted; the second when 100 payloads are stored in memory. Problem: If the second estimate is too low, an OOM can happen. Plan: Could we have a flag to control whether we want an accurate evaluation? Currently, I have several ideas but am not sure which one is best, or whether there is a better one. # Estimate each payload and store its length alongside its value. On update or remove, subtract the old length and add the new one if needed, so that the sum of all payload sizes stays precise. This is the method I currently use in prod. # Do not store the length, but estimate the old payload again when it is popped. This trades time for space compared with method one, and may perform better when updates and removes are rare. I didn't adopt it because, when profiling the ingestion process with arthas, size estimation showed up as time-consuming in the flame graph. But I'm not sure whether that also holds for compaction; intuitively, HoodieRecordPayload has a quite simple structure. # A more accurate estimation scheme: evaluate the whole map when its size is 1,100,1 and one million. Underestimation is less likely over such a large amount of data. Looking forward to any advice, suggestions, or discussion. -- This message was sent by Atlassian Jira (v8.3.4#803005)
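The first plan (store each payload's estimated length with its value and keep the running total exact via diffs on update/remove) could be sketched as below. All names here are hypothetical illustrations; this is not Hudi's actual ExternalSpillableMap, and the caller supplies the per-payload size estimate.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of idea #1: keep an exact running total of per-entry estimated
// sizes by storing each entry's length and applying a diff on put/remove.
class SizeTrackingMap<K, V> {
    private final Map<K, V> data = new HashMap<>();
    private final Map<K, Long> sizes = new HashMap<>();
    private long totalPayloadSize = 0L;

    // estimatedSize would come from a per-payload estimator
    // (e.g. serialized length); here the caller supplies it.
    public void put(K key, V value, long estimatedSize) {
        Long old = sizes.put(key, estimatedSize);
        data.put(key, value);
        // diff: drop the old length (if any), add the new one
        totalPayloadSize += estimatedSize - (old == null ? 0L : old);
    }

    public V remove(K key) {
        Long old = sizes.remove(key);
        if (old != null) {
            totalPayloadSize -= old;
        }
        return data.remove(key);
    }

    public long totalPayloadSize() {
        return totalPayloadSize;
    }

    public static void main(String[] args) {
        SizeTrackingMap<String, String> m = new SizeTrackingMap<>();
        m.put("a", "v1", 10);
        m.put("b", "v2", 20);
        m.put("a", "v3", 15); // update: 10 is replaced by 15
        m.remove("b");
        System.out.println(m.totalPayloadSize()); // prints 15
    }
}
```

The cost is one extra Long per entry, which is the space-for-accuracy trade-off the second plan tries to avoid.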
[jira] [Created] (HUDI-1796) allow ExternalSpillMap use accurate payload size rather than estimated
ZiyueGuan created HUDI-1796: --- Summary: allow ExternalSpillMap use accurate payload size rather than estimated Key: HUDI-1796 URL: https://issues.apache.org/jira/browse/HUDI-1796 Project: Apache Hudi Issue Type: Improvement Components: Compaction Reporter: ZiyueGuan Situation: In ExternalSpillMap, we need to control the amount of data in the in-memory map to avoid OOM. Currently, we do this by estimating the average size of a payload twice, and computing total memory use by multiplying the average payload size by the payload count. The first estimate is taken when the first payload is inserted; the second when 100 payloads are stored in memory. Problem: If the second estimate is too low, an OOM can happen. Plan: Could we have a flag to control whether we want an accurate evaluation? Currently, I have several ideas but am not sure which one is best, or whether there is a better one. # Estimate each payload and store its length alongside its value. On update or remove, subtract the old length and add the new one if needed, so that the sum of all payload sizes stays precise. This is the method I currently use in prod. # Do not store the length, but estimate the old payload again when it is popped. This trades time for space compared with method one, and may perform better when updates and removes are rare. I didn't adopt it because, when profiling the ingestion process with arthas, size estimation showed up as time-consuming in the flame graph. But I'm not sure whether that also holds for compaction; intuitively, HoodieRecordPayload has a quite simple structure. # A more accurate estimation scheme: evaluate the whole map when its size is 1,100,1 and one million. Underestimation is less likely over such a large amount of data. Looking forward to any advice, suggestions, or discussion. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (HUDI-1795) allow ExternalSpillMap use accurate payload size rather than estimated
[ https://issues.apache.org/jira/browse/HUDI-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan closed HUDI-1795. --- Resolution: Duplicate > allow ExternalSpillMap use accurate payload size rather than estimated > -- > > Key: HUDI-1795 > URL: https://issues.apache.org/jira/browse/HUDI-1795 > Project: Apache Hudi > Issue Type: Improvement > Components: Compaction > Reporter: ZiyueGuan > Priority: Major -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (HUDI-1796) allow ExternalSpillMap use accurate payload size rather than estimated
[ https://issues.apache.org/jira/browse/HUDI-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-1796: Description: Situation: In ExternalSpillMap, we need to control the amount of data in the in-memory map to avoid OOM. Currently, we do this by estimating the average size of a payload twice, and computing total memory use by multiplying the average payload size by the payload count. The first estimate is taken when the first payload is inserted; the second when 100 payloads are stored in memory. Problem: If the second estimate is too low, an OOM can happen. Plan: Could we have a flag to control whether we want an accurate evaluation? Currently, I have several ideas but am not sure which one is best, or whether there is a better one. # Estimate each payload and store its length alongside its value. On update or remove, subtract the old length and add the new one if needed, so that the sum of all payload sizes stays precise. This is the method I currently use in prod. # Do not store the length, but estimate the old payload again when it is popped. This trades time for space compared with method one, and may perform better when updates and removes are rare. I didn't adopt it because, when profiling the ingestion process with arthas, size estimation showed up as time-consuming in the flame graph. But I'm not sure whether that also holds for compaction; intuitively, HoodieRecordPayload has a quite simple structure. # A more accurate estimation scheme: evaluate the whole map when its size is 1,100,1 and one million. Underestimation is less likely over such a large amount of data. Looking forward to any advice, suggestions, or discussion. > allow ExternalSpillMap use accurate payload size rather than estimated > -- > > Key: HUDI-1796 > URL: https://issues.apache.org/jira/browse/HUDI-1796 > Project: Apache Hudi > Issue Type: Improvement > Components: Compaction > Reporter: ZiyueGuan > Priority: Major
[jira] [Created] (HUDI-1875) Improve perf of MOR table upsert based on HDFS
ZiyueGuan created HUDI-1875: --- Summary: Improve perf of MOR table upsert based on HDFS Key: HUDI-1875 URL: https://issues.apache.org/jira/browse/HUDI-1875 Project: Apache Hudi Issue Type: Improvement Reporter: ZiyueGuan Problem: When we use upsert on a MOR table, Hudi assigns one task per fileId that needs to be created or updated. In such a situation, nearly one million tasks may be created, most of which simply append a few records to a fileId. This can be slow, and a few skewed tasks appear. Reason: Hudi uses hsync to guarantee data is stored durably. Calling hsync that many times against an HDFS cluster within two minutes or less leads to high disk IOPS. In addition, creating so many tasks brings high scheduling overhead relative to the work of appending two or three records to a file. TODO: Option one: use hflush instead of hsync. This may lose data if all DataNodes shut down at the same time, but that has quite a low chance of occurring when HDFS is deployed across AZs. Option two: make the hsync process asynchronous and let more than one writing process run in the same task. This reduces the task count but increases memory use. I may first try option one as it is simple enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
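Option one amounts to making the durability call configurable per write. A minimal sketch of such a flag is below; the Syncable interface is a stand-in for HDFS's FSDataOutputStream (whose hflush pushes data to DataNode memory while hsync forces it to disk), and the flag name is hypothetical, not an actual Hudi config:

```java
// Sketch of option one: a flag choosing hflush() (fast, data buffered in
// DataNode memory) over hsync() (durable, forces a disk sync per call).
class SyncPolicy {
    interface Syncable {
        void hflush();
        void hsync();
    }

    private final boolean useHsync; // hypothetical config flag

    SyncPolicy(boolean useHsync) {
        this.useHsync = useHsync;
    }

    // Called once per log append; with useHsync=false, millions of small
    // appends no longer translate into millions of disk syncs.
    void flush(Syncable out) {
        if (useHsync) {
            out.hsync();
        } else {
            out.hflush();
        }
    }

    static String demo(boolean useHsync) {
        StringBuilder called = new StringBuilder();
        Syncable fake = new Syncable() {
            public void hflush() { called.append("hflush"); }
            public void hsync() { called.append("hsync"); }
        };
        new SyncPolicy(useHsync).flush(fake);
        return called.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // hflush
        System.out.println(demo(true));  // hsync
    }
}
```

The flag keeps the durable default available for deployments where a whole-cluster DataNode outage is a realistic risk.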
[jira] [Updated] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
[ https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-3026: Description: Problem: the same key may occur in two file groups when the HBase index is used. These two file groups will have the same FileID prefix. As the HBase index is global, this is unexpected. How to repro: We need a table written by Spark without records sorted. Say we have five records with keys 1,2,3,4,5 to write; they may be iterated in different orders. In the first task attempt (attempt 1), we write three records 5,4,3 to fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task attempt (attempt 2), which writes four records 1,2,3,4 to fileID_1_log.1_attempt2. Then canWrite reports this file group is large enough, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes the commit. When we compact, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 are both compacted, so we end up with 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in fileID_1 while we also have 5 in fileID_2. Record 5 appears in two file groups. Reason: The marker file doesn't reconcile log files, as the code shows in [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] And log files are actually not fail-safe. I'm not sure if [~danny0405] has found this problem too, as I see FlinkAppendHandle had been made to always return true, but it was changed back recently. Solution: A quick fix is making canWrite in HoodieAppendHandle always return true. However, a more elegant solution may be to use the append result to generate the compaction plan rather than listing log files, which gives more granular control at the log-block level instead of the log-file level. > HoodieAppendhandle may result in duplicate key for hbase index > -- > > Key: HUDI-3026 > URL: https://issues.apache.org/jira/browse/HUDI-3026 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Major > Labels: pull-request-available
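The repro above can be condensed into a toy simulation: the failed attempt's log is never reconciled, so after the retry writes the same keys in a different order, compaction sees key 5 in two file groups. All names below are illustrative, not Hudi code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model: file group -> list of log blocks (each a list of record keys).
class DuplicateKeyRepro {
    static Map<String, Set<Integer>> compact() {
        Map<String, List<List<Integer>>> logs = new HashMap<>();
        // attempt 1 (failed, but its log is left behind): keys 5,4,3
        logs.computeIfAbsent("fileID_1", k -> new ArrayList<>()).add(List.of(5, 4, 3));
        // attempt 2 (retry): keys 1,2,3,4 to fileID_1; then canWrite() says
        // the group is full, so key 5 goes to a new group fileID_2
        logs.get("fileID_1").add(List.of(1, 2, 3, 4));
        logs.computeIfAbsent("fileID_2", k -> new ArrayList<>()).add(List.of(5));
        // compaction merges every log of a file group, including the orphan
        Map<String, Set<Integer>> compacted = new TreeMap<>();
        logs.forEach((fg, blocks) -> {
            Set<Integer> keys = new TreeSet<>();
            blocks.forEach(keys::addAll);
            compacted.put(fg, keys);
        });
        return compacted;
    }

    public static void main(String[] args) {
        // key 5 ends up in both file groups: the duplicate
        System.out.println(compact());
    }
}
```

The simulation shows why making canWrite always return true hides the symptom: without the spill to fileID_2, key 5 only ever lands in fileID_1.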
[jira] [Commented] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
[ https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469656#comment-17469656 ] ZiyueGuan commented on HUDI-3026: - Thanks for your kind explanation. I have little experience with Hudi on Flink; this problem may only occur with Spark. > HoodieAppendhandle may result in duplicate key for hbase index > -- > > Key: HUDI-3026 > URL: https://issues.apache.org/jira/browse/HUDI-3026 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Major > Labels: pull-request-available -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline
ZiyueGuan created HUDI-2400: --- Summary: Allow timeline server correctly sync when concurrent write to timeline Key: HUDI-2400 URL: https://issues.apache.org/jira/browse/HUDI-2400 Project: Apache Hudi Issue Type: Sub-task Components: Compaction Reporter: ZiyueGuan Firstly, assume HUDI-1847 is available, so we can have an ingestion Spark job and a compaction job running at the same time. Assume each HoodieTimeline object carries a timestamp representing when it was generated from HDFS. Consider the following case: 1. Ingestion schedules compaction inline. We now have a timeline: 1.deltaCommit.Completed, 2.Compaction.Requested (timestamp: 1L). 2. Ingestion keeps moving on. The ingestion job now has 1.deltaCommit.Completed, 2.Compaction.Requested, 3.deltaCommit.Inflight (timestamp: 2L). 3. An independent Spark job runs compaction 2. We now have 1.deltaCommit.Completed, 2.Compaction.Inflight, 3.deltaCommit.Inflight (timestamp: 3L). 4. Executors in the ingestion job send requests to the timeline server holding the timeline with timestamp 2L, but the timeline server holds timestamp 3L, which is later than the client's. According to the logic in https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137, we conclude that the local view of the table's timeline is behind the client's view whenever the timeline hashes differ. That is not true in the case above: the hashes differ because the client view is behind the local view. A simple solution is to add the timestamp above as an attribute of the timeline, and have the timeline server decide whether to sync the file-system view by comparing the client and local timestamps rather than by timeline-hash inequality. -- This message was sent by Atlassian Jira (v8.3.4#803005)
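The proposed check can be sketched as two predicates side by side: hash inequality cannot tell "server is behind" from "server is ahead", while a generation timestamp can. Method and field names are hypothetical:

```java
// Sketch of the proposed fix: decide whether the timeline server must
// re-sync by comparing generation timestamps instead of "hashes differ".
class TimelineSyncCheck {
    // Hash-based rule (current behaviour): any mismatch triggers a sync,
    // even when the server's view is newer than the client's.
    static boolean shouldSyncByHash(String serverHash, String clientHash) {
        return !serverHash.equals(clientHash);
    }

    // Timestamp-based rule: sync only when the server's view is older.
    static boolean shouldSyncByTimestamp(long serverTs, long clientTs) {
        return serverTs < clientTs;
    }

    public static void main(String[] args) {
        // Case from the issue: client holds timeline ts=2, server holds ts=3.
        // Hashes differ, so the hash rule wrongly concludes the server is stale.
        System.out.println(shouldSyncByHash("hash-ts3", "hash-ts2")); // true (spurious sync)
        System.out.println(shouldSyncByTimestamp(3L, 2L));            // false (server is newer)
        System.out.println(shouldSyncByTimestamp(1L, 2L));            // true (server is behind)
    }
}
```

This assumes the generation timestamp is monotonic for a given table, which holds if it is taken when the timeline is (re)loaded from storage.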
[jira] [Created] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block
ZiyueGuan created HUDI-2665: --- Summary: Overflow of DataOutputStream may lead to corrupted log block Key: HUDI-2665 URL: https://issues.apache.org/jira/browse/HUDI-2665 Project: Apache Hudi Issue Type: Bug Reporter: ZiyueGuan In HoodieLogFormatWriter, we use the size() method of DataOutputStream to calculate the size of the log block we write. However, this method only supports sizes up to Integer.MAX_VALUE. When the bytes we have written overflow that limit, we get a corrupted log block, as the size in the header becomes inconsistent with the one in the footer. https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java -- This message was sent by Atlassian Jira (v8.3.4#803005)
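java.io.DataOutputStream keeps its byte counter in an int and saturates it at Integer.MAX_VALUE, so any block larger than ~2 GiB reports a wrong size. One way around it is a counting stream with a long counter; this is an illustration of the idea, not the actual HoodieLogFormatWriter change:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// A counting stream whose counter is a long, so sizes past
// Integer.MAX_VALUE are still reported correctly.
class LongCountingOutputStream extends FilterOutputStream {
    private long count = 0L;

    LongCountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        count += len; // no int saturation even past 2 GiB
    }

    long getCount() {
        return count;
    }

    public static void main(String[] args) throws IOException {
        LongCountingOutputStream s =
            new LongCountingOutputStream(new ByteArrayOutputStream());
        s.write(new byte[1024], 0, 1024);
        s.write(7);
        System.out.println(s.getCount()); // 1025
    }
}
```

With an exact long count, the header and footer sizes of a log block can never disagree because of counter saturation.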
[jira] [Assigned] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block
[ https://issues.apache.org/jira/browse/HUDI-2665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-2665: --- Assignee: ZiyueGuan > Overflow of DataOutputStream may lead to corrupted log block > > > Key: HUDI-2665 > URL: https://issues.apache.org/jira/browse/HUDI-2665 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Major -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block
[ https://issues.apache.org/jira/browse/HUDI-2665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-2665: Priority: Minor (was: Major) > Overflow of DataOutputStream may lead to corrupted log block > > > Key: HUDI-2665 > URL: https://issues.apache.org/jira/browse/HUDI-2665 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Minor -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (HUDI-2031) JVM occasionally crashes during compaction when spark speculative execution is enabled
[ https://issues.apache.org/jira/browse/HUDI-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437937#comment-17437937 ] ZiyueGuan commented on HUDI-2031: - Does anyone know the root cause of this problem? I'm curious why an interrupt could lead to problems in parquet.
> JVM occasionally crashes during compaction when spark speculative execution
> is enabled
> --
>
> Key: HUDI-2031
> URL: https://issues.apache.org/jira/browse/HUDI-2031
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.8.0
> Reporter: Rong Ma
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.9.0
>
> This could happen when speculative execution is triggered. The duplicated
> tasks are expected to terminate normally, but sometimes they cannot, and
> will cause the JVM to crash.
>
> From executor logs:
> {quote}ERROR [Executor task launch worker for task 6828] HoodieMergeHandle:
> Error writing record HoodieRecord{key=HoodieKey {recordKey=45246275517
> partitionPath=2021-06-13}, currentLocation='null', newLocation='null'}
> java.lang.IllegalArgumentException: You cannot call toBytes() more than once
> without calling reset()
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
> at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.toBytes(RunLengthBitPackingHybridEncoder.java:254)
> at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.getBytes(RunLengthBitPackingHybridValuesWriter.java:65)
> at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:148)
> at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
> at org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:200)
> at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:469)
> at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
> at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
> at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
> at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
> at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
> at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvroWithMetadata(HoodieParquetWriter.java:83)
> at org.apache.hudi.io.HoodieMergeHandle.writeRecord(HoodieMergeHandle.java:252)
> at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:336)
> at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:107)
> at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:199)
> at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:190)
> at org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor.compact(HoodieSparkMergeOnReadTableCompactor.java:154)
> at org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor.lambda$compact$9ec9d4c7$1(HoodieSparkMergeOnReadTableCompactor.java:105)
> at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
> at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
> at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
[jira] [Created] (HUDI-2771) Handle FileNotExist exception in parquet Utils
ZiyueGuan created HUDI-2771: --- Summary: Handle FileNotExist exception in parquet Utils Key: HUDI-2771 URL: https://issues.apache.org/jira/browse/HUDI-2771 Project: Apache Hudi Issue Type: Bug Reporter: ZiyueGuan As Spark may use speculation, there may be two base files in one file group with the same instant time. In such a case, we check whether a parquet file is valid by reading its footer. However, a cleaning process may happen concurrently, which leads to a FileNotFoundException. We may catch such an exception and treat the file as an invalid parquet. -- This message was sent by Atlassian Jira (v8.20.1#820001)
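The proposed handling could look like the sketch below: a file that disappears (cleaned concurrently) while its footer is being read is simply treated as invalid instead of failing the job. FooterReader is a hypothetical stand-in for the footer-reading call in ParquetUtils:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Sketch: validity check that tolerates a concurrent cleaner removing the
// file between listing and footer read.
class FooterCheck {
    interface FooterReader {
        void readFooter(String path) throws IOException;
    }

    static boolean isValidParquet(String path, FooterReader reader) {
        try {
            reader.readFooter(path);
            return true;
        } catch (FileNotFoundException e) {
            // Cleaned between listing and footer read: treat it like any
            // other invalid base file rather than propagating the error.
            return false;
        } catch (IOException e) {
            return false; // truncated/corrupt footer etc.
        }
    }

    public static void main(String[] args) {
        boolean ok = isValidParquet("f1", p -> { /* footer reads fine */ });
        boolean gone = isValidParquet("f2", p -> { throw new FileNotFoundException(p); });
        System.out.println(ok + " " + gone); // true false
    }
}
```

Note FileNotFoundException extends IOException, so its catch clause must come first for the distinction to matter.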
[jira] [Assigned] (HUDI-2771) Handle FileNotExist exception in parquet Utils
[ https://issues.apache.org/jira/browse/HUDI-2771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-2771: --- Assignee: ZiyueGuan > Handle FileNotExist exception in parquet Utils > -- > > Key: HUDI-2771 > URL: https://issues.apache.org/jira/browse/HUDI-2771 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Major -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (HUDI-2771) Handle FileNotExist exception in parquet Utils
[ https://issues.apache.org/jira/browse/HUDI-2771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan closed HUDI-2771. --- Resolution: Invalid > Handle FileNotExist exception in parquet Utils > -- > > Key: HUDI-2771 > URL: https://issues.apache.org/jira/browse/HUDI-2771 > Project: Apache Hudi > Issue Type: Bug > Reporter: ZiyueGuan > Assignee: ZiyueGuan > Priority: Major -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption
[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-2875: --- Assignee: ZiyueGuan > Concurrent call to HoodieMergeHandler cause parquet corruption > -- > > Key: HUDI-2875 > URL: https://issues.apache.org/jira/browse/HUDI-2875 > Project: Apache Hudi > Issue Type: Bug > Components: Common Core >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Major > > Problem: > Some corrupted parquet files are generated and exceptions will be thrown when > read. > e.g. > > Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value > at 0 in block -1 in file > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251) > at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) > at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) > at > org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) > at > org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ... 
4 more > Caused by: org.apache.parquet.io.ParquetDecodingException: could not read > page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in > col required binary col > at > org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599) > at > org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57) > at > org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536) > at > org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533) > at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95) > at > org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533) > at > org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525) > at > org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638) > at > org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353) > at > org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80) > at > org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75) > at > org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271) > at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147) > at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109) > at > org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165) > at > org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222) > ... 
11 more > Caused by: java.io.EOFException > at java.io.DataInputStream.readFully(DataInputStream.java:197) > at java.io.DataInputStream.readFully(DataInputStream.java:169) > at > org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286) > at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237) > at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246) > at > org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592) > > How to reproduce: > We need a way to interrupt one task without shutting down the JVM; speculation > is one such way. When speculation is triggered, other tasks running at the same time or > later risk generating a wrong parquet file. Nearly half of them will throw an > exception, while a few tasks succeed without any signal. > Root cause: > ParquetWriter is not thread safe. Its user must guarantee that there are no > concurrent calls to ParquetWriter. In the > following code: > https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103 > we call both write and close on the parquet writer concurrently. There is a pool > of Compressors used inside the parquet writer to store compressed bytes. A parquet > writer closed this way cannot return a fully reset compressor to the pool, so any > task that reuses the dirty compressor may generate wrong data.
[jira] [Created] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption
ZiyueGuan created HUDI-2875: --- Summary: Concurrent call to HoodieMergeHandler cause parquet corruption Key: HUDI-2875 URL: https://issues.apache.org/jira/browse/HUDI-2875 Project: Apache Hudi Issue Type: Bug Components: Common Core Reporter: ZiyueGuan Problem: Some corrupted parquet files are generated and exceptions will be thrown when read. e.g. Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 
4 more Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599) at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57) at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536) at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533) at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95) at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533) at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525) at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638) at org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353) at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80) at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75) at org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109) at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165) at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109) at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222) ... 
11 more Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286) at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237) at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246) at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592) How to reproduce: We need a way to interrupt one task without shutting down the JVM; speculation is one such way. When speculation is triggered, other tasks running at the same time or later risk generating a wrong parquet file. Nearly half of them will throw an exception, while a few tasks succeed without any signal. Root cause: ParquetWriter is not thread safe. Its user must guarantee that there are no concurrent calls to ParquetWriter. In the following code: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103 we call both write and close on the parquet writer concurrently. There is a pool of Compressors used inside the parquet writer to store compressed bytes. A parquet writer closed this way cannot return a fully reset compressor to the pool, so any task that reuses the dirty compressor may generate wrong data. Unfortunately, I haven't come up with a good way to reproduce this in a small test case. Validation is being done in a real Hudi ingestion job. -- This message was sent by Atlassian Jira (v8.20.1#820001)
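The fix direction implied by the root cause above can be sketched like this. The class below is invented for illustration (it is not Hudi's or Parquet's actual code): it serializes every write() and the final close() on one lock, so close() can never clear and return the shared compressor while a write is still pushing bytes into it.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical guard around a non-thread-safe writer: write() and close()
// take the same lock, so a close can never interleave with an in-flight write.
public class GuardedWriter {
    private final ReentrantLock lock = new ReentrantLock();
    private boolean closed = false;
    private int written = 0;

    public void write(Object record) {
        lock.lock();
        try {
            if (closed) throw new IllegalStateException("write after close");
            written++; // real code would delegate to ParquetWriter.write(record)
        } finally {
            lock.unlock();
        }
    }

    public int close() {
        lock.lock();
        try {
            closed = true; // real code would flush and return the compressor here
            return written;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedWriter w = new GuardedWriter();
        w.write("a");
        w.write("b");
        if (w.close() != 2) throw new AssertionError();
        boolean rejected = false;
        try { w.write("c"); } catch (IllegalStateException e) { rejected = true; }
        if (!rejected) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The same effect can be achieved by joining the producer before calling close, as long as some happens-before edge orders the last write against the close.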
[jira] [Created] (HUDI-2917) Rollback may be incorrect for canIndexLogFile index
ZiyueGuan created HUDI-2917: --- Summary: Rollback may be incorrect for canIndexLogFile index Key: HUDI-2917 URL: https://issues.apache.org/jira/browse/HUDI-2917 Project: Apache Hudi Issue Type: Bug Components: Common Core Reporter: ZiyueGuan Problem: data that should have been rolled back may still be found in the Hudi table. Root cause: Let's first recall how the rollback plan for the log blocks of a deltaCommit is generated. Hudi takes two cases into consideration. # Log files with no base file are assumed to consist entirely of inserted records, so they are deleted directly. Here we assume all inserted records are covered this way. # For file IDs that were updated according to the inflight commit metadata of the instant being rolled back, we append a command block to the corresponding log files. This handles all updated records. However, the first assumption is not always true. Indexes that can index log files may insert records into an existing log file, and in the current process the inflight HoodieCommitMetadata is generated before records are assigned to a specific file group. Fix: we need to generate the HoodieCommitMetadata from the partitioner's result rather than from the WorkloadProfile. We may also need more comments in the rollback code to call out this case. -- This message was sent by Atlassian Jira (v8.20.1#820001)
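The two rollback cases and the proposed fix can be sketched as follows. This is a hypothetical illustration, not Hudi's actual rollback API: `RollbackPlanner` and its types are invented. The point is that the delete-vs-command-block decision must be driven by where the partitioner actually routed records, not by commit metadata produced before file-group assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: build a rollback plan from the partitioner's actual
// record-to-filegroup assignment. A log file only qualifies for plain deletion
// when nothing but fresh inserts landed in a file group with no existing files;
// a file group whose existing log file received records (updates, or inserts
// from a canIndexLogFiles index) needs a rollback command block instead.
public class RollbackPlanner {
    enum Action { DELETE_LOG_FILE, APPEND_ROLLBACK_BLOCK }

    static Map<String, Action> plan(Map<String, Boolean> fileGroupHasExistingFiles) {
        Map<String, Action> actions = new HashMap<>();
        for (Map.Entry<String, Boolean> e : fileGroupHasExistingFiles.entrySet()) {
            actions.put(e.getKey(),
                e.getValue() ? Action.APPEND_ROLLBACK_BLOCK : Action.DELETE_LOG_FILE);
        }
        return actions;
    }

    public static void main(String[] args) {
        Map<String, Boolean> assignment = new HashMap<>();
        assignment.put("fg-new", false);     // brand-new file group, pure inserts
        assignment.put("fg-indexed", true);  // inserts routed into an existing log file
        Map<String, Action> actions = plan(assignment);
        if (actions.get("fg-new") != Action.DELETE_LOG_FILE) throw new AssertionError();
        if (actions.get("fg-indexed") != Action.APPEND_ROLLBACK_BLOCK) throw new AssertionError();
        System.out.println("ok");
    }
}
```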
[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption
[ https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-2875: Description: Problem: Some corrupted parquet files are generated and exceptions will be thrown when read. e.g. Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 
4 more Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col required binary col at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599) at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57) at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536) at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533) at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95) at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533) at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525) at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638) at org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353) at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80) at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75) at org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109) at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165) at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109) at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137) at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222) ... 
11 more Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197) at java.io.DataInputStream.readFully(DataInputStream.java:169) at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286) at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237) at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246) at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592) How to reproduce: We need a way to interrupt one task without shutting down the JVM; speculation is one such way. When speculation is triggered, other tasks running on the same executor risk generating a wrong parquet file. This will not always result in a corrupted parquet file: nearly half of the tasks will throw an exception, while a few succeed without any signal. Root cause: ParquetWriter is not thread safe. Its user must guarantee that there are no concurrent calls to ParquetWriter. In the following code: [https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103] we call both write and close on the parquet writer concurrently. Data may still be written while close is called. In the close method, the compressor (a class used by parquet for compression, with stateful data structures inside) is cleared and returned to a pool for later reuse. Due to the concurrent write mentioned above, data may continue to be pushed into the compressor even after it has been cleared. Besides, the compressor has a mechanism that tries to detect some invalid uses; that's why some invalid usage throws an exception rather than generating a corrupted parquet file. Validation: the current solution is validated in a production environment. One signal, once this fix is applied, is that there should be no task failed due
[jira] [Commented] (HUDI-2761) IllegalArgException from timeline server when serving getLastestBaseFiles with multi-writer
[ https://issues.apache.org/jira/browse/HUDI-2761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457810#comment-17457810 ] ZiyueGuan commented on HUDI-2761: - Closing HUDI-2400 as it seems to be the same problem as this issue. https://issues.apache.org/jira/browse/HUDI-2400 > IllegalArgException from timeline server when serving getLastestBaseFiles > with multi-writer > --- > > Key: HUDI-2761 > URL: https://issues.apache.org/jira/browse/HUDI-2761 > Project: Apache Hudi > Issue Type: Sub-task >Reporter: sivabalan narayanan >Assignee: sivabalan narayanan >Priority: Blocker > Fix For: 0.10.0 > > Attachments: Screen Shot 2021-11-15 at 8.27.11 AM.png, Screen Shot > 2021-11-15 at 8.27.33 AM.png, Screen Shot 2021-11-15 at 8.28.03 AM.png, > Screen Shot 2021-11-15 at 8.28.25 AM.png > > > When concurrent writes try to ingest to hudi, occasionally, we run into > IllegalArgumentException as below. Even though exception is seen, the actual > write succeeds though. > Here is what is happening from my understanding. > > Lets say table's latest commit is C3. > Writer1 tries to commit C4, writer2 tries to do C5 and writer3 tries to do C6 > (all 3 are non-overlapping and so expected to succeed) > I started C4 from writer1 and then switched to writer 2 and triggered C5 and > then did the same for writer3. > C4 went through fine for writer1 and succeeded. > for writer2, when timeline got instantiated, it's latest snapshot was C3, but > when it received the getLatestBaseFiles() request, latest commit was C4 and > so it throws an exception. Similar issue happend w/ writer3 as well. > > {code:java} > scala> df.write.format("hudi"). > | options(getQuickstartWriteConfigs). > | option(PRECOMBINE_FIELD.key(), "created_at"). > | option(RECORDKEY_FIELD.key(), "other"). > | option(PARTITIONPATH_FIELD.key(), "type"). > | option("hoodie.cleaner.policy.failed.writes","LAZY"). > | > option("hoodie.write.concurrency.mode","OPTIMISTIC_CONCURRENCY_CONTROL"). 
> | > option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"). > | option("hoodie.write.lock.zookeeper.url","localhost"). > | option("hoodie.write.lock.zookeeper.port","2181"). > | option("hoodie.write.lock.zookeeper.lock_key","locks"). > | > option("hoodie.write.lock.zookeeper.base_path","/tmp/mw_testing/.locks"). > | option(TBL_NAME.key(), tableName). > | mode(Append). > | save(basePath) > 21/11/15 07:47:33 WARN HoodieSparkSqlWriter$: Commit time 2025074733457 > 21/11/15 07:47:35 WARN EmbeddedTimelineService: Started embedded timeline > server at 10.0.0.202:57644 > [Stage 2:> (0 > 21/11/15 07:47:39 > ERROR RequestHandler: Got runtime exception servicing request > partition=CreateEvent&maxinstant=2025074301094&basepath=file%3A%2Ftmp%2Fmw_testing%2Ftrial2&lastinstantts=2025074301094&timelinehash=ce963fe977a9d2176fadecf16c223cb3b98d7f6f7aaaf41cd7855eb098aee47d > java.lang.IllegalArgumentException: Last known instant from client was > 2025074301094 but server has the following timeline > [[2025074301094__commit__COMPLETED], > [2025074731908__commit__COMPLETED]] > at > org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40) > at > org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:510) > at io.javalin.security.SecurityUtil.noopAccessManager(SecurityUtil.kt:22) > at io.javalin.Javalin.lambda$addHandler$0(Javalin.java:606) > at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:46) > at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:17) > at io.javalin.core.JavalinServlet$service$1.invoke(JavalinServlet.kt:143) > at io.javalin.core.JavalinServlet$service$2.invoke(JavalinServlet.kt:41) > at io.javalin.core.JavalinServlet.service(JavalinServlet.kt:107) > at > io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(JettyServerUtil.kt:72) > at > 
org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203) > at > org.apache.hudi.org.apache.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480) > at > org.apache.hudi.org.apache.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1668) > at > org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201) > at > org.apache.hudi.org.apache.jetty.server.handler.ContextHandler.doScope(ContextHand
[jira] [Commented] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline
[ https://issues.apache.org/jira/browse/HUDI-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457811#comment-17457811 ] ZiyueGuan commented on HUDI-2400: - Duplicate with https://issues.apache.org/jira/browse/HUDI-2761. > Allow timeline server correctly sync when concurrent write to timeline > -- > > Key: HUDI-2400 > URL: https://issues.apache.org/jira/browse/HUDI-2400 > Project: Apache Hudi > Issue Type: Sub-task > Components: Compaction >Reporter: ZiyueGuan >Priority: Major > > Firstly, assume HUDI-1847 is available and we can have an ingestion spark job > and a compaction job running at the same time. > Assume we have a timestamp for each HoodieTimeLine object which represent the > time it generated from hdfs. > Considering following case, > 1. ingestion schedule compaction inline. Now we have a timeline: > 1.deltaCommit.Completed, 2.Compaction.Requested (TimeStamp: 1L) > 2. Then ingestion keep move on. We now have 1.deltaCommit.Completed, > 2.Compaction.Requested 3.deltaCommit.Inflight (TimeStamp: 2L) in ingestion > job. > 3. We have an independent Spark job run compaction 2. We now have > 1.deltaCommit.Completed, 2.Compaction.Inflight 3.deltaCommit.Inflight > (TimeStamp: 3L) > 4. Executors in ingestion job send request to timeline server, now they hold > timeline with TimeStamp 2L. But Timeline Server have timestamp 3L which is > later than client. > According to the logic in > https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137, > > we thought local view of table's timeline is behind that of client's view as > long as the timeline hashes are different. However this may not be true in > the case mentioned above. > Here the hashes are different because client view is behind local view. > A simple solution is to add an attribute to timeline which is the timestamp > we used above. 
> And timeline server may determine whether to sync fileSystemView by comparing > timestamps between client and local rather than the difference between > timeline hashes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline
[ https://issues.apache.org/jira/browse/HUDI-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan closed HUDI-2400. --- Resolution: Duplicate > Allow timeline server correctly sync when concurrent write to timeline > -- > > Key: HUDI-2400 > URL: https://issues.apache.org/jira/browse/HUDI-2400 > Project: Apache Hudi > Issue Type: Sub-task > Components: Compaction >Reporter: ZiyueGuan >Priority: Major > > Firstly, assume HUDI-1847 is available and we can have an ingestion spark job > and a compaction job running at the same time. > Assume we have a timestamp for each HoodieTimeLine object which represent the > time it generated from hdfs. > Considering following case, > 1. ingestion schedule compaction inline. Now we have a timeline: > 1.deltaCommit.Completed, 2.Compaction.Requested (TimeStamp: 1L) > 2. Then ingestion keep move on. We now have 1.deltaCommit.Completed, > 2.Compaction.Requested 3.deltaCommit.Inflight (TimeStamp: 2L) in ingestion > job. > 3. We have an independent Spark job run compaction 2. We now have > 1.deltaCommit.Completed, 2.Compaction.Inflight 3.deltaCommit.Inflight > (TimeStamp: 3L) > 4. Executors in ingestion job send request to timeline server, now they hold > timeline with TimeStamp 2L. But Timeline Server have timestamp 3L which is > later than client. > According to the logic in > https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137, > > we thought local view of table's timeline is behind that of client's view as > long as the timeline hashes are different. However this may not be true in > the case mentioned above. > Here the hashes are different because client view is behind local view. > A simple solution is to add an attribute to timeline which is the timestamp > we used above. 
> And timeline server may determine whether to sync fileSystemView by comparing > timestamps between client and local rather than the difference between > timeline hashes. -- This message was sent by Atlassian Jira (v8.20.1#820001)
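The timestamp-based sync check proposed above can be sketched as follows. This is an invented illustration, not the actual RequestHandler code: the idea is that a hash mismatch alone cannot tell which side is behind, whereas a generation timestamp on each timeline snapshot can, so the server should refresh its file system view only when the client's snapshot is strictly newer.

```java
// Hypothetical sketch: decide whether the timeline server must re-sync by
// comparing timeline generation timestamps instead of timeline hashes.
public class TimelineSync {
    static boolean serverMustRefresh(long clientTimelineTs, long serverTimelineTs) {
        // refresh only when the client has seen a newer timeline than the server
        return clientTimelineTs > serverTimelineTs;
    }

    public static void main(String[] args) {
        // Case from the description: client holds timestamp 2, the server has
        // already advanced to 3. The hashes differ, but the server is NOT behind.
        if (serverMustRefresh(2L, 3L)) throw new AssertionError();
        // Client ahead of server: the server must refresh its view.
        if (!serverMustRefresh(3L, 2L)) throw new AssertionError();
        System.out.println("ok");
    }
}
```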
[jira] [Created] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
ZiyueGuan created HUDI-3026: --- Summary: HoodieAppendhandle may result in duplicate key for hbase index Key: HUDI-3026 URL: https://issues.apache.org/jira/browse/HUDI-3026 Project: Apache Hudi Issue Type: Bug Reporter: ZiyueGuan Problem: the same key may occur in two file groups. These two file groups will have the same FileID prefix. How to reproduce: We need a table whose records are not sorted in Spark. Let's say we have records 1,2,3,4,5 to write; they may be iterated in a different order on each attempt. In the first task attempt (attempt 1), we write 5,4,3 to fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task attempt (attempt 2), where we write 1,2,3,4 to fileID_1_log.1_attempt2. Then a call to canWrite finds this file group is large enough, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes the commit. When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 are compacted together, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in fileID_1 while we also get 5 in fileID_2. Record 5 appears in two file groups. Reason: the marker file doesn't reconcile log files, as the code shows in [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] And log files are actually not fail-safe. I'm not sure whether [~danny0405] has found this problem too, as I see FlinkAppendHandle had been made to always return true, but it was changed back recently. Solution: a quick fix is to make canWrite in HoodieAppendHandle always return true. However, a more elegant solution may be to use the append result to generate the compaction plan rather than listing log files, which gives more granular control at the log-block level instead of the log-file level. -- This message was sent by Atlassian Jira (v8.20.1#820001)
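The retry scenario above can be simulated in a few lines. This uses invented names and no Hudi APIs; it only replays the bookkeeping: attempt 1 appends part of the batch to file group 1 and dies, attempt 2 replays the batch in a different order, rolls over to file group 2 via the canWrite check, and compaction then merges the orphan log from the failed attempt together with attempt 2's log.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DuplicateKeyRepro {
    public static void main(String[] args) {
        // attempt 1 (failed, but its log file survives): wrote records 5,4,3
        List<Integer> fg1AttemptOne = Arrays.asList(5, 4, 3);
        // attempt 2: different iteration order, records 1,2,3,4 fill file group 1 ...
        List<Integer> fg1AttemptTwo = Arrays.asList(1, 2, 3, 4);
        // ... then canWrite reports "full", so record 5 spills to file group 2
        List<Integer> fg2 = Arrays.asList(5);

        // compaction merges every log file of file group 1,
        // including the orphan from the failed attempt
        Set<Integer> fileGroup1 = new TreeSet<>();
        fileGroup1.addAll(fg1AttemptOne);
        fileGroup1.addAll(fg1AttemptTwo);

        // record 5 now exists in both file groups: a duplicate key
        if (!(fileGroup1.contains(5) && fg2.contains(5))) throw new AssertionError();
        System.out.println("duplicate key 5 in two file groups");
    }
}
```

This also shows why the quick fix works: if canWrite always returns true for the append handle, record 5 stays in file group 1 and the compacted result contains each key once.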
[jira] [Assigned] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
[ https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-3026: --- Assignee: ZiyueGuan > HoodieAppendhandle may result in duplicate key for hbase index > -- > > Key: HUDI-3026 > URL: https://issues.apache.org/jira/browse/HUDI-3026 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Major > > Problem: a same key may occur in two file group. These two file group will > have same FileID prefix. > How to repro: > We should have a table w/o record sorted in spark. Let's say we have > 1,2,3,4,5 records to write. They may be iterated in different order. > In the first attempt 1, we write 543 to fileID_1_log.1_attempt1. But this > attempt failed. Spark will have a try in the second task attempt (attempt 2), > we write 1234 to fileID_1_log.1_attempt2. And then, we find this filegroup > is large enough by call canWrite. So hudi write record 5 to > fileID_2_log.1_attempt2 and finish this commit. > When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 > will be compacted. And we finally got 543 + 1234 = 12345 in fileID_1 while we > also got 5 in fileID_2. Record 5 will appear in two fileGroup. > Reason: Markerfile doesn't reconcile log file as code show in > [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] > And log file is actually not fail-safe. > I'm not sure if [~danny0405] have found this problem too as I find > FlinkAppendHandle had been made to always return true. But it was just > changed back recently. > Solution: > We may have a quick fix by making canWrite in HoodieAppendHandle always > return true. 
However, I think there may be a more elegant solution that we > use append result to generate compaction plan rather than list log file, in > which we will have a more granular control on log block instead of log file. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (HUDI-2917) Rollback may be incorrect for canIndexLogFile index
[ https://issues.apache.org/jira/browse/HUDI-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-2917: --- Assignee: ZiyueGuan > Rollback may be incorrect for canIndexLogFile index > --- > > Key: HUDI-2917 > URL: https://issues.apache.org/jira/browse/HUDI-2917 > Project: Apache Hudi > Issue Type: Bug > Components: Common Core >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Major > > Problem: > We may find data in a Hudi table that should have been rolled back. > Root cause: > Let's first recall how the rollback plan for a deltaCommit's log blocks is generated. > Hudi considers two cases. > # Log files with no base file consist entirely of 'insert' records, so they > are deleted directly. The assumption is that all inserted records are covered this way. > # For file IDs that were updated according to the inflight commit metadata of the > instant being rolled back, command blocks are appended to those log files. > This handles all updated records. > However, the first assumption is not always true. Indexes that can index > log files may insert records into existing log files, and in the current > process the inflight HoodieCommitMetadata is generated before records are > assigned to a specific file group. > > Fix: > We need to generate the HoodieCommitMetadata from the partitioner's result > rather than from the workProfile. We may also want more comments in the > rollback code calling out this case. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
[ https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-3026: Description: Problem: the same key may occur in two file groups when the HBase index is used. These two file groups will have the same FileID prefix. As the HBase index is global, this is unexpected. How to repro: Start with a table whose records are not sorted in Spark. Say we have records 1,2,3,4,5 to write; they may be iterated in different orders. In the first task attempt (attempt 1), we write 5,4,3 to fileID_1_log.1_attempt1, but this attempt fails. Spark retries with a second task attempt (attempt 2), which writes 1,2,3,4 to fileID_1_log.1_attempt2. Then canWrite reports that this file group is large enough, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes the commit. During compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 are compacted together, so we end up with 543 + 1234 = 12345 in fileID_1 while we also have 5 in fileID_2. Record 5 appears in two file groups. Reason: the marker file doesn't reconcile log files, as the code shows at [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] And log files are actually not fail-safe. I'm not sure whether [~danny0405] has found this problem too, as I see that FlinkAppendHandle had been made to always return true, but it was just changed back recently. Solution: A quick fix is to make canWrite in HoodieAppendHandle always return true. However, there may be a more elegant solution: use the append result to generate the compaction plan rather than listing log files, which gives more granular control at the log block level instead of the log file level. was: Problem: the same key may occur in two file groups. These two file groups will have the same FileID prefix. How to repro: Start with a table whose records are not sorted in Spark. Say we have records 1,2,3,4,5 to write. 
They may be iterated in different orders. In the first task attempt (attempt 1), we write 5,4,3 to fileID_1_log.1_attempt1, but this attempt fails. Spark retries with a second task attempt (attempt 2), which writes 1,2,3,4 to fileID_1_log.1_attempt2. Then canWrite reports that this file group is large enough, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes the commit. During compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 are compacted together, so we end up with 543 + 1234 = 12345 in fileID_1 while we also have 5 in fileID_2. Record 5 appears in two file groups. Reason: the marker file doesn't reconcile log files, as the code shows at [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] And log files are actually not fail-safe. I'm not sure whether [~danny0405] has found this problem too, as I see that FlinkAppendHandle had been made to always return true, but it was just changed back recently. Solution: A quick fix is to make canWrite in HoodieAppendHandle always return true. However, there may be a more elegant solution: use the append result to generate the compaction plan rather than listing log files, which gives more granular control at the log block level instead of the log file level. > HoodieAppendhandle may result in duplicate key for hbase index > -- > > Key: HUDI-3026 > URL: https://issues.apache.org/jira/browse/HUDI-3026 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Major > Labels: pull-request-available > > Problem: the same key may occur in two file groups when the HBase index is used. > These two file groups will have the same FileID prefix. As the HBase index is global, > this is unexpected. > How to repro: > Start with a table whose records are not sorted in Spark. Say we have records > 1,2,3,4,5 to write; they may be iterated in different orders. 
> In the first task attempt (attempt 1), we write 5,4,3 to fileID_1_log.1_attempt1, but this > attempt fails. Spark retries with a second task attempt (attempt 2), > which writes 1,2,3,4 to fileID_1_log.1_attempt2. Then canWrite reports that this file group > is large enough, so Hudi writes record 5 to > fileID_2_log.1_attempt2 and finishes the commit. > During compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 > are compacted together, so we end up with 543 + 1234 = 12345 in fileID_1 while we > also have 5 in fileID_2. Record 5 appears in two file groups. > Reason: the marker file doesn't reconcile log files, as the code shows at > [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/Ho
[jira] [Commented] (HUDI-1517) Create marker file for every log file
[ https://issues.apache.org/jira/browse/HUDI-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495169#comment-17495169 ] ZiyueGuan commented on HUDI-1517: - Hi [~shivnarayan], do you plan to pick this up soon? > Create marker file for every log file > - > > Key: HUDI-1517 > URL: https://issues.apache.org/jira/browse/HUDI-1517 > Project: Apache Hudi > Issue Type: Improvement > Components: writer-core >Reporter: sivabalan narayanan >Priority: Major > > As of now, Hudi creates marker files based on base files only. We might need to > fix this to create one marker file per log file, recording the actual log file > info. We can leverage this during metadata sync with rollback and restore > rather than doing a file listing. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (HUDI-3694) Not use magic number of next block to determine current log block
ZiyueGuan created HUDI-3694: --- Summary: Not use magic number of next block to determine current log block Key: HUDI-3694 URL: https://issues.apache.org/jira/browse/HUDI-3694 Project: Apache Hudi Issue Type: Bug Reporter: ZiyueGuan HoodieLogFileReader uses the magic number of the next log block to determine whether the current log block is corrupted. But when the next block is corrupted and the current one is not, the current block is incorrectly treated as corrupt. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (HUDI-3694) Not use magic number of next block to determine current log block
[ https://issues.apache.org/jira/browse/HUDI-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-3694: Description: HoodieLogFileReader uses the magic number of the next log block to determine whether the current log block is corrupted. However, when the next block has a corrupted magic number, we abandon the current block, which leads to data loss. (was: HoodieLogFileReader use magic number of next log block to determine if current log block is corrupted. But when next is corrupted but current not. It will make it incorrect) > Not use magic number of next block to determine current log block > - > > Key: HUDI-3694 > URL: https://issues.apache.org/jira/browse/HUDI-3694 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Priority: Major > > HoodieLogFileReader uses the magic number of the next log block to determine whether > the current log block is corrupted. However, when the next block has a corrupted > magic number, we abandon the current block, which leads to data loss. -- This message was sent by Atlassian Jira (v8.20.1#820001)
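The direction the issue suggests can be sketched as follows. This is not Hudi's actual log format or reader; it assumes a simplified self-describing layout [MAGIC][length][payload], so a reader can validate the current block on its own, and a corrupt next block cannot cause a healthy current block to be abandoned.

```java
import java.nio.ByteBuffer;

// Illustrative only: each block stores its own length, so validating the
// current block never requires peeking at the next block's magic number.
public class SelfDescribingBlockReader {
    static final int MAGIC = 0xCAFEF00D;

    // Returns the payload of the block at `offset`, or null if that block
    // itself is corrupt (bad magic) or truncated.
    public static byte[] readBlock(ByteBuffer log, int offset) {
        if (log.capacity() - offset < 8) {
            return null; // not even room for the header
        }
        log.position(offset);
        if (log.getInt() != MAGIC) {
            return null; // the current block is corrupt
        }
        int len = log.getInt();
        if (len < 0 || log.remaining() < len) {
            return null; // declared length runs past end of file
        }
        byte[] payload = new byte[len];
        log.get(payload);
        return payload; // valid regardless of what follows it
    }
}
```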
[jira] [Created] (HUDI-4055) use loop replace recursive call in ratelimiter
ZiyueGuan created HUDI-4055: --- Summary: use loop replace recursive call in ratelimiter Key: HUDI-4055 URL: https://issues.apache.org/jira/browse/HUDI-4055 Project: Apache Hudi Issue Type: Bug Components: index Reporter: ZiyueGuan The rate limiter calls acquire recursively, which may lead to a stack overflow. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (HUDI-4055) use loop replace recursive call in ratelimiter
[ https://issues.apache.org/jira/browse/HUDI-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533485#comment-17533485 ] ZiyueGuan commented on HUDI-4055: - https://github.com/apache/hudi/pull/5530 > use loop replace recursive call in ratelimiter > -- > > Key: HUDI-4055 > URL: https://issues.apache.org/jira/browse/HUDI-4055 > Project: Apache Hudi > Issue Type: Bug > Components: index >Reporter: ZiyueGuan >Priority: Minor > Labels: pull-request-available > > The rate limiter calls acquire recursively, which may lead to a stack overflow. -- This message was sent by Atlassian Jira (v8.20.7#820007)
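The idea behind the fix can be sketched like this. This is a hedged sketch, not Hudi's actual RateLimiter code: `acquire()` loops until a permit is granted instead of calling itself recursively, so a long wait keeps the stack depth constant rather than adding one frame per failed attempt.

```java
// Sketch only: a recursive version would read "if (!tryAcquire()) acquire();"
// and each failed attempt would add a stack frame, eventually overflowing.
public class LoopingRateLimiter {
    private final int permitsPerRefill;
    private int available;
    private long totalAcquired = 0;

    public LoopingRateLimiter(int permitsPerRefill) {
        this.permitsPerRefill = permitsPerRefill;
        this.available = permitsPerRefill;
    }

    // Loop, don't recurse: stack depth stays constant no matter how long we wait.
    public void acquire() {
        while (!tryAcquire()) {
            Thread.onSpinWait(); // back off until the next refill
        }
    }

    private synchronized boolean tryAcquire() {
        if (available > 0) {
            available--;
            totalAcquired++;
            return true;
        }
        available = permitsPerRefill; // simplified instant refill for the sketch
        return false;
    }

    public long getTotalAcquired() {
        return totalAcquired;
    }
}
```

In the real code the refill would be time-based, but the loop-versus-recursion structure is the point here.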
[jira] [Created] (HUDI-4912) Make write status idempotent
ZiyueGuan created HUDI-4912: --- Summary: Make write status idempotent Key: HUDI-4912 URL: https://issues.apache.org/jira/browse/HUDI-4912 Project: Apache Hudi Issue Type: Bug Components: index Reporter: ZiyueGuan HBase index updates are sometimes inconsistent with the data. The main reason is that the task result is not idempotent: a task run twice may produce different bucket assignment results. * Hudi on Spark caches write statuses on the executor. If the executor exits before the commit, the write status is regenerated. However, the HBase index was updated using the previous write status and will not be updated by the new one. * When speculation is used in bulk insert, the HBase index is updated concurrently. Although only one task attempt can succeed, that doesn't mean all index content was written by that attempt; entries written by other, failed attempts may be inconsistent with the data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-4965) automatically adapt COMMITS_ARCHIVAL_BATCH_SIZE
ZiyueGuan created HUDI-4965: --- Summary: automatically adapt COMMITS_ARCHIVAL_BATCH_SIZE Key: HUDI-4965 URL: https://issues.apache.org/jira/browse/HUDI-4965 Project: Apache Hudi Issue Type: Improvement Reporter: ZiyueGuan COMMITS_ARCHIVAL_BATCH_SIZE determines how many records are written to the archived timeline at a time. However, if there are several huge instants, the resulting batch may be too large to write because the log block overflows. Instead, the batch size should be determined from the instants' binary size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
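The proposal above, batching by accumulated serialized size instead of a fixed record count, could look roughly like this. The class and the `maxBatchBytes` knob are hypothetical, not Hudi's API; instants are modeled as raw byte arrays.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: group archived instants into batches bounded by total
// byte size, so a few huge instants cannot overflow a single log block.
public class SizeAwareBatcher {
    public static List<List<byte[]>> batchBySize(List<byte[]> instants, long maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] instant : instants) {
            // Flush the current batch before it would exceed the byte budget.
            if (!current.isEmpty() && currentBytes + instant.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(instant);
            currentBytes += instant.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Note that an instant larger than `maxBatchBytes` still gets its own singleton batch; a real implementation would also need to decide how to handle that case.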
[jira] [Commented] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index
[ https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726947#comment-17726947 ] ZiyueGuan commented on HUDI-3026: - This bug is fixed by HUDI-1517. In HUDI-1517, we allow marker files for log files to use the CREATE type. At the end of the commit (the reconcile process), we compare marker files with the commit's write statuses. Any illegal log files created during writing are deleted, which works the same way as marker files for base files. See {code:java} HoodieTable.reconcileAgainstMarkers{code} > HoodieAppendhandle may result in duplicate key for hbase index > -- > > Key: HUDI-3026 > URL: https://issues.apache.org/jira/browse/HUDI-3026 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Critical > Labels: pull-request-available > Fix For: 0.14.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > Problem: the same key may occur in two file groups when the HBase index is used. > These two file groups will have the same FileID prefix. As the HBase index is global, > this is unexpected. > How to repro: > Start with a table whose records are not sorted in Spark. Say we have five > records with keys 1,2,3,4,5 to write; they may be iterated in different orders. > In the first task attempt (attempt 1), we write three records 5,4,3 to > fileID_1_log.1_attempt1, but this attempt fails. Spark retries with > a second task attempt (attempt 2), which writes four records 1,2,3,4 to > fileID_1_log.1_attempt2. Then canWrite reports that this file group is large enough, > so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes > the commit. > During compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 > are compacted together, so we end up with 543 + 1234 = 12345 in fileID_1 while we > also have 5 in fileID_2. Record 5 appears in two file groups. 
> Reason: the marker file doesn't reconcile log files, as the code shows at > [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.] > And log files are actually not fail-safe. > I'm not sure whether [~danny0405] has found this problem too, as I see that > FlinkAppendHandle had been made to always return true, but it was just > changed back recently. > Solution: > A quick fix is to make canWrite in HoodieAppendHandle always > return true. However, there may be a more elegant solution: use the append result > to generate the compaction plan rather than listing log files, which gives > more granular control at the log block level instead of the log file level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
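The reconcile idea described in the comment, comparing marker files against the commit's write statuses and deleting anything with a marker but no committed status, reduces to a simple set difference. This is an illustrative sketch with hypothetical names, not the actual `HoodieTable.reconcileAgainstMarkers` implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of the reconcile principle: a file with a marker that is absent from
// the successful commit's write statuses is an orphan from a failed attempt.
public class MarkerReconciler {
    public static List<String> findInvalidFiles(Set<String> markedFiles,
                                                Set<String> committedFiles) {
        List<String> invalid = new ArrayList<>();
        for (String file : markedFiles) {
            if (!committedFiles.contains(file)) {
                invalid.add(file); // created during writing but never committed: delete
            }
        }
        return invalid;
    }
}
```

In the HUDI-3026 scenario, `fileID_1_log.1_attempt1` would have a marker but no write status from the winning attempt, so it would be cleaned up before compaction ever sees it.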
[jira] [Created] (HUDI-6401) should not throw exception when create marker file for log file
ZiyueGuan created HUDI-6401: --- Summary: should not throw exception when create marker file for log file Key: HUDI-6401 URL: https://issues.apache.org/jira/browse/HUDI-6401 Project: Apache Hudi Issue Type: Bug Reporter: ZiyueGuan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-6401) should not throw exception when create marker file for log file
[ https://issues.apache.org/jira/browse/HUDI-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan updated HUDI-6401: Description: When a Spark task fails or speculation is enabled, different task attempts will create a marker file for the same log file. We should use createIfNotExists and check the result of creating the marker file rather than letting it throw an exception. > should not throw exception when create marker file for log file > --- > > Key: HUDI-6401 > URL: https://issues.apache.org/jira/browse/HUDI-6401 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Priority: Major > > When a Spark task fails or speculation is enabled, different task attempts will create > a marker file for the same log file. We should use createIfNotExists and check > the result of creating the marker file rather than letting it throw an exception. -- This message was sent by Atlassian Jira (v8.20.10#820010)
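The proposed behavior can be sketched with plain `java.nio.file` calls. This is a minimal illustration of the create-if-not-exists semantics, not Hudi's marker API; the class name and path layout are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: a retried or speculative task attempt that finds the marker already
// present treats that as success instead of failing with an exception.
public class MarkerCreator {
    public static boolean createMarkerIfNotExists(Path marker) throws IOException {
        try {
            if (marker.getParent() != null) {
                Files.createDirectories(marker.getParent());
            }
            Files.createFile(marker); // atomic create-if-absent
            return true;
        } catch (FileAlreadyExistsException e) {
            // Another attempt already created the marker for this log file: fine.
            return true;
        }
    }
}
```

Only genuine I/O errors (permissions, unreachable storage) propagate; the expected contention between attempts is absorbed.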
[jira] [Assigned] (HUDI-6401) should not throw exception when create marker file for log file
[ https://issues.apache.org/jira/browse/HUDI-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ZiyueGuan reassigned HUDI-6401: --- Assignee: ZiyueGuan > should not throw exception when create marker file for log file > --- > > Key: HUDI-6401 > URL: https://issues.apache.org/jira/browse/HUDI-6401 > Project: Apache Hudi > Issue Type: Bug >Reporter: ZiyueGuan >Assignee: ZiyueGuan >Priority: Major > > When a Spark task fails or speculation is enabled, different task attempts will create > a marker file for the same log file. We should use createIfNotExists and check > the result of creating the marker file rather than letting it throw an exception. -- This message was sent by Atlassian Jira (v8.20.10#820010)