[jira] [Created] (HUDI-1795) allow ExternalSpillMap use accurate payload size rather than estimated

2021-04-14 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-1795:
---

 Summary: allow ExternalSpillMap use accurate payload size rather 
than estimated
 Key: HUDI-1795
 URL: https://issues.apache.org/jira/browse/HUDI-1795
 Project: Apache Hudi
  Issue Type: Improvement
  Components: Compaction
Reporter: ZiyueGuan


Situation: In ExternalSpillMap, we need to control the amount of data kept in the in-memory 
map to avoid OOM. Currently, we evaluate this by estimating the average size of 
a payload twice and computing total memory use as the average payload size 
multiplied by the number of payloads. The first estimate is taken when the first payload is 
inserted, and the second when there are 100 payloads stored in memory. 

Problem: If the size is underestimated in the second estimation, an OOM will 
happen.

Plan: Could we have a flag to control whether we want an accurate evaluation?

Currently, I have several ideas but am not sure which one is best or whether 
there is a better one.
 # Estimate each payload and store the payload's length alongside its value. Whenever 
an update or remove happens, subtract the old length and add the new one if needed, so 
that the sum of all payload sizes stays precise. This is the method I 
currently use in production (see the sketch after this list).
 # Do not store the length, but estimate the old payload again when it is popped. This 
trades time for space compared with method one; performance may be better 
when updates and removes are rare. I didn't adopt it because I 
profiled the ingestion process with arthas and found size estimation to be 
time-consuming in the flame graph, but I'm not sure whether that also holds for 
compaction. Intuitively, HoodieRecordPayload has a quite simple structure.
 # A more accurate estimation approach: evaluate the whole map 
when its size reaches 1,100,1 and one million. Underestimation is less likely over 
such a large amount of data.
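
To make idea one concrete, here is a minimal, self-contained sketch (not Hudi's actual ExternalSpillMap code; the class name and the pluggable estimator are illustrative): store each payload's estimated length next to its value and adjust a running total on every put/remove.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Sketch: wrap a plain HashMap and keep a precise running total of payload sizes
// by recording each entry's size and adjusting the sum on every put/remove.
public class SizeTrackingMap<K, V> {
  private final Map<K, V> values = new HashMap<>();
  private final Map<K, Long> sizes = new HashMap<>();
  private final ToLongFunction<V> estimator; // e.g. an object-size estimate per payload
  private long totalBytes = 0L;

  public SizeTrackingMap(ToLongFunction<V> estimator) {
    this.estimator = estimator;
  }

  public V put(K key, V value) {
    long newSize = estimator.applyAsLong(value);
    Long oldSize = sizes.put(key, newSize);
    totalBytes += newSize - (oldSize == null ? 0L : oldSize); // diff old length, add new length
    return values.put(key, value);
  }

  public V remove(K key) {
    Long oldSize = sizes.remove(key);
    if (oldSize != null) {
      totalBytes -= oldSize;
    }
    return values.remove(key);
  }

  public long currentSizeInBytes() {
    return totalBytes; // compare against the memory budget before spilling to disk
  }
}
{code}

The spillable map would then compare currentSizeInBytes() against its configured budget instead of average payload size times payload count.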

Looking forward to any advice, suggestions, or discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (HUDI-1796) allow ExternalSpillMap use accurate payload size rather than estimated

2021-04-14 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-1796:
---

 Summary: allow ExternalSpillMap use accurate payload size rather 
than estimated
 Key: HUDI-1796
 URL: https://issues.apache.org/jira/browse/HUDI-1796
 Project: Apache Hudi
  Issue Type: Improvement
  Components: Compaction
Reporter: ZiyueGuan


Situation: In ExternalSpillMap, we need to control the amount of data kept in the in-memory 
map to avoid OOM. Currently, we evaluate this by estimating the average size of 
a payload twice and computing total memory use as the average payload size 
multiplied by the number of payloads. The first estimate is taken when the first payload is 
inserted, and the second when there are 100 payloads stored in memory. 

Problem: If the size is underestimated in the second estimation, an OOM will 
happen.

Plan: Could we have a flag to control whether we want an accurate evaluation?

Currently, I have several ideas but am not sure which one is best or whether 
there is a better one.
 # Estimate each payload and store the payload's length alongside its value. Whenever 
an update or remove happens, subtract the old length and add the new one if needed, so 
that the sum of all payload sizes stays precise. This is the method I 
currently use in production.
 # Do not store the length, but estimate the old payload again when it is popped. This 
trades time for space compared with method one; performance may be better 
when updates and removes are rare. I didn't adopt it because I 
profiled the ingestion process with arthas and found size estimation to be 
time-consuming in the flame graph, but I'm not sure whether that also holds for 
compaction. Intuitively, HoodieRecordPayload has a quite simple structure.
 # A more accurate estimation approach: evaluate the whole map 
when its size reaches 1,100,1 and one million. Underestimation is less likely over 
such a large amount of data.

Looking forward to any advice, suggestions, or discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (HUDI-1795) allow ExternalSpillMap use accurate payload size rather than estimated

2021-04-14 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan closed HUDI-1795.
---
Resolution: Duplicate

> allow ExternalSpillMap use accurate payload size rather than estimated
> --
>
> Key: HUDI-1795
> URL: https://issues.apache.org/jira/browse/HUDI-1795
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: Compaction
>Reporter: ZiyueGuan
>Priority: Major
>
> Situation: In ExternalSpillMap, we need to control the amount of data in 
> memory map to avoid OOM. Currently, we evaluate this by estimate the average 
> size of each payload twice. And get total memory use by multiple average 
> payload size with payload number. The first time we get the size is when 
> first payload is inserted while the second time is when there are 100 
> payloads stored in memory. 
> Problem: If the size is underestimated in the second estimation, an OOM will 
> happen.
> Plan: Could we have a flag to control if we want an evaluation in accurate?
> Currently, I have several ideas but not sure which one could be the best or 
> if there are any better one.
>  # Estimate each payload, store the length of payload with its value.  Once 
> update or remove happen, use diff old length and add new length if needed so 
> that we keep the sum of all payload size precisely. This is the method I 
> currently use in prod.
>  # Do not store the length but evaluate old payload again when it is popped. 
> It trades off space against time comparing to method one. A better 
> performance may be reached when updating and removing are rare. I didn't 
> adopt this because I had profile ingestion process by arthas and found size 
> estimating in that may be time consuming in flame graph. But I'm not sure 
> whether it is true in compaction. In my intuition,HoodieRecordPayload has a 
> quite simple structure.
>  # I also have a more accurate estimate method that is evaluate the whole map 
> when size is 1,100,1 and one million. Less underestimate will happen in 
> such large amount of data.
> Look forward to any advice or suggestion or discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-1796) allow ExternalSpillMap use accurate payload size rather than estimated

2021-04-15 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-1796:

Description: 
Situation: In ExternalSpillMap, we need to control the amount of data kept in the in-memory 
map to avoid OOM. Currently, we evaluate this by estimating the average size of 
a payload twice and computing total memory use as the average payload 
size multiplied by the number of payloads. The first estimate is taken when the first payload 
is inserted, and the second when there are 100 payloads stored in 
memory. 

Problem: If the size is underestimated in the second estimation, an OOM will 
happen.

Plan: Could we have a flag to control whether we want an accurate evaluation?

Currently, I have several ideas but am not sure which one is best or whether 
there is a better one.
 # Estimate each payload and store the payload's length alongside its value. Whenever 
an update or remove happens, subtract the old length and add the new one if needed, so 
that the sum of all payload sizes stays precise. This is the method I 
currently use in production.
 # Do not store the length, but estimate the old payload again when it is popped. This 
trades time for space compared with method one; performance may be better 
when updates and removes are rare. I didn't adopt it because I 
profiled the ingestion process with arthas and found size estimation to be 
time-consuming in the flame graph, but I'm not sure whether that also holds for 
compaction. Intuitively, HoodieRecordPayload has a quite simple structure.
 # A more accurate estimation approach: evaluate the whole map 
when its size reaches 1,100,1 and one million. Underestimation is less likely over 
such a large amount of data.

Looking forward to any advice, suggestions, or discussion.

  was:
Situation: In ExternalSpillMap, we need to control the amount of data in memory 
map to avoid OOM. Currently, we evaluate this by estimate the average size of 
each payload twice. And get total memory use by multiple average payload size 
with payload number. The first time we get the size is when first payload is 
inserted while the second time is when there are 100 payloads stored in memory. 

Problem: If the size is underestimated in the second estimation, an OOM will 
happen.

Plan: Could we have a flag to control if we want an evaluation in accurate?

Currently, I have several ideas but not sure which one could be the best or if 
there are any better one.
 # Estimate each payload, store the length of payload with its value.  Once 
update or remove happen, use diff old length and add new length if needed so 
that we keep the sum of all payload size precisely. This is the method I 
currently use in prod.
 # Do not store the length but evaluate old payload again when it is popped. It 
trades off space against time comparing to method one. A better performance may 
be reached when updating and removing are rare. I didn't adopt this because I 
had profile ingestion process by arthas and found size estimating in that may 
be time consuming in flame graph. But I'm not sure whether it is true in 
compaction. In my intuition,HoodieRecordPayload has a quite simple structure.
 # I also have a more accurate estimate method that is evaluate the whole map 
when size is 1,100,1 and one million. Less underestimate will happen in 
such large amount of data.

Look forward to any advice or suggestion or discussion.


> allow ExternalSpillMap use accurate payload size rather than estimated
> --
>
> Key: HUDI-1796
> URL: https://issues.apache.org/jira/browse/HUDI-1796
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: Compaction
>Reporter: ZiyueGuan
>Priority: Major
>
> Situation: In ExternalSpillMap, we need to control the amount of data in 
> memory map to avoid OOM. Currently, we evaluate this by estimate the average 
> size of each payload twice. And get total memory use by multiplying average 
> payload size with payload number. The first time we get the size is when 
> first payload is inserted while the second time is when there are 100 
> payloads stored in memory. 
> Problem: If the size is underestimated in the second estimation, an OOM will 
> happen.
> Plan: Could we have a flag to control if we want an evaluation in accurate?
> Currently, I have several ideas but not sure which one could be the best or 
> if there are any better one.
>  # Estimate each payload, store the length of payload with its value.  Once 
> update or remove happen, use diff old length and add new length if needed so 
> that we keep the sum of all payload size precisely. This is the method I 
> currently use in prod.
>  # Do not store the length but evaluate old payload again when it is popped. 
> It trades off space against time comparing to method one. A b

[jira] [Created] (HUDI-1875) Improve perf of MOR table upsert based on HDFS

2021-05-04 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-1875:
---

 Summary: Improve perf of MOR table upsert based on HDFS
 Key: HUDI-1875
 URL: https://issues.apache.org/jira/browse/HUDI-1875
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: ZiyueGuan


Problem: When we upsert into a MOR table, Hudi assigns one task per fileId 
that needs to be created or updated. In such situations, close to one million tasks 
may be created, most of which simply append a few records to a fileId. Such a 
process can be slow, and a few skewed tasks appear.

Reason: Hudi uses hsync to guarantee data is stored durably. Calling hsync that 
many times against an HDFS cluster in two minutes or less leads to high disk 
IOPS. In addition, creating so many tasks brings high scheduling overhead 
relative to appending just two or three records to a file.

TODO: 

Option one: use hflush instead of hsync. This can lead to data loss if all DataNodes 
shut down at the same time, but that has a quite low chance of occurring when 
HDFS is deployed across AZs.

Option two: make the hsync call asynchronous and let more than one write 
run in the same task. This reduces the number of tasks but increases 
memory use.

I may first try option one as it is simple enough.
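
For option one, the change is essentially which Syncable call is issued on the HDFS output stream. A minimal sketch, assuming the Hadoop FileSystem client is on the classpath (the path used here is purely illustrative):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FlushVsSyncSketch {
  public static void append(boolean useHsync) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/tmp/hudi-demo.log"))) { // illustrative path
      out.write("a few records".getBytes(StandardCharsets.UTF_8));
      if (useHsync) {
        out.hsync();  // forces data to DataNode disks: durable, but high IOPS at scale
      } else {
        out.hflush(); // flushes to DataNode memory only: cheaper, small loss window if all DNs die
      }
    }
  }
}
{code}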

When



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2022-01-05 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-3026:

Description: 
Problem: the same key may occur in two file groups when the HBase index is used. These 
two file groups will have the same fileID prefix. As the HBase index is global, this is 
unexpected.

How to repro:

We need a table whose records are not sorted in Spark. Let's say we have five 
records with keys 1,2,3,4,5 to write; they may be iterated in a different order. 
In the first task attempt (attempt 1), we write three records 5,4,3 to 
fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a 
second task attempt (attempt 2), where we write four records 1,2,3,4 to 
fileID_1_log.1_attempt2. Then canWrite reports this file group is large enough, 
so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes 
the commit.

When we run compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 are both 
compacted, so we finally get 543 + 1234 = 12345 in fileID_1 while we also 
get 5 in fileID_2. Record 5 appears in two file groups.

Reason: the marker file mechanism doesn't reconcile log files, as the code shows in 
[https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]

And log files are actually not fail-safe.

I'm not sure if [~danny0405] has found this problem too, as I noticed 
FlinkAppendHandle had been made to always return true, but it was just changed 
back recently. 

Solution:

A quick fix is to make canWrite in HoodieAppendHandle always return 
true. However, I think there may be a more elegant solution: use the append 
result to generate the compaction plan rather than listing log files, which would 
give more granular control at the log-block level instead of the log-file level. 

  was:
Problem: a same key may occur in two file group when Hbase index is used. These 
two file group will have same FileID prefix. As Hbase index is global, this is 
unexpected

How to repro:

We should have a table w/o record sorted in spark. Let's say we have 1,2,3,4,5 
records to write. They may be iterated in different order. 

In the first attempt 1, we write 543 to fileID_1_log.1_attempt1. But this 
attempt failed. Spark will have a try in the second task attempt (attempt 2), 
we write 1234 to  fileID_1_log.1_attempt2. And then, we find this filegroup is 
large enough by call canWrite. So hudi write record 5 to 
fileID_2_log.1_attempt2 and finish this commit.

When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 will 
be compacted. And we finally got 543 + 1234 = 12345 in fileID_1 while we also 
got 5 in fileID_2. Record 5 will appear in two fileGroup.

Reason: Markerfile doesn't reconcile log file as code show in  
[https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]

And log file is actually not fail-safe.

I'm not sure if [~danny0405] have found this problem too as I find 
FlinkAppendHandle had been made to always return true. But it was just changed 
back recently. 

Solution:

We may have a quick fix by making canWrite in HoodieAppendHandle always return 
true. However, I think there may be a more elegant solution that we use append 
result to generate compaction plan rather than list log file, in which we will 
have a more granular control on log block instead of log file. 


> HoodieAppendhandle may result in duplicate key for hbase index
> --
>
> Key: HUDI-3026
> URL: https://issues.apache.org/jira/browse/HUDI-3026
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>  Labels: pull-request-available
>
> Problem: a same key may occur in two file group when Hbase index is used. 
> These two file group will have same FileID prefix. As Hbase index is global, 
> this is unexpected
> How to repro:
> We should have a table w/o record sorted in spark. Let's say we have five 
> records with key 1,2,3,4,5 to write. They may be iterated in different order. 
> In the first attempt 1, we write three records 5,4,3 to 
> fileID_1_log.1_attempt1. But this attempt failed. Spark will have a try in 
> the second task attempt (attempt 2), we write four records 1,2,3,4 to  
> fileID_1_log.1_attempt2. And then, we find this filegroup is large enough by 
> call canWrite. So hudi write record 5 to fileID_2_log.1_attempt2 and finish 
> this commit.
> When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 
> will be compacted. And we finally got 543 + 1234 = 12345 in fileID_1 while we 
> also got 5 in fileID_2. Record 5 will appear in two fileGroup.
> Reason: Markerfile doesn't reconcile log file 

[jira] [Commented] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2022-01-05 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469656#comment-17469656
 ] 

ZiyueGuan commented on HUDI-3026:
-

Thanks for your kind explanation. I have little experience with Hudi on Flink; 
this problem may only occur with Spark.

> HoodieAppendhandle may result in duplicate key for hbase index
> --
>
> Key: HUDI-3026
> URL: https://issues.apache.org/jira/browse/HUDI-3026
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>  Labels: pull-request-available
>
> Problem: a same key may occur in two file group when Hbase index is used. 
> These two file group will have same FileID prefix. As Hbase index is global, 
> this is unexpected
> How to repro:
> We should have a table w/o record sorted in spark. Let's say we have five 
> records with key 1,2,3,4,5 to write. They may be iterated in different order. 
> In the first attempt 1, we write three records 5,4,3 to 
> fileID_1_log.1_attempt1. But this attempt failed. Spark will have a try in 
> the second task attempt (attempt 2), we write four records 1,2,3,4 to  
> fileID_1_log.1_attempt2. And then, we find this filegroup is large enough by 
> call canWrite. So hudi write record 5 to fileID_2_log.1_attempt2 and finish 
> this commit.
> When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 
> will be compacted. And we finally got 543 + 1234 = 12345 in fileID_1 while we 
> also got 5 in fileID_2. Record 5 will appear in two fileGroup.
> Reason: Markerfile doesn't reconcile log file as code show in  
> [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]
> And log file is actually not fail-safe.
> I'm not sure if [~danny0405] have found this problem too as I find 
> FlinkAppendHandle had been made to always return true. But it was just 
> changed back recently. 
> Solution:
> We may have a quick fix by making canWrite in HoodieAppendHandle always 
> return true. However, I think there may be a more elegant solution that we 
> use append result to generate compaction plan rather than list log file, in 
> which we will have a more granular control on log block instead of log file. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline

2021-09-05 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-2400:
---

 Summary: Allow timeline server correctly sync when concurrent 
write to timeline
 Key: HUDI-2400
 URL: https://issues.apache.org/jira/browse/HUDI-2400
 Project: Apache Hudi
  Issue Type: Sub-task
  Components: Compaction
Reporter: ZiyueGuan


Firstly, assume HUDI-1847 is available and we can have an ingestion Spark job 
and a compaction job running at the same time.
Assume we have a timestamp for each HoodieTimeLine object which represents the 
time it was generated from HDFS.
Consider the following case:
 1. Ingestion schedules compaction inline. Now we have a timeline: 
1.deltaCommit.Completed, 2.Compaction.Requested (TimeStamp: 1L).
 2. Then ingestion keeps moving on. We now have 1.deltaCommit.Completed, 
2.Compaction.Requested, 3.deltaCommit.Inflight (TimeStamp: 2L) in the ingestion job.
 3. An independent Spark job runs compaction 2. We now have 
1.deltaCommit.Completed, 2.Compaction.Inflight, 3.deltaCommit.Inflight 
(TimeStamp: 3L).
 4. Executors in the ingestion job send requests to the timeline server while holding 
a timeline with TimeStamp 2L, but the timeline server has TimeStamp 3L, which is 
later than the client's.

According to the logic in 
https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137,
 
we assume the local view of the table's timeline is behind the client's view as 
long as the timeline hashes are different. However, this may not be true in the 
case mentioned above:
here, the hashes are different because the client's view is behind the local view.

A simple solution is to add an attribute to the timeline holding the timestamp we 
used above. 
The timeline server can then determine whether to sync the fileSystemView by comparing 
the client's and local timestamps rather than by checking whether the timeline 
hashes differ.
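
A minimal, standalone sketch of the proposed check (the names are illustrative, not the actual RequestHandler code): sync the view only when the client's timeline timestamp is newer than the one the server last loaded, instead of whenever the hashes differ.

{code:java}
// Sketch: decide whether the timeline server must re-sync its file system view.
// lastSyncedServerTs is the generation timestamp of the timeline the server holds;
// clientTs is the generation timestamp sent by the client with the request.
public class TimelineSyncCheckSketch {
  public static boolean needsSync(long lastSyncedServerTs, long clientTs) {
    // Only a client that read the timeline *after* the server did can prove the
    // server is stale; a differing hash alone cannot (the client may be the stale one).
    return clientTs > lastSyncedServerTs;
  }

  public static void main(String[] args) {
    // Case from the description: server loaded at 3L, executor holds a timeline from 2L.
    System.out.println(needsSync(3L, 2L)); // false -> no sync, the client is simply behind
    System.out.println(needsSync(2L, 3L)); // true  -> server view is stale, re-sync
  }
}
{code}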



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block

2021-11-02 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-2665:
---

 Summary: Overflow of DataOutputStream may lead to corrupted log 
block
 Key: HUDI-2665
 URL: https://issues.apache.org/jira/browse/HUDI-2665
 Project: Apache Hudi
  Issue Type: Bug
Reporter: ZiyueGuan


In HoodieLogFormatWriter, we use the size() method of DataOutputStream to calculate 
the size of the log block we write. However, this method caps its counter at 
Integer.MAX_VALUE. When the number of bytes written overflows, we get a corrupted 
log block, because the block size recorded in the header is inconsistent with the one in the footer.

https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
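
A minimal, self-contained sketch of the idea (not the actual HoodieLogFormatWriter fix): track the written length with a long counter instead of relying on DataOutputStream.size(), whose int counter saturates at Integer.MAX_VALUE.

{code:java}
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch: a counting stream whose counter cannot overflow for blocks larger than 2 GB,
// unlike DataOutputStream.size(), which returns an int capped at Integer.MAX_VALUE.
public class LongCountingOutputStream extends FilterOutputStream {
  private long count = 0L;

  public LongCountingOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    count++;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    count += len;
  }

  public long getCount() {
    return count; // use this value for both the header and footer block sizes
  }
}
{code}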



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block

2021-11-02 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-2665:
---

Assignee: ZiyueGuan

> Overflow of DataOutputStream may lead to corrupted log block
> 
>
> Key: HUDI-2665
> URL: https://issues.apache.org/jira/browse/HUDI-2665
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> In HoodieLogFormatWriter, we use size() method of DataOutputStream to 
> calculate the size of log block we write. However, this method only allows 
> size no more than Integer.MAX_VALUE. When bytes we writen overflow, we will 
> get a corrupted log block as the size of header is inconsistent with the one 
> at footer
> https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (HUDI-2665) Overflow of DataOutputStream may lead to corrupted log block

2021-11-02 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-2665:

Priority: Minor  (was: Major)

> Overflow of DataOutputStream may lead to corrupted log block
> 
>
> Key: HUDI-2665
> URL: https://issues.apache.org/jira/browse/HUDI-2665
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Minor
>
> In HoodieLogFormatWriter, we use size() method of DataOutputStream to 
> calculate the size of log block we write. However, this method only allows 
> size no more than Integer.MAX_VALUE. When bytes we writen overflow, we will 
> get a corrupted log block as the size of header is inconsistent with the one 
> at footer
> https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (HUDI-2031) JVM occasionally crashes during compaction when spark speculative execution is enabled

2021-11-03 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437937#comment-17437937
 ] 

ZiyueGuan commented on HUDI-2031:
-

Does anyone know the root cause of this problem? I'm curious why an interrupt 
could lead to problems in Parquet.

> JVM occasionally crashes during compaction when spark speculative execution 
> is enabled
> --
>
> Key: HUDI-2031
> URL: https://issues.apache.org/jira/browse/HUDI-2031
> Project: Apache Hudi
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: Rong Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.9.0
>
>
> This could happen when speculative execution is triggered. The duplicated 
> tasks are expected to terminate normally, but sometimes they cannot and will 
> cause the JVM crashes.
>  
>  From executor logs:
> {quote}ERROR [Executor task launch worker for task 6828] HoodieMergeHandle: 
> Error writing record  HoodieRecord{key=HoodieKey
> { recordKey=45246275517 partitionPath=2021-06-13}, currentLocation='null', 
> newLocation='null'}ERROR [Executor task launch worker for task 6828] 
> HoodieMergeHandle: Error writing record  HoodieRecord\{key=HoodieKey { 
> recordKey=45246275517 partitionPath=2021-06-13}
> , currentLocation='null', 
> newLocation='null'}java.lang.IllegalArgumentException: You cannot call 
> toBytes() more than once without calling reset() at 
> org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53) at 
> org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.toBytes(RunLengthBitPackingHybridEncoder.java:254)
>  at 
> org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.getBytes(RunLengthBitPackingHybridValuesWriter.java:65)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:148)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:200) 
> at 
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:469)
>  at 
> org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
>  at 
> org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
>  at 
> org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
>  at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165) 
> at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>  at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299) at 
> org.apache.hudi.io.storage.HoodieParquetWriter.writeAvroWithMetadata(HoodieParquetWriter.java:83)
>  at 
> org.apache.hudi.io.HoodieMergeHandle.writeRecord(HoodieMergeHandle.java:252) 
> at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:336) at 
> org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:107)
>  at 
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:199)
>  at 
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:190)
>  at 
> org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor.compact(HoodieSparkMergeOnReadTableCompactor.java:154)
>  at 
> org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor.lambda$compact$9ec9d4c7$1(HoodieSparkMergeOnReadTableCompactor.java:105)
>  at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
>  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at 
> scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) at 
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>  at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
>  at 
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
>  at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
>  at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:311) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313)

[jira] [Created] (HUDI-2771) Handle FileNotExist exception in parquet Utils

2021-11-16 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-2771:
---

 Summary: Handle FileNotExist exception in parquet Utils
 Key: HUDI-2771
 URL: https://issues.apache.org/jira/browse/HUDI-2771
 Project: Apache Hudi
  Issue Type: Bug
Reporter: ZiyueGuan


As Spark may use speculation, there may be two base files in one file group with 
the same instant time. In such a case, we check whether a parquet file is valid by reading 
its footer. However, a cleaning process may run concurrently, which leads 
to a FileNotFoundException. We could catch such an exception and treat the file as an 
invalid parquet file. 
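
A minimal sketch of the proposed handling, assuming parquet-hadoop is on the classpath; readFooter(Configuration, Path) is the classic footer-reading API (deprecated in newer Parquet versions), and the helper name here is illustrative:

{code:java}
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class FooterCheckSketch {
  // Returns the footer if the file is a readable parquet file, or null if the file
  // disappeared (e.g. removed by a concurrent clean) or its footer is unreadable.
  public static ParquetMetadata readFooterOrNull(Configuration conf, Path file) {
    try {
      return ParquetFileReader.readFooter(conf, file);
    } catch (FileNotFoundException e) {
      // File was cleaned between listing and reading: treat it like an invalid base file.
      return null;
    } catch (Exception e) {
      return null; // corrupted or partially written file: also treated as invalid
    }
  }
}
{code}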



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (HUDI-2771) Handle FileNotExist exception in parquet Utils

2021-11-16 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-2771:
---

Assignee: ZiyueGuan

> Handle FileNotExist exception in parquet Utils
> --
>
> Key: HUDI-2771
> URL: https://issues.apache.org/jira/browse/HUDI-2771
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> As spark may use speculation. there may be two base file in one file group 
> with same instant time. In such case, we check if parquet is valid by reading 
> footer. However, a cleaning process may happen during this process, which 
> lead to a FileNotFoundException. We may catch such exception and treat it as 
> a invalid parquet. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (HUDI-2771) Handle FileNotExist exception in parquet Utils

2021-11-18 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan closed HUDI-2771.
---
Resolution: Invalid

> Handle FileNotExist exception in parquet Utils
> --
>
> Key: HUDI-2771
> URL: https://issues.apache.org/jira/browse/HUDI-2771
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> As spark may use speculation. there may be two base file in one file group 
> with same instant time. In such case, we check if parquet is valid by reading 
> footer. However, a cleaning process may happen during this process, which 
> lead to a FileNotFoundException. We may catch such exception and treat it as 
> a invalid parquet. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-11-27 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-2875:
---

Assignee: ZiyueGuan

> Concurrent call to HoodieMergeHandler cause parquet corruption
> --
>
> Key: HUDI-2875
> URL: https://issues.apache.org/jira/browse/HUDI-2875
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: Common Core
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> Problem:
> Some corrupted parquet files are generated and exceptions will be thrown when 
> read.
> e.g.
>  
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
> at 0 in block -1 in file 
>     at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at 
> org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at 
> org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read 
> page Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in 
> col  required binary col
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
>     at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353)
>     at 
> org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
>     at 
> org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
>     at 
> org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at 
> org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at 
> org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>     at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>     ... 11 more
> Caused by: java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at 
> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>     at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>     at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>     at 
> org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
>  
> How to reproduce:
> We need a way which could interrupt one task w/o shutdown JVM. Let's say, 
> speculation. When speculation is triggered, other tasks at the same time or 
> later will have the risk to suffer a wrong parquet generation.  Nearly half 
> of them will throw exception while there is few tasks succeed without any 
> signal.
> RootCause:
> ParquetWriter is not thread safe. User of it should apply proper way to 
> guarantee that there is not concurrent call to ParquetWriter. In the 
> following code: 
> https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103
> We call both write and close to parquet writer concurrently. There is a pool 
> of Compressor which is used inside parquet writer for store compressed bytes. 
> Parquet writers closed in such way, could not pa

[jira] [Created] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-11-27 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-2875:
---

 Summary: Concurrent call to HoodieMergeHandler cause parquet 
corruption
 Key: HUDI-2875
 URL: https://issues.apache.org/jira/browse/HUDI-2875
 Project: Apache Hudi
  Issue Type: Bug
  Components: Common Core
Reporter: ZiyueGuan


Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they 
are read.

e.g.

 
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page 
Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  
required binary col
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at 
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
 

How to reproduce:

We need a way to interrupt one task without shutting down the JVM; say, 
speculation. When speculation is triggered, other tasks running at the same time or 
later risk generating a wrong parquet file. Nearly half of 
them will throw an exception, while a few tasks succeed without any signal.

RootCause:

ParquetWriter is not thread safe; its users must take proper measures to 
guarantee there are no concurrent calls to it. In the following 
code: 

https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103

We call both write and close on the parquet writer concurrently. There is a pool of 
compressors used inside the parquet writer to store compressed bytes. 
A parquet writer closed this way cannot return a fully reset compressor to the pool, 
so any task that reuses the dirty compressor may generate wrong data.

 

Unfortunately, I haven't come up with a good way to reproduce this in a small test 
case. Validation is being done in a real Hudi ingestion job.
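
A minimal, self-contained sketch of the kind of guard that prevents the race (not the actual Hudi fix; the wrapper name is illustrative): serialize write and close on one lock so close can never run while a write is in flight, and a half-reset compressor is never returned to the pool.

{code:java}
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.parquet.hadoop.ParquetWriter;

// Sketch: ParquetWriter itself is not thread safe, so every call that touches it
// (write and close) goes through one lock. Writes after close are rejected instead
// of silently corrupting the shared compressor state.
public class GuardedParquetWriter<T> {
  private final ParquetWriter<T> delegate;
  private final ReentrantLock lock = new ReentrantLock();
  private boolean closed = false;

  public GuardedParquetWriter(ParquetWriter<T> delegate) {
    this.delegate = delegate;
  }

  public void write(T record) throws IOException {
    lock.lock();
    try {
      if (closed) {
        throw new IOException("writer already closed");
      }
      delegate.write(record);
    } finally {
      lock.unlock();
    }
  }

  public void close() throws IOException {
    lock.lock();
    try {
      if (!closed) {
        delegate.close();
        closed = true;
      }
    } finally {
      lock.unlock();
    }
  }
}
{code}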



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (HUDI-2917) Rollback may be incorrect for canIndexLogFile index

2021-12-02 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-2917:
---

 Summary: Rollback may be incorrect for canIndexLogFile index
 Key: HUDI-2917
 URL: https://issues.apache.org/jira/browse/HUDI-2917
 Project: Apache Hudi
  Issue Type: Bug
  Components: Common Core
Reporter: ZiyueGuan


Problem:

We may find data in the Hudi table that should have been rolled back.

Root cause:

Let's first recall how the rollback plan is generated for log blocks of a 
deltaCommit. Hudi takes two cases into consideration.
 # Log files with no base file are comprised of records that are 
all 'insert records'; delete them directly. Here we assume all inserted records 
are covered this way.
 # For fileIDs that were updated according to the inflight commit metadata of the 
instant we want to roll back, we append a command block to those log files to 
roll back. Here all updated records are handled.

However, the first assumption is not always true. For indexes that can index 
log files, inserts may go into an existing log file. In the current 
process, the inflight hoodieCommitMeta is generated before records are assigned to 
a specific file group. 

 

Fix: 

To fix this problem, we need to use the result of the partitioner to 
generate the hoodieCommitMeta rather than the workProfile. Also, we may need more 
comments in the rollback code as a reminder of this case.
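
A minimal, standalone illustration of the proposed direction (illustrative types only, not Hudi's actual partitioner or commit metadata classes): derive the set of file groups touched by the delta commit from the partitioner's final record-to-fileId assignment, so inserts routed into existing log files are also listed and therefore receive a rollback command block.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RollbackMetaSketch {
  // keyToFileId: the partitioner's final decision for every record (inserts and updates alike).
  // The returned fileIds are what the inflight commit metadata should record, so rollback
  // appends command blocks to every log file the commit may have touched.
  public static Set<String> touchedFileIds(Map<String, String> keyToFileId) {
    return new HashSet<>(keyToFileId.values());
  }

  public static void main(String[] args) {
    Map<String, String> assignment = new HashMap<>();
    assignment.put("key-1", "fileID_1"); // update to an existing file group
    assignment.put("key-2", "fileID_1"); // insert routed into the same existing log file
    assignment.put("key-3", "fileID_9"); // insert creating a new log file
    System.out.println(touchedFileIds(assignment)); // [fileID_1, fileID_9]
  }
}
{code}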



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-12-10 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-2875:

Description: 
Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they 
are read.

e.g.

 
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page 
Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  
required binary col
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at 
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
 

How to reproduce:

We need a way to interrupt one task without shutting down the JVM; say, 
speculation. When speculation is triggered, other tasks working on the same 
executor risk generating a wrong parquet file. This does not 
always result in a corrupted parquet file: nearly half of them will throw an 
exception, while a few tasks succeed without any signal.

RootCause:

ParquetWriter is not thread safe; its users must take proper measures to 
guarantee there are no concurrent calls to it.

In the following code: 

[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]

We call both write and close on the parquet writer concurrently, so data may still be 
written while close runs. In the close method, the compressor (a class parquet uses for 
compression, which holds stateful data structures inside) is 
cleared and returned to a pool for later reuse. Because of the concurrent write 
mentioned above, data may keep being pushed into the compressor even after it has 
been cleared. Besides, the compressor has a mechanism that tries to 
detect some invalid uses; that's why some invalid usage throws an exception 
rather than generating a corrupted parquet file.

Validation:

The current solution has been validated in a production environment. A signal, once 
this fix is applied, is that there should be no task failed due 

[jira] [Updated] (HUDI-2875) Concurrent call to HoodieMergeHandler cause parquet corruption

2021-12-10 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-2875:

Description: 
Problem:

Some corrupted parquet files are generated, and exceptions are thrown when they 
are read.

e.g.

 
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:112)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read page 
Page [bytes.size=1054316, valueCount=237, uncompressedSize=1054316] in col  
required binary col
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:599)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:536)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:533)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:533)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:353)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at 
org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at 
org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at 
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at 
org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:592)
 

How to reproduce:

We need a way to interrupt one task without shutting down the JVM; say, 
speculation. When speculation is triggered, other tasks working on the same 
executor risk generating a wrong parquet file. This does not 
always result in a corrupted parquet file: nearly half of them will throw an 
exception, while a few tasks succeed without any signal.

RootCause:

ParquetWriter is not thread safe; its users must take proper measures to 
guarantee there are no concurrent calls to it.

In the following code: 

[https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L103]

We call both write and close on the parquet writer concurrently, so data may still be 
written while close runs. In the close method, the compressor (a class parquet uses for 
compression, which holds stateful data structures inside) is 
cleared and returned to a pool for later reuse. Because of the concurrent write 
mentioned above, data may keep being pushed into the compressor even after it has 
been cleared. Besides, the compressor has a mechanism that tries to 
detect some invalid uses; that's why some invalid usage throws an exception 
rather than generating a corrupted parquet file.

Validation:

The current solution has been validated in a production environment. A signal, once 
this fix is applied, is that there should be no task failed due 

[jira] [Commented] (HUDI-2761) IllegalArgException from timeline server when serving getLastestBaseFiles with multi-writer

2021-12-11 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457810#comment-17457810
 ] 

ZiyueGuan commented on HUDI-2761:
-

Closing HUDI-2400 as it seems to be the same problem as this issue. 
https://issues.apache.org/jira/browse/HUDI-2400

> IllegalArgException from timeline server when serving getLastestBaseFiles 
> with multi-writer
> ---
>
> Key: HUDI-2761
> URL: https://issues.apache.org/jira/browse/HUDI-2761
> Project: Apache Hudi
>  Issue Type: Sub-task
>Reporter: sivabalan narayanan
>Assignee: sivabalan narayanan
>Priority: Blocker
> Fix For: 0.10.0
>
> Attachments: Screen Shot 2021-11-15 at 8.27.11 AM.png, Screen Shot 
> 2021-11-15 at 8.27.33 AM.png, Screen Shot 2021-11-15 at 8.28.03 AM.png, 
> Screen Shot 2021-11-15 at 8.28.25 AM.png
>
>
> When concurrent writes try to ingest to hudi, occasionally, we run into 
> IllegalArgumentException as below. Even though exception is seen, the actual 
> write succeeds though. 
> Here is what is happening from my understanding. 
>  
> Lets say table's latest commit is C3. 
> Writer1 tries to commit C4, writer2 tries to do C5 and writer3 tries to do C6 
> (all 3 are non-overlapping and so expected to succeed) 
> I started C4 from writer1 and then switched to writer 2 and triggered C5 and 
> then did the same for writer3. 
> C4 went through fine for writer1 and succeeded. 
> for writer2, when timeline got instantiated, it's latest snapshot was C3, but 
> when it received the getLatestBaseFiles() request, latest commit was C4 and 
> so it throws an exception. Similar issue happend w/ writer3 as well. 
>  
> {code:java}
> scala> df.write.format("hudi").
>      |   options(getQuickstartWriteConfigs).
>      |   option(PRECOMBINE_FIELD.key(), "created_at").
>      |   option(RECORDKEY_FIELD.key(), "other").
>      |   option(PARTITIONPATH_FIELD.key(), "type").
>      |   option("hoodie.cleaner.policy.failed.writes","LAZY").
>      |   option("hoodie.write.concurrency.mode","OPTIMISTIC_CONCURRENCY_CONTROL").
>      |   option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
>      |   option("hoodie.write.lock.zookeeper.url","localhost").
>      |   option("hoodie.write.lock.zookeeper.port","2181").
>      |   option("hoodie.write.lock.zookeeper.lock_key","locks").
>      |   option("hoodie.write.lock.zookeeper.base_path","/tmp/mw_testing/.locks").
>      |   option(TBL_NAME.key(), tableName).
>      |   mode(Append).
>      |   save(basePath)
> 21/11/15 07:47:33 WARN HoodieSparkSqlWriter$: Commit time 2025074733457
> 21/11/15 07:47:35 WARN EmbeddedTimelineService: Started embedded timeline server at 10.0.0.202:57644
> [Stage 2:>                                                        (0
> 21/11/15 07:47:39 ERROR RequestHandler: Got runtime exception servicing request partition=CreateEvent&maxinstant=2025074301094&basepath=file%3A%2Ftmp%2Fmw_testing%2Ftrial2&lastinstantts=2025074301094&timelinehash=ce963fe977a9d2176fadecf16c223cb3b98d7f6f7aaaf41cd7855eb098aee47d
> java.lang.IllegalArgumentException: Last known instant from client was 2025074301094 but server has the following timeline [[2025074301094__commit__COMPLETED], [2025074731908__commit__COMPLETED]]
>     at 
> org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
>     at 
> org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:510)
>     at io.javalin.security.SecurityUtil.noopAccessManager(SecurityUtil.kt:22)
>     at io.javalin.Javalin.lambda$addHandler$0(Javalin.java:606)
>     at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:46)
>     at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:17)
>     at io.javalin.core.JavalinServlet$service$1.invoke(JavalinServlet.kt:143)
>     at io.javalin.core.JavalinServlet$service$2.invoke(JavalinServlet.kt:41)
>     at io.javalin.core.JavalinServlet.service(JavalinServlet.kt:107)
>     at 
> io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(JettyServerUtil.kt:72)
>     at 
> org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
>     at 
> org.apache.hudi.org.apache.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
>     at 
> org.apache.hudi.org.apache.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1668)
>     at 
> org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
>     at 
> org.apache.hudi.org.apache.jetty.server.handler.ContextHandler.doScope(ContextHand

[jira] [Commented] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline

2021-12-11 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457811#comment-17457811
 ] 

ZiyueGuan commented on HUDI-2400:
-

Duplicate of https://issues.apache.org/jira/browse/HUDI-2761.

> Allow timeline server correctly sync when concurrent write to timeline
> --
>
> Key: HUDI-2400
> URL: https://issues.apache.org/jira/browse/HUDI-2400
> Project: Apache Hudi
>  Issue Type: Sub-task
>  Components: Compaction
>Reporter: ZiyueGuan
>Priority: Major
>
> Firstly, assume HUDI-1847 is available and we can have an ingestion Spark job 
> and a compaction job running at the same time.
> Assume we have a timestamp for each HoodieTimeline object which represents the 
> time it was generated from HDFS.
> Consider the following case:
>  1. Ingestion schedules compaction inline. Now we have a timeline: 
> 1.deltaCommit.Completed, 2.Compaction.Requested (TimeStamp: 1L).
>  2. Ingestion then keeps moving on. We now have 1.deltaCommit.Completed, 
> 2.Compaction.Requested, 3.deltaCommit.Inflight (TimeStamp: 2L) in the ingestion 
> job.
>  3. An independent Spark job runs compaction 2. We now have 
> 1.deltaCommit.Completed, 2.Compaction.Inflight, 3.deltaCommit.Inflight 
> (TimeStamp: 3L).
>  4. Executors in the ingestion job send requests to the timeline server. They 
> hold a timeline with TimeStamp 2L, but the timeline server has TimeStamp 3L, 
> which is later than the client's.
> According to the logic in 
> https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137,
>  
> we assume the local view of the table's timeline is behind the client's view 
> whenever the timeline hashes differ. However, this is not true in the case 
> above: here the hashes differ because the client's view is behind the local view.
> A simple solution is to add an attribute to the timeline, namely the timestamp 
> used above, and have the timeline server decide whether to sync the 
> fileSystemView by comparing the client and local timestamps rather than by the 
> difference between the timeline hashes.
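
As an illustration of the proposed check, a minimal sketch with hypothetical names (this 
is not the actual RequestHandler code): the server syncs its fileSystemView only when the 
client's timeline timestamp is newer than its own, instead of whenever the hashes differ.

{code:java}
// Hypothetical sketch: decide whether the server view is stale by comparing load
// timestamps. Differing hashes only prove the two views differ; the timestamps tell
// us which side is actually behind.
public class TimelineSyncCheck {
  public static boolean shouldSyncServerView(long clientTimelineTs, long serverTimelineTs) {
    // Sync (and refresh) only if the client loaded the timeline later than the server did.
    // If the server is already ahead of the client, neither a sync nor an error is needed.
    return clientTimelineTs > serverTimelineTs;
  }
}
{code}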



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (HUDI-2400) Allow timeline server correctly sync when concurrent write to timeline

2021-12-11 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan closed HUDI-2400.
---
Resolution: Duplicate

> Allow timeline server correctly sync when concurrent write to timeline
> --
>
> Key: HUDI-2400
> URL: https://issues.apache.org/jira/browse/HUDI-2400
> Project: Apache Hudi
>  Issue Type: Sub-task
>  Components: Compaction
>Reporter: ZiyueGuan
>Priority: Major
>
> Firstly, assume HUDI-1847 is available and we can have an ingestion Spark job 
> and a compaction job running at the same time.
> Assume we have a timestamp for each HoodieTimeline object which represents the 
> time it was generated from HDFS.
> Consider the following case:
>  1. Ingestion schedules compaction inline. Now we have a timeline: 
> 1.deltaCommit.Completed, 2.Compaction.Requested (TimeStamp: 1L).
>  2. Ingestion then keeps moving on. We now have 1.deltaCommit.Completed, 
> 2.Compaction.Requested, 3.deltaCommit.Inflight (TimeStamp: 2L) in the ingestion 
> job.
>  3. An independent Spark job runs compaction 2. We now have 
> 1.deltaCommit.Completed, 2.Compaction.Inflight, 3.deltaCommit.Inflight 
> (TimeStamp: 3L).
>  4. Executors in the ingestion job send requests to the timeline server. They 
> hold a timeline with TimeStamp 2L, but the timeline server has TimeStamp 3L, 
> which is later than the client's.
> According to the logic in 
> https://github.com/apache/hudi/blob/master/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java#L137,
>  
> we assume the local view of the table's timeline is behind the client's view 
> whenever the timeline hashes differ. However, this is not true in the case 
> above: here the hashes differ because the client's view is behind the local view.
> A simple solution is to add an attribute to the timeline, namely the timestamp 
> used above, and have the timeline server decide whether to sync the 
> fileSystemView by comparing the client and local timestamps rather than by the 
> difference between the timeline hashes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2021-12-15 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-3026:
---

 Summary: HoodieAppendhandle may result in duplicate key for hbase 
index
 Key: HUDI-3026
 URL: https://issues.apache.org/jira/browse/HUDI-3026
 Project: Apache Hudi
  Issue Type: Bug
Reporter: ZiyueGuan


Problem: the same key may occur in two file groups. These two file groups will have 
the same FileID prefix.

How to repro:

We need a table whose records are not sorted in Spark. Let's say we have five records 
with keys 1,2,3,4,5 to write. They may be iterated in a different order in each attempt.

In the first task attempt (attempt 1), we write records 5,4,3 to 
fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task attempt 
(attempt 2), in which we write records 1,2,3,4 to fileID_1_log.1_attempt2. Then we find 
this file group is large enough by calling canWrite, so Hudi writes record 5 to 
fileID_2_log.1_attempt2 and finishes this commit.

When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 will both be 
compacted, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in fileID_1 while we also have 
record 5 in fileID_2. Record 5 will appear in two file groups.

Reason: the marker file does not reconcile log files, as the code shows in 
[https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]

And log files are actually not fail-safe.

I'm not sure whether [~danny0405] has found this problem too, as I see FlinkAppendHandle 
had been made to always return true, but it was just changed back recently. 

Solution:

We may have a quick fix by making canWrite in HoodieAppendHandle always return true. 
However, I think there may be a more elegant solution: use the append result to generate 
the compaction plan rather than listing log files, which gives more granular control at 
the log-block level instead of the log-file level. 
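
As a sketch of the quick fix mentioned above, assuming a simplified handle shape (this is 
not the actual HoodieAppendHandle code): an append handle that never rolls over to a new 
file group cannot spill a record into a second file group on a retried attempt.

{code:java}
// Simplified sketch, hypothetical class: compare the size-based check with the
// quick fix that always keeps appending to the same file group.
public class AppendHandleSketch {
  private long estimatedSizeBytes;
  private final long maxFileSizeBytes;

  public AppendHandleSketch(long maxFileSizeBytes) {
    this.maxFileSizeBytes = maxFileSizeBytes;
  }

  public void recordWritten(long recordSizeBytes) {
    estimatedSizeBytes += recordSizeBytes;
  }

  // Original behaviour: stop writing once the file group looks large enough,
  // which lets a retried attempt push the overflow record into a new file group.
  public boolean canWriteBySize() {
    return estimatedSizeBytes < maxFileSizeBytes;
  }

  // Quick fix: always keep appending to the same file group within this handle.
  public boolean canWrite() {
    return true;
  }
}
{code}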



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2021-12-19 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-3026:
---

Assignee: ZiyueGuan

> HoodieAppendhandle may result in duplicate key for hbase index
> --
>
> Key: HUDI-3026
> URL: https://issues.apache.org/jira/browse/HUDI-3026
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> Problem: the same key may occur in two file groups. These two file groups will 
> have the same FileID prefix.
> How to repro:
> We need a table whose records are not sorted in Spark. Let's say we have five 
> records with keys 1,2,3,4,5 to write. They may be iterated in a different order 
> in each attempt. 
> In the first task attempt (attempt 1), we write records 5,4,3 to 
> fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task 
> attempt (attempt 2), in which we write records 1,2,3,4 to 
> fileID_1_log.1_attempt2. Then we find this file group is large enough by calling 
> canWrite, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes this 
> commit.
> When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 
> will both be compacted, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in 
> fileID_1 while we also have record 5 in fileID_2. Record 5 will appear in two 
> file groups.
> Reason: the marker file does not reconcile log files, as the code shows in  
> [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]
> And log files are actually not fail-safe.
> I'm not sure whether [~danny0405] has found this problem too, as I see 
> FlinkAppendHandle had been made to always return true, but it was just 
> changed back recently. 
> Solution:
> We may have a quick fix by making canWrite in HoodieAppendHandle always 
> return true. However, I think there may be a more elegant solution: use the 
> append result to generate the compaction plan rather than listing log files, 
> which gives more granular control at the log-block level instead of the 
> log-file level. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (HUDI-2917) Rollback may be incorrect for canIndexLogFile index

2021-12-25 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-2917:
---

Assignee: ZiyueGuan

> Rollback may be incorrect for canIndexLogFile index
> ---
>
> Key: HUDI-2917
> URL: https://issues.apache.org/jira/browse/HUDI-2917
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: Common Core
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> Problem:
> We may find data in the Hudi table that should have been rolled back.
> Root cause:
> Let's first recall how the rollback plan is generated for the log blocks of a 
> deltaCommit. Hudi takes two cases into consideration.
>  # For log files with no base file, the assumption is that they consist only of 
> 'insert' records, so they are deleted directly. Here we assume all inserted 
> records are covered this way.
>  # For fileIDs that are updated according to the inflight commit metadata of the 
> instant we want to roll back, we append a command block to those log files to 
> roll them back.  Here all updated records are handled.
> However, the first assumption is not always true. Indexes that can index log 
> files may insert records into existing log files, and in the current 
> process the inflight HoodieCommitMetadata is generated before records are 
> assigned to specific file groups. 
>  
> Fix: 
> To fix this problem, we need to use the result of the partitioner to generate 
> the HoodieCommitMetadata rather than the workProfile. Also, we may need more 
> comments in the rollback code to call out this case.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2021-12-26 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-3026:

Description: 
Problem: the same key may occur in two file groups when the HBase index is used. These 
two file groups will have the same FileID prefix. As the HBase index is global, this is 
unexpected.

How to repro:

We need a table whose records are not sorted in Spark. Let's say we have five records 
with keys 1,2,3,4,5 to write. They may be iterated in a different order in each attempt.

In the first task attempt (attempt 1), we write records 5,4,3 to 
fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task attempt 
(attempt 2), in which we write records 1,2,3,4 to fileID_1_log.1_attempt2. Then we find 
this file group is large enough by calling canWrite, so Hudi writes record 5 to 
fileID_2_log.1_attempt2 and finishes this commit.

When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 will both be 
compacted, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in fileID_1 while we also have 
record 5 in fileID_2. Record 5 will appear in two file groups.

Reason: the marker file does not reconcile log files, as the code shows in 
[https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]

And log files are actually not fail-safe.

I'm not sure whether [~danny0405] has found this problem too, as I see FlinkAppendHandle 
had been made to always return true, but it was just changed back recently. 

Solution:

We may have a quick fix by making canWrite in HoodieAppendHandle always return true. 
However, I think there may be a more elegant solution: use the append result to generate 
the compaction plan rather than listing log files, which gives more granular control at 
the log-block level instead of the log-file level. 

  was:
Problem: a same key may occur in two file group. These two file group will have 
same FileID prefix.

How to repro:

We should have a table w/o record sorted in spark. Let's say we have 1,2,3,4,5 
records to write. They may be iterated in different order. 

In the first attempt 1, we write 543 to fileID_1_log.1_attempt1. But this 
attempt failed. Spark will have a try in the second task attempt (attempt 2), 
we write 1234 to  fileID_1_log.1_attempt2. And then, we find this filegroup is 
large enough by call canWrite. So hudi write record 5 to 
fileID_2_log.1_attempt2 and finish this commit.

When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 will 
be compacted. And we finally got 543 + 1234 = 12345 in fileID_1 while we also 
got 5 in fileID_2. Record 5 will appear in two fileGroup.

Reason: Markerfile doesn't reconcile log file as code show in  
[https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]

And log file is actually not fail-safe.

I'm not sure if [~danny0405] have found this problem too as I find 
FlinkAppendHandle had been made to always return true. But it was just changed 
back recently. 

Solution:

We may have a quick fix by making canWrite in HoodieAppendHandle always return 
true. However, I think there may be a more elegant solution that we use append 
result to generate compaction plan rather than list log file, in which we will 
have a more granular control on log block instead of log file. 


> HoodieAppendhandle may result in duplicate key for hbase index
> --
>
> Key: HUDI-3026
> URL: https://issues.apache.org/jira/browse/HUDI-3026
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>  Labels: pull-request-available
>
> Problem: the same key may occur in two file groups when the HBase index is used. 
> These two file groups will have the same FileID prefix. As the HBase index is 
> global, this is unexpected.
> How to repro:
> We need a table whose records are not sorted in Spark. Let's say we have five 
> records with keys 1,2,3,4,5 to write. They may be iterated in a different order 
> in each attempt. 
> In the first task attempt (attempt 1), we write records 5,4,3 to 
> fileID_1_log.1_attempt1, but this attempt fails. Spark retries in a second task 
> attempt (attempt 2), in which we write records 1,2,3,4 to  
> fileID_1_log.1_attempt2. Then we find this file group is large enough by calling 
> canWrite, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes this 
> commit.
> When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 
> will both be compacted, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in 
> fileID_1 while we also have record 5 in fileID_2. Record 5 will appear in two 
> file groups.
> Reason: the marker file does not reconcile log files, as the code shows in  
> [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/Ho

[jira] [Commented] (HUDI-1517) Create marker file for every log file

2022-02-20 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495169#comment-17495169
 ] 

ZiyueGuan commented on HUDI-1517:
-

Hi [~shivnarayan], 

Do you have a plan to pick this up recently?

> Create marker file for every log file
> -
>
> Key: HUDI-1517
> URL: https://issues.apache.org/jira/browse/HUDI-1517
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: writer-core
>Reporter: sivabalan narayanan
>Priority: Major
>
> As of now, hudi creates marker file based on base file. But we might need to 
> fix this to create one marker file per log file denoting the actual log file 
> info. We can leverage this during metadata sync w/ rollback and restore 
> rather than doing a file listing. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (HUDI-3694) Not use magic number of next block to determine current log block

2022-03-23 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-3694:
---

 Summary: Not use magic number of next block to determine current 
log block
 Key: HUDI-3694
 URL: https://issues.apache.org/jira/browse/HUDI-3694
 Project: Apache Hudi
  Issue Type: Bug
Reporter: ZiyueGuan


HoodieLogFileReader uses the magic number of the next log block to determine whether the 
current log block is corrupted. But when the next block is corrupted while the current 
one is not, this check gives the wrong result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (HUDI-3694) Not use magic number of next block to determine current log block

2022-03-23 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-3694:

Description: HoodieLogFileReader uses the magic number of the next log block to 
determine whether the current log block is corrupted. However, when the next block has a 
corrupted magic number, we abandon the current block, which leads to data 
loss.  (was: HoodieLogFileReader use magic number of next log block to 
determine if current log block is corrupted. But when next is corrupted but 
current not. It will make it incorrect)
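
For illustration, a minimal sketch of the alternative, assuming a simplified block layout 
of [MAGIC][8-byte length][payload] (this is a simplification of the real Hudi log format, 
and the names are hypothetical): the current block is validated from its own header and 
declared length, so a corrupted next block cannot cause the current one to be abandoned.

{code:java}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

// Sketch only: judge the CURRENT block by its own magic and declared length rather
// than by whether the NEXT block's magic can be found.
public class LogBlockScanSketch {
  private static final byte[] MAGIC = new byte[] {'#', 'H', 'U', 'D', 'I'};

  public static boolean currentBlockLooksValid(RandomAccessFile file) throws IOException {
    byte[] magic = new byte[MAGIC.length];
    if (file.read(magic) != MAGIC.length || !Arrays.equals(magic, MAGIC)) {
      return false; // the current block itself has no valid magic
    }
    long declaredLength = file.readLong();
    long remaining = file.length() - file.getFilePointer();
    // Valid if the block's own declared length fits inside the file; corruption in a
    // later block does not change this answer.
    return declaredLength >= 0 && declaredLength <= remaining;
  }
}
{code}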

> Not use magic number of next block to determine current log block
> -
>
> Key: HUDI-3694
> URL: https://issues.apache.org/jira/browse/HUDI-3694
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Priority: Major
>
> HoodieLogFileReader uses the magic number of the next log block to determine whether 
> the current log block is corrupted. However, when the next block has a corrupted 
> magic number, we abandon the current block, which leads to data loss.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (HUDI-4055) use loop replace recursive call in ratelimiter

2022-05-06 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-4055:
---

 Summary: use loop replace recursive call in ratelimiter
 Key: HUDI-4055
 URL: https://issues.apache.org/jira/browse/HUDI-4055
 Project: Apache Hudi
  Issue Type: Bug
  Components: index
Reporter: ZiyueGuan


The rate limiter recursively calls acquire, which may lead to a stack overflow.
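
For illustration, a minimal loop-based sketch of a rate limiter acquire (a generic 
example with hypothetical names, not the actual Hudi RateLimiter):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Generic sketch: acquire() waits in a loop instead of calling itself recursively,
// so long waits for permits cannot grow the call stack and overflow it.
public class LoopRateLimiter {
  private final long permitsPerSecond;
  private final AtomicLong availablePermits;
  private volatile long lastRefillMs;

  public LoopRateLimiter(long permitsPerSecond) {
    this.permitsPerSecond = permitsPerSecond;
    this.availablePermits = new AtomicLong(permitsPerSecond);
    this.lastRefillMs = System.currentTimeMillis();
  }

  public void acquire() throws InterruptedException {
    while (true) {                       // loop instead of recursion
      refillIfNeeded();
      long current = availablePermits.get();
      if (current > 0 && availablePermits.compareAndSet(current, current - 1)) {
        return;                          // got a permit
      }
      Thread.sleep(10);                  // back off briefly before retrying
    }
  }

  private synchronized void refillIfNeeded() {
    long now = System.currentTimeMillis();
    if (now - lastRefillMs >= 1000) {
      availablePermits.set(permitsPerSecond);
      lastRefillMs = now;
    }
  }
}
{code}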



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (HUDI-4055) use loop replace recursive call in ratelimiter

2022-05-08 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533485#comment-17533485
 ] 

ZiyueGuan commented on HUDI-4055:
-

https://github.com/apache/hudi/pull/5530

> use loop replace recursive call in ratelimiter
> --
>
> Key: HUDI-4055
> URL: https://issues.apache.org/jira/browse/HUDI-4055
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: index
>Reporter: ZiyueGuan
>Priority: Minor
>  Labels: pull-request-available
>
> The rate limiter recursively calls acquire, which may lead to a stack overflow.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (HUDI-4912) Make write status idempotent

2022-09-24 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-4912:
---

 Summary: Make write status idempotent
 Key: HUDI-4912
 URL: https://issues.apache.org/jira/browse/HUDI-4912
 Project: Apache Hudi
  Issue Type: Bug
  Components: index
Reporter: ZiyueGuan


HBase index updates are sometimes inconsistent with the data. The main reason is 
that the result of a task is not idempotent: a task run twice may get 
different bucket assignment results. 
 * Hudi on Spark caches write statuses on the executor. If the executor exits before 
the commit, the write statuses are regenerated. However, the HBase index was updated 
from the previous write statuses and will not be updated from the new ones.
 * When we use speculation in bulk insert, the HBase index is updated concurrently. 
Although only one task attempt can succeed, that does not mean all content in the index 
was written by that attempt; content written by other, failed attempts may be 
inconsistent with the data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-4965) automatically adapt COMMITS_ARCHIVAL_BATCH_SIZE

2022-10-01 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-4965:
---

 Summary: automatically adapt COMMITS_ARCHIVAL_BATCH_SIZE
 Key: HUDI-4965
 URL: https://issues.apache.org/jira/browse/HUDI-4965
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: ZiyueGuan


COMMITS_ARCHIVAL_BATCH_SIZE is used to determine how many records are written to the 
archived timeline in one batch. However, if there are several huge instants, the 
resulting batch cannot be written because the log block overflows. So we should 
determine the batch size by the binary size of the instants instead.
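
For illustration, a minimal sketch of batching by serialized size instead of by a fixed 
record count (hypothetical names, not the actual archival code):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch: close a batch once its accumulated serialized size would exceed the log
// block limit, rather than always waiting for a fixed number of instants.
public class SizeAwareArchivalBatcher {
  public static List<List<byte[]>> batchBySize(List<byte[]> serializedInstants, long maxBatchBytes) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;
    for (byte[] instant : serializedInstants) {
      if (!current.isEmpty() && currentBytes + instant.length > maxBatchBytes) {
        batches.add(current);            // flush before the block would overflow
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(instant);
      currentBytes += instant.length;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
{code}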



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (HUDI-3026) HoodieAppendhandle may result in duplicate key for hbase index

2023-05-28 Thread ZiyueGuan (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-3026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726947#comment-17726947
 ] 

ZiyueGuan commented on HUDI-3026:
-

This bug is fixed by HUDI-1517, which allows the marker file for a log file to use the 
CREATE type. At the end of the commit (the reconcile process), we compare the marker 
files with the commit's write statuses. Any illegal log files created during writing are 
deleted, which works the same way as marker files for base files.

See
{code:java}
HoodieTable.reconcileAgainstMarkers{code}
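
Roughly, the reconcile step works like the following simplified sketch (hypothetical 
names, not the actual HoodieTable.reconcileAgainstMarkers implementation):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Simplified sketch: any data/log file that has a marker but is NOT present in the
// committed write statuses is treated as a leftover from a failed attempt and deleted
// before the commit completes.
public class ReconcileSketch {
  public static Set<String> filesToDelete(Set<String> markedPaths, Set<String> committedPaths) {
    Set<String> dangling = new HashSet<>(markedPaths);
    dangling.removeAll(committedPaths);  // markers with no matching write status
    return dangling;                     // e.g. fileID_1_log.1_attempt1 from the failed attempt
  }
}
{code}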

> HoodieAppendhandle may result in duplicate key for hbase index
> --
>
> Key: HUDI-3026
> URL: https://issues.apache.org/jira/browse/HUDI-3026
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 0.14.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Problem: the same key may occur in two file groups when the HBase index is used. 
> These two file groups will have the same FileID prefix. As the HBase index is 
> global, this is unexpected.
> How to repro:
> We need a table whose records are not sorted in Spark. Let's say we have five 
> records with keys 1,2,3,4,5 to write. They may be iterated in a different order 
> in each attempt. 
> In the first task attempt (attempt 1), we write three records 5,4,3 to 
> fileID_1_log.1_attempt1, but this attempt fails. Spark retries in 
> the second task attempt (attempt 2), in which we write four records 1,2,3,4 to  
> fileID_1_log.1_attempt2. Then we find this file group is large enough by 
> calling canWrite, so Hudi writes record 5 to fileID_2_log.1_attempt2 and finishes 
> this commit.
> When we do compaction, fileID_1_log.1_attempt1 and fileID_1_log.1_attempt2 
> will both be compacted, so we finally get 5,4,3 + 1,2,3,4 = 1,2,3,4,5 in fileID_1 
> while we also have record 5 in fileID_2. Record 5 will appear in two file groups.
> Reason: the marker file does not reconcile log files, as the code shows in  
> [https://github.com/apache/hudi/blob/9a2030ab3190acf600ce4820be9a08929595763e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java#L553.]
> And log files are actually not fail-safe.
> I'm not sure whether [~danny0405] has found this problem too, as I see 
> FlinkAppendHandle had been made to always return true, but it was just 
> changed back recently. 
> Solution:
> We may have a quick fix by making canWrite in HoodieAppendHandle always 
> return true. However, I think there may be a more elegant solution: use the 
> append result to generate the compaction plan rather than listing log files, 
> which gives more granular control at the log-block level instead of the 
> log-file level. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-6401) should not throw exception when create marker file for log file

2023-06-16 Thread ZiyueGuan (Jira)
ZiyueGuan created HUDI-6401:
---

 Summary: should not throw exception when create marker file for 
log file
 Key: HUDI-6401
 URL: https://issues.apache.org/jira/browse/HUDI-6401
 Project: Apache Hudi
  Issue Type: Bug
Reporter: ZiyueGuan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-6401) should not throw exception when create marker file for log file

2023-06-16 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan updated HUDI-6401:

Description: when a Spark task fails or speculation is enabled, different tasks will 
create a marker file for the same log file. We should use createIfNotExists and check 
the result of creating the marker file rather than letting it throw an exception.
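
For illustration, a minimal sketch of the intended behaviour (a hypothetical helper on 
the local filesystem, not the actual Hudi marker API):

{code:java}
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: treat "marker already exists" as success instead of letting it throw, so a
// retried or speculative task racing on the same log-file marker does not fail the write.
public class MarkerCreateSketch {
  public static boolean createIfNotExists(Path markerFile) throws IOException {
    try {
      Files.createDirectories(markerFile.getParent());
      Files.createFile(markerFile);
      return true;                 // this attempt created the marker
    } catch (FileAlreadyExistsException e) {
      return false;                // another attempt already created it; not an error
    }
  }
}
{code}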

> should not throw exception when create marker file for log file
> ---
>
> Key: HUDI-6401
> URL: https://issues.apache.org/jira/browse/HUDI-6401
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Priority: Major
>
> when a Spark task fails or speculation is enabled, different tasks will create 
> a marker file for the same log file. We should use createIfNotExists and check 
> the result of creating the marker file rather than letting it throw an exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-6401) should not throw exception when create marker file for log file

2023-06-16 Thread ZiyueGuan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZiyueGuan reassigned HUDI-6401:
---

Assignee: ZiyueGuan

> should not throw exception when create marker file for log file
> ---
>
> Key: HUDI-6401
> URL: https://issues.apache.org/jira/browse/HUDI-6401
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: ZiyueGuan
>Assignee: ZiyueGuan
>Priority: Major
>
> when a Spark task fails or speculation is enabled, different tasks will create 
> a marker file for the same log file. We should use createIfNotExists and check 
> the result of creating the marker file rather than letting it throw an exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)