[ https://issues.apache.org/jira/browse/HUDI-1007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liujinhui updated HUDI-1007:
----------------------------
    Description: 
When using DeltaStreamer to consume from Kafka, Hudi cannot successfully
consume data once the earliest available offsets are greater than the
checkpointed offsets.

org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen#checkupValidOffsets

// If any checkpointed offset has fallen behind the earliest offset Kafka
// still retains, reset every partition to its earliest offset.
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
    .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));

return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
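
To make the behavior concrete, here is a small standalone re-creation of that
check with simplified types (plain maps keyed by partition number; the offset
values are made up for illustration, not taken from a real run):

import java.util.Map;

public class CheckupValidOffsetsDemo {
    public static void main(String[] args) {
        // The checkpoint has fallen behind what Kafka still retains.
        Map<Integer, Long> checkpointOffsets = Map.of(0, 100L);
        Map<Integer, Long> earliestOffsets = Map.of(0, 500L);

        boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
                .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));

        // Prints {0=500}: the generator falls back to the earliest offsets.
        System.out.println(checkpointOffsetReseter ? earliestOffsets : checkpointOffsets);
    }
}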

Kafka data is generated continuously, which also means older data keeps
expiring under the retention policy. When the earliest offsets are greater
than the checkpoint, the earliest offsets are taken instead. But by the time
the data is actually fetched, some of it has expired again, so consumption
fails, and the whole process repeats as an endless cycle. I can understand
that this design may be intended to avoid losing data, but it leads to exactly
this situation. I want to fix this problem and would like to hear your opinions.
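
One possible direction, shown below only as a minimal sketch and not as the
actual patch: when the checkpoint has expired, reset to the earliest offsets
plus a safety margin, so the chosen offsets are less likely to expire again
between planning and fetching. The safetyMargin parameter and the simplified
map types are assumptions for illustration; they are not part of
KafkaOffsetGen today.

import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {

    // Hypothetical variant of checkupValidOffsets: when the checkpoint has
    // fallen behind retention, skip safetyMargin records past earliest so the
    // reset offsets are less likely to expire before the fetch runs.
    static Map<Integer, Long> checkupValidOffsets(Map<Integer, Long> checkpointOffsets,
                                                  Map<Integer, Long> earliestOffsets,
                                                  long safetyMargin) {
        boolean checkpointExpired = checkpointOffsets.entrySet().stream()
                .anyMatch(e -> e.getValue() < earliestOffsets.get(e.getKey()));
        if (!checkpointExpired) {
            return checkpointOffsets; // checkpoint still valid, resume from it
        }
        // The checkpointed records are already gone, so some loss is
        // unavoidable; accept a little more to guarantee forward progress.
        Map<Integer, Long> reset = new HashMap<>();
        earliestOffsets.forEach((partition, earliest) -> reset.put(partition, earliest + safetyMargin));
        return reset;
    }

    public static void main(String[] args) {
        Map<Integer, Long> checkpoint = Map.of(0, 100L);
        Map<Integer, Long> earliest = Map.of(0, 500L);
        // Prints {0=1500}: restart 1000 records past earliest instead of
        // exactly at 500, which retention may be about to delete.
        System.out.println(checkupValidOffsets(checkpoint, earliest, 1000L));
    }
}

The trade-off is explicit: the margin knowingly skips a bounded number of
extra records in exchange for breaking the retry cycle, and a real fix would
also clamp the reset offsets to the latest available offsets. Alternatives
would be to recompute the earliest offsets immediately before the fetch, or to
fail with a clear error instead of looping.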



> When earliestOffsets is greater than checkpoint, Hudi will not be able to 
> successfully consume data
> ---------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1007
>                 URL: https://issues.apache.org/jira/browse/HUDI-1007
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: liujinhui
>            Assignee: liujinhui
>            Priority: Major
>             Fix For: 0.6.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>


