[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-07-16 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r671607330



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
              .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
+      if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      }

Review comment:
   ok, I get it now. makes sense. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-07-16 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r671353717



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
              .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
+      if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      }

Review comment:
   not sure I understand. this is what I am thinking:
   ```
   if (timestamp based checkpoint)
       lastCheckpoint = getOffsetsByTimestamp()
   else if (regular checkpoint type)
       lastCheckpoint = fetchValidOffsets()
   else
       reset based on auto.offset.reset
   ```
   
   Am I misunderstanding anything here? Can you help me understand please.
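   To put the same thing in code, this is roughly the branching I have in mind — just a sketch: `getOffsetsByTimestamp`, `isValidCheckpointType` and `fetchValidOffsets` are the helpers from this patch / this class, while `offsetsForAutoReset` is only a placeholder name for the existing auto.offset.reset handling:
   ```
   if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
     // timestamp checkpoint: translate the epoch value into a "topic,partition:offset,..." checkpoint first
     lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
     fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
   } else if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
     // regular string checkpoint: validate and reuse the stored "topic,partition:offset,..." value
     fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
   } else {
     // no usable checkpoint: fall back to auto.offset.reset
     fromOffsets = offsetsForAutoReset(consumer, topicPartitions);
   }
   ```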




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-07-15 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r670978737



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##
@@ -212,6 +234,9 @@ public KafkaOffsetGen(TypedProperties props) {
      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
              .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
+      if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      }

Review comment:
   I was expecting an else if block after this line. Can you clarify please? If not, we might fall into the else block?

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
##
@@ -64,7 +63,7 @@ public void teardown() throws Exception {
 
   private TypedProperties getConsumerConfigs(String autoOffsetReset) {
 TypedProperties props = new TypedProperties();
-props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+props.put("auto.offset.reset", autoOffsetReset);

Review comment:
   Do you think we can add some tests to this class for the timestamp type? 
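   Something along these lines is what I had in mind — just a rough sketch, assuming the kafka test harness and helpers already used in this class (`testUtils`, `dataGenerator`, `getConsumerConfigs`, `metrics`, `TEST_TOPIC_NAME`); the checkpoint-type key is the one from this patch:
   ```
   @Test
   public void testGetNextOffsetRangesFromTimestampCheckpointType() {
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 500)));
 
     TypedProperties props = getConsumerConfigs("earliest");
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", "timestamp");
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
 
     // a timestamp slightly in the past, so every message published above is newer than the checkpoint
     String timestampCheckpoint = String.valueOf(System.currentTimeMillis() - 60000);
     OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.of(timestampCheckpoint), 500, metrics);
     assertEquals(500, CheckpointUtils.totalNewMessages(ranges));
   }
   ```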

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##
@@ -193,7 +193,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
 
 Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
 SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-Config.maxEventsFromKafkaSource = 500;
+//props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");

Review comment:
   why commented out? 

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
##
@@ -193,7 +193,7 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
 
 Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
 SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-Config.maxEventsFromKafkaSource = 500;
+//props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");

Review comment:
   I tried your patch locally; the test fails if I uncomment this line. I don't understand why?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-07-03 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r663377708



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##
@@ -282,6 +301,36 @@ private Long delayOffsetCalculation(Option 
lastCheckpointStr, Set lastCheckpointStr) {

Review comment:
   should we name this "isValidCheckpointType" or something? Also, can you add Javadoc describing what validation we are doing here.
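   e.g. something like this — just a sketch of the naming/Javadoc I mean; the actual validation in the patch may differ:
   ```
   /**
    * Checks whether the last checkpoint can be used for timestamp-based checkpointing,
    * i.e. it is a purely numeric epoch value rather than an already materialized
    * "topic,partition:offset,..." string.
    */
   private boolean isValidCheckpointType(Option<String> lastCheckpointStr) {
     return lastCheckpointStr.isPresent() && lastCheckpointStr.get().matches("[-+]?\\d+");
   }
   ```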




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-24 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r657945070



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -312,13 +313,13 @@ public void refreshTimeline() throws IOException {
       if (lastCommit.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-        if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
-          resumeCheckpointStr = Option.of(cfg.checkpoint);
-        } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
-          //if previous checkpoint is an empty string, skip resume use Option.empty()
-          if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+        if (cfg.checkpoint != null) {

Review comment:
   we could club both of these within a single if condition:
   ```
   if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
       || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
     resumeCheckpointStr = Option.of(cfg.checkpoint);
   }
   ```

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -330,6 +331,9 @@ public void refreshTimeline() throws IOException {
          + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
          + commitMetadata.toJsonString());
    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+      props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");

Review comment:
   actually, a better thing to do here would be to remove the entry from props. wdyt?
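   i.e. something like the below (key spelled out here only for illustration):
   ```
   // drop the kafka checkpoint-type override instead of forcing it back to "string"
   props.remove("hoodie.deltastreamer.source.kafka.checkpoint.type");
   ```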

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -330,6 +331,9 @@ public void refreshTimeline() throws IOException {
          + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
          + commitMetadata.toJsonString());
    }
+    if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+      props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");

Review comment:
   rather than hardcoding the config key here, can we use a variable please?
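   e.g. (assuming a constant is, or can be, defined in KafkaOffsetGen.Config for this key):
   ```
   props.put(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE, "string");
   ```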




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-06-23 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r657546921



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -461,7 +465,7 @@ public void refreshTimeline() throws IOException {
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
       checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-      if (cfg.checkpoint != null) {
+      if (cfg.checkpoint != null && !"timestamp".equals(props.getString("hoodie.deltastreamer.source.kafka.checkpoint.type"))) {

Review comment:
   Can you help me understand why we need this? My understanding is that the user will set cfg.checkpoint during the first batch and set the checkpoint type (to timestamp) as well. But for any checkpoint type, we should add the checkpoint_reset_key here at the end of the 1st batch. Am I missing something? Can you please help me understand.
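   In other words, I would have expected something along these lines regardless of the checkpoint type (sketch only, reusing the constants already in DeltaSync):
   ```
   if (cfg.checkpoint != null) {
     // record the user-provided checkpoint for every checkpoint type, so the next run
     // can tell whether --checkpoint has changed since the last commit
     checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
   }
   ```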




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-05-29 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r641925742



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value

Review comment:
   this format is specific to Kafka; let's call that out. Other sources could define their checkpoints differently.
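   e.g. the Javadoc could read something like this (wording is only a suggestion):
   ```
   /**
    * Interpretation of --checkpoint for the Kafka source, controlled by --checkpoint-type:
    * 1. string:    "topicName,partition0:offset,partition1:offset,..."  e.g. hudi_topic,0:100,1:101,2:201
    * 2. timestamp: an epoch timestamp used to look up per-partition offsets, e.g. 1621947081
    * Other sources may define their checkpoint format differently.
    */
   ```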

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value
+     * 2. timestamp: kafka offset timestamp
+     * example
+     * 1. hudi_topic,0:100,1:101,2:201
+     * 2. 1621947081
+     */
+    @Parameter(names = {"--checkpoint-type"}, description = "Checkpoint type, divided into timestamp or string offset")
+    public String checkpointType = "string";

Review comment:
   I am contemplating between "string" or "default" or "regular" to be set 
as default checkpoint type. @n3nash : any thoughts. We are looking to introduce 
a new config called checkpoint type. by default we need to set some value. this 
patch adds a new checkpoint type "timestamp" for kafka source. 

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -38,6 +38,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;

Review comment:
   can we revert unintended changes in this file. 

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value
+     * 2. timestamp: kafka offset timestamp
+     * example
+     * 1. hudi_topic,0:100,1:101,2:201
+     * 2. 1621947081
+     */
+    @Parameter(names = {"--checkpoint-type"}, description = "Checkpoint type, divided into timestamp or string offset")
+    public String checkpointType = "string";

Review comment:
   sorry, why do we have this config in two places? We have it defined as a top-level config in HoodieDeltaStreamer.Config, but in KafkaOffsetGen I see you are accessing it as "hoodie.deltastreamer.source.kafka.checkpoint.type". Maybe we should rely on that config param and remove it from the top level, since this is applicable just to Kafka for now.
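   i.e. KafkaOffsetGen could read it off the properties directly (key string as used in this patch; the default is only illustrative):
   ```
   // derive the checkpoint type from the kafka-scoped property only
   this.kafkaCheckpointType = props.getString("hoodie.deltastreamer.source.kafka.checkpoint.type", "string");
   ```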




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-03-31 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604841403



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##
@@ -247,6 +266,32 @@ private Long delayOffsetCalculation(Option 
lastCheckpointStr, Set partitionInfoList, String topicName, Long timestamp) {

Review comment:
   Can we add tests for the new code that is added? I don't see any tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-03-31 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604840433



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -553,6 +555,11 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
           "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
 
       this.props = properties.get();
+      String kafkaCheckpointTimestamp = props.getString(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TIMESTAMP, "");

Review comment:
   Let me think more on this. Wondering if we should just rely on existing 
"HoodieDeltaStreamer.Config.checkpoint" only and add another config named 
"checkpoint.type" or something which could be set to timestamp for this 
purpose. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-03-31 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r604828288



##
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
##
@@ -65,7 +65,7 @@ public void scheduleCompact() throws Exception {
 return upsert(WriteOperationType.UPSERT);
   }
 
-  public Pair>> fetchSource() throws Exception {
+  public Pair>, Pair> fetchSource() throws Exception {

Review comment:
   Actually, my PR was closed as it was invalid. But [here](https://github.com/nsivabalan/hudi/blob/f7439e2e28748bf7b713fb72ba611f8af7bb97a1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ReadBatch.java) is the class that I added. Maybe you can add it in this patch itself.
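   For reference, the shape of that class is roughly the following (field names here are illustrative, not the exact class from my PR):
   ```
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.hudi.utilities.schema.SchemaProvider;
   import org.apache.spark.api.java.JavaRDD;
 
   /** Simple holder for what fetchSource() returns, instead of nesting Pairs. */
   public class ReadBatch {
     private final SchemaProvider schemaProvider;
     private final String checkpointForNextBatch;
     private final JavaRDD<HoodieRecord> batch;
 
     public ReadBatch(SchemaProvider schemaProvider, String checkpointForNextBatch, JavaRDD<HoodieRecord> batch) {
       this.schemaProvider = schemaProvider;
       this.checkpointForNextBatch = checkpointForNextBatch;
       this.batch = batch;
     }
 
     public SchemaProvider getSchemaProvider() { return schemaProvider; }
     public String getCheckpointForNextBatch() { return checkpointForNextBatch; }
     public JavaRDD<HoodieRecord> getBatch() { return batch; }
   }
   ```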




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #2438: [HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp

2021-03-15 Thread GitBox


nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r594471195



##
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
##
@@ -65,7 +65,7 @@ public void scheduleCompact() throws Exception {
 return upsert(WriteOperationType.UPSERT);
   }
 
-  public Pair>> fetchSource() throws Exception {
+  public Pair>, Pair> fetchSource() throws Exception {

Review comment:
   this is getting out of hand (two Pairs within a Pair). We can't keep adding more Pairs here. I am adding a class to hold the return value in one of my PRs. Let's see if we can rebase once the other PR lands.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org