[GitHub] [hudi] nsivabalan edited a comment on pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-19 Thread GitBox


nsivabalan edited a comment on pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#issuecomment-864418016


   Actually, we can make it even simpler. 
   Within DeltaSync.read():
   ```
   // set the right checkpoint value
   if (cfg.checkpoint != null && !commitMetadata.contains(Checkpoint_RESET_Key)) {
     checkpoint = cfg.checkpoint;
   } else if (commitMetadata.contains(Checkpoint_Key)) {
     checkpoint = commitMetadata.get(Checkpoint_Key);
   } else {
     checkpoint = Option.empty();
   }
   // New addition
   if (commitMetadata.contains(Checkpoint_RESET_Key)) {
     // reset the checkpoint type if set
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan edited a comment on pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-19 Thread GitBox


nsivabalan edited a comment on pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#issuecomment-864413719


   I think we can simplify things. Let me go over the relevant pseudocode. 
   
   Code before this patch. 
   
   Within DeltaSync.read():
   ```
   // set the right checkpoint value
   if (cfg.checkpoint != null && !commitMetadata.contains(Checkpoint_RESET_Key)) {
     checkpoint = cfg.checkpoint;
   } else if (commitMetadata.contains(Checkpoint_Key)) {
     checkpoint = commitMetadata.get(Checkpoint_Key);
   } else {
     checkpoint = Option.empty();
   }
   ```
   Note that the first `if` condition deals with Checkpoint_RESET_Key, whereas 
the `else if` condition deals with Checkpoint_Key. 
   I have left out some exception cases, but this should give you the gist.
   
   Within write():
   ```
   // towards the end
   commitMetadata.put(Checkpoint_Key, <updated checkpoint after writing>);
   if (cfg.checkpoint != null) {
     commitMetadata.add(Checkpoint_RESET_Key);
   }
   ```
   
   If cfg.checkpoint is set, it is honored only during the first round. At 
the end of the first batch, we add Checkpoint_RESET_Key to the commit metadata, 
so from the next batch onwards the checkpoint is read from commitMetadata. 
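
To make the two-round behavior concrete, here is a minimal, self-contained Java sketch. It is a toy model and not the actual DeltaSync code: the class name, the key strings, and the choice to store cfg.checkpoint as the value of the reset key are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of the read()/write() checkpoint hand-off; not Hudi's real classes. */
final class CheckpointLifecycleSketch {
  // Hypothetical key names standing in for Checkpoint_Key and Checkpoint_RESET_Key.
  static final String CHECKPOINT_KEY = "checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "checkpoint.reset_key";

  /** read(): honor cfgCheckpoint only while the last commit carries no reset key. */
  static Optional<String> read(String cfgCheckpoint, Map<String, String> commitMetadata) {
    if (cfgCheckpoint != null && !commitMetadata.containsKey(CHECKPOINT_RESET_KEY)) {
      return Optional.of(cfgCheckpoint);
    } else if (commitMetadata.containsKey(CHECKPOINT_KEY)) {
      return Optional.of(commitMetadata.get(CHECKPOINT_KEY));
    }
    return Optional.empty();
  }

  /** write(): build the metadata of the new commit. */
  static Map<String, String> write(String cfgCheckpoint, String checkpointAfterWrite) {
    Map<String, String> commitMetadata = new HashMap<>();
    commitMetadata.put(CHECKPOINT_KEY, checkpointAfterWrite);
    if (cfgCheckpoint != null) {
      // Assumption: the marker's value is the configured checkpoint itself.
      commitMetadata.put(CHECKPOINT_RESET_KEY, cfgCheckpoint);
    }
    return commitMetadata;
  }

  public static void main(String[] args) {
    String cfg = "topicName,0:100";
    Map<String, String> lastCommit = new HashMap<>(); // no commits yet
    System.out.println(read(cfg, lastCommit));        // first batch: taken from cfg
    lastCommit = write(cfg, "topicName,0:250");
    System.out.println(read(cfg, lastCommit));        // second batch: taken from commit metadata
  }
}
```

The same cfg value is passed to both read() calls, which is the continuous-mode situation: the config does not change between rounds, so only the reset key in the last commit tells the two rounds apart.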
   
   With this PR, the only addition is that we are introducing a new checkpoint 
type. Let me propose a simple add-on to the code above that would work for us. 
   
   Within DeltaSync.read():
   ```
   // set the right checkpoint value
   boolean resetCheckpointType = true; // New addition
   if (cfg.checkpoint != null && !commitMetadata.contains(Checkpoint_RESET_Key)) {
     checkpoint = cfg.checkpoint;
     resetCheckpointType = false; // New addition
   } else if (commitMetadata.contains(Checkpoint_Key)) {
     checkpoint = commitMetadata.get(Checkpoint_Key);
   } else {
     checkpoint = Option.empty();
   }
   // New addition
   if (resetCheckpointType) {
     // reset the checkpoint type if set
   }
   ```
   
   No other changes are required. This is based on the assumption that 
Checkpoint_RESET_Key and the checkpoint type go hand in hand. During the first 
batch, the checkpoint type could be set, and there won't be any 
Checkpoint_RESET_Key. From the second batch onwards, it should be the reverse: 
the checkpoint type should not be set, but Checkpoint_RESET_Key should be part 
of the commit metadata. Given this assumption, we don't need to add the 
checkpoint type to commitMetadata and can still decide whether to use the 
checkpoint type or not. 
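
As a rough illustration of the proposed read() logic, the sketch below mirrors the pseudocode with plain JDK types. The class name and the two metadata key strings are hypothetical, and "reset the checkpoint type" is modelled as removing the property from a java.util.Properties object, which is an assumption about how the reset would be done.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/** Illustrative only: mirrors the pseudocode above with plain JDK types. */
final class CheckpointTypeResetSketch {
  // Hypothetical stand-ins for Checkpoint_Key and Checkpoint_RESET_Key.
  static final String CHECKPOINT_KEY = "checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "checkpoint.reset_key";
  // Config name as mentioned in this thread.
  static final String CHECKPOINT_TYPE_PROP =
      "hoodie.deltastreamer.source.kafka.checkpoint.type";

  static Optional<String> resolve(
      String cfgCheckpoint, Map<String, String> commitMetadata, Properties props) {
    boolean resetCheckpointType = true;
    Optional<String> checkpoint;
    if (cfgCheckpoint != null && !commitMetadata.containsKey(CHECKPOINT_RESET_KEY)) {
      checkpoint = Optional.of(cfgCheckpoint);
      resetCheckpointType = false; // first batch: keep the user-supplied type
    } else if (commitMetadata.containsKey(CHECKPOINT_KEY)) {
      checkpoint = Optional.of(commitMetadata.get(CHECKPOINT_KEY));
    } else {
      checkpoint = Optional.empty();
    }
    if (resetCheckpointType) {
      // Later batches: drop the type so the checkpoint is parsed the regular way.
      props.remove(CHECKPOINT_TYPE_PROP);
    }
    return checkpoint;
  }
}
```

Note that nothing about the type is written to the commit metadata here; the presence of the reset key alone decides whether the type survives.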
   
   
   
   
   






[GitHub] [hudi] nsivabalan edited a comment on pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-08 Thread GitBox


nsivabalan edited a comment on pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#issuecomment-856862390


   @liujinhui1994 : here is what we can do. 
   If someone runs it just once, this should not be an issue. The issue arises 
when someone runs deltastreamer in continuous mode. 
   
   So, the user is expected to set HoodieDeltaStreamer.Config.checkpoint or 
InitialCheckpointProvider. 
   The user also sets the new config 
(hoodie.deltastreamer.source.kafka.checkpoint.type) to timestamp. 
   
   KafkaOffsetGen should be capable of parsing the checkpoint as a timestamp. 
   At the end of the write, DeltaSync should reset this 
(...kafka.checkpoint.type) config (similar to how we reset the checkpoint). 
   So, for subsequent runs, this (...kafka.checkpoint.type) config value will 
not be set, and KafkaOffsetGen should parse the checkpoint and fetch from the 
source as it does for a regular checkpoint. 
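
A small sketch of the dispatch this implies on the offset-generation side, using only java.util.Properties. The class, the enum, and the helper names are hypothetical, and treating the timestamp checkpoint as a plain numeric string is an assumption; the thread does not pin down its exact format.

```java
import java.util.Properties;

/** Hypothetical helper: decides how a checkpoint string should be interpreted. */
final class CheckpointKindSketch {
  static final String CHECKPOINT_TYPE_PROP =
      "hoodie.deltastreamer.source.kafka.checkpoint.type";

  enum Kind { TIMESTAMP, OFFSETS }

  /** TIMESTAMP only while the type config is set to "timestamp"; OFFSETS otherwise. */
  static Kind kindOf(Properties props) {
    String type = props.getProperty(CHECKPOINT_TYPE_PROP);
    return "timestamp".equalsIgnoreCase(type) ? Kind.TIMESTAMP : Kind.OFFSETS;
  }

  /** Assumption: a timestamp checkpoint is a bare number. */
  static long parseTimestamp(String checkpoint) {
    return Long.parseLong(checkpoint.trim());
  }
}
```

Once the config has been reset after the first write, kindOf falls through to OFFSETS with no further bookkeeping, which is the behavior described for subsequent runs.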
   
   Let me know if the approach is clear and whether it makes sense. 
   
   
   
   
   






[GitHub] [hudi] nsivabalan edited a comment on pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-04 Thread GitBox


nsivabalan edited a comment on pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#issuecomment-853866564


   Good point. 
   Tell me if my general understanding of how timestamp-based checkpointing 
will be used is right. 
   The user would like to use timestamp-based checkpointing in deltastreamer 
only for the bootstrap case. 
   From then on, checkpointing will use the regular Kafka checkpoint 
format of "topicName,0:123,1:456". 
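
For reference, the regular format can be parsed roughly as follows. This is a stand-alone sketch with hypothetical class and method names, not the parser Hudi actually uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for checkpoints shaped like "topicName,0:123,1:456". */
final class KafkaCheckpointFormatSketch {

  /** The topic name is the first comma-separated element. */
  static String topicOf(String checkpoint) {
    return checkpoint.split(",")[0];
  }

  /** Returns partition -> offset, in the order the pairs appear. */
  static Map<Integer, Long> parseOffsets(String checkpoint) {
    // String.split drops trailing empty strings, so a trailing comma is tolerated.
    String[] parts = checkpoint.split(",");
    if (parts.length < 2) {
      throw new IllegalArgumentException("No partition:offset pairs in: " + checkpoint);
    }
    Map<Integer, Long> offsets = new LinkedHashMap<>();
    for (int i = 1; i < parts.length; i++) {
      String[] pair = parts[i].split(":");
      if (pair.length != 2) {
        throw new IllegalArgumentException("Malformed pair: " + parts[i]);
      }
      offsets.put(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
    }
    return offsets;
  }
}
```

A bare timestamp string has no partition:offset pairs, so parseOffsets rejects it with an IllegalArgumentException instead of silently misreading it.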
   
   If my understanding (stated above) is correct, then within 
KafkaOffsetGen we might have to parse the checkpoint as a timestamp the first 
time (bootstrap), but from the second time onwards we fall back to the regular 
checkpoint parsing mechanism. 
   
   I see we have InitialCheckPointProvider. Let me think about how to go about 
this and get back to you. For now, this is what I can think of. 
   InitialCheckpointProvider will expose a getCheckpointType() method, 
   and we add it as a property to the configs if initialCheckpointProvider is 
set, around 
[here](https://github.com/apache/hudi/blob/f6eee77636223077cfd2ce516f1b8805dfa6e35e/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L132).
 
   Within readFromSource in DeltaSync, if the checkpoint is fetched from commit 
metadata, we may not honor this checkpoint type, or we clear the 
checkpoint type property if set. 
   But if it is fetched from cfg.checkpoint, we leave the property as is and 
let KafkaOffsetGen handle checkpoint parsing. 
   
   Let me think through this more. In the meantime, it would be great if you 
could confirm my understanding of the usage of timestamp-based checkpointing. 
   
   CC @n3nash @bvaradar 
   






[GitHub] [hudi] nsivabalan edited a comment on pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-03-15 Thread GitBox


nsivabalan edited a comment on pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#issuecomment-799571386


   Thanks for your contribution. This is going to be useful to the community. 
   A few high-level questions.
   1. Why don't we leverage DeltaStreamerConfig.checkpoint to pass in a 
timestamp for the Kafka source? Or do we expect the format of this config to be 
"topic_name,partition_num:offset,partition_num:offset," and hence need a 
new config for a timestamp-based checkpoint? 
   2. If yes to (1), did we think about parsing the checkpoint config, 
determining whether it is in the above format or a timestamp, and then 
proceeding from there? Just trying to avoid introducing new configs if possible. 
   3. Checkpointing in deltastreamer in general is getting too complicated. I 
definitely see a benefit in this patch. But is there a way we can abstract it 
out based on the source? The new config introduced as part of this PR is 
very specific to Kafka, so I am trying to see if we can keep it abstracted out 
of deltastreamer if possible. 
   4. I see KafkaConsumer.offsetsForTimes() could return null for partitions 
with messages in the old format. What is the expected behavior for such 
partitions? Do we resume from the earliest offset? 
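
To illustrate what question 2 has in mind, a checkpoint string could be classified by shape alone, with no extra config. This is only a sketch: the class name and the regular expressions are assumptions, and it presumes a timestamp checkpoint would be a bare number.

```java
import java.util.regex.Pattern;

/** Hypothetical classifier: guesses the checkpoint kind from its shape. */
final class CheckpointFormatDetectSketch {
  // "topic_name,partition_num:offset,partition_num:offset," (trailing comma optional)
  private static final Pattern OFFSETS = Pattern.compile("[^,:]+(,\\d+:\\d+)+,?");
  // Assumption: a timestamp checkpoint is digits only.
  private static final Pattern TIMESTAMP = Pattern.compile("\\d+");

  static String classify(String checkpoint) {
    String c = checkpoint.trim();
    if (TIMESTAMP.matcher(c).matches()) {
      return "timestamp";
    }
    if (OFFSETS.matcher(c).matches()) {
      return "offsets";
    }
    return "unknown";
  }
}
```

The two shapes cannot collide under these assumptions, since the offsets form always contains at least one comma and one colon while a bare number contains neither.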
   
   @n3nash @vinothchandar : open to hearing your thoughts, if any. One of my 
suggestions above could potentially add APIs to Source, hence CCing you. 


