[GitHub] [hudi] YannByron commented on a diff in pull request #7138: [HUDI-5162] Allow user specified start offset for streaming query

2022-11-17 Thread GitBox


YannByron commented on code in PR #7138:
URL: https://github.com/apache/hudi/pull/7138#discussion_r1025940652


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##
@@ -18,7 +18,6 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
-

Review Comment:
   please keep the import code style that separates imports from different packages with a blank line.
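
   For illustration, a minimal sketch of the layout this refers to. The `org.apache.spark` import is a hypothetical second group, added only to show the blank-line separation between top-level packages:

   ```scala
   package org.apache.hudi

   // Imports from the org.apache.hadoop package form one group ...
   import org.apache.hadoop.fs.Path

   // ... separated by a blank line from imports of a different package.
   import org.apache.spark.sql.SQLContext
   ```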



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] YannByron commented on a diff in pull request #7138: [HUDI-5162] Allow user specified start offset for streaming query

2022-11-17 Thread GitBox


YannByron commented on code in PR #7138:
URL: https://github.com/apache/hudi/pull/7138#discussion_r1025925016


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala:
##
@@ -72,57 +68,21 @@ class HoodieStreamSource(
     parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
     parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
 
-  @transient private var lastOffset: HoodieSourceOffset = _
-
   @transient private lazy val initialOffsets = {
-    val metadataLog =
-      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
-        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
-          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-          writer.write("v" + VERSION + "\n")
-          writer.write(metadata.json)
-          writer.flush()
-        }
-
-        /**
-          * Deserialize the init offset from the metadata file.
-          * The format in the metadata file is like this:
-          * --------------------------------------------------
-          * v1         -- The version info in the first line
-          * offsetJson -- The json string of HoodieSourceOffset in the rest of the file
-          * --------------------------------------------------
-          * @param in
-          * @return
-          */
-        override def deserialize(in: InputStream): HoodieSourceOffset = {
-          val content = FileIOUtils.readAsUTFString(in)
-          // Get version from the first line
-          val firstLineEnd = content.indexOf("\n")
-          if (firstLineEnd > 0) {
-            val version = getVersion(content.substring(0, firstLineEnd))
-            if (version > VERSION) {
-              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
-                s" current version is: $version")
-            }
-            // Get offset from the rest line in the file
-            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
-          } else {
-            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
-          }
-        }
-      }
+    val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      metadataLog.add(0, INIT_OFFSET)
-      INIT_OFFSET
-    }
-  }
-
-  private def getVersion(versionLine: String): Int = {
-    if (versionLine.startsWith("v")) {
-      versionLine.substring(1).toInt
-    } else {
-      throw new IllegalStateException(s"Illegal version line: $versionLine " +
-        s"in the streaming metadata path")
+      val offset = offsetRangeLimit match {
+        case HoodieEarliestOffsetRangeLimit =>
+          INIT_OFFSET
+        case HoodieLatestOffsetRangeLimit =>
+          getLatestOffset.getOrElse(throw new HoodieException("Cannot fetch latest offset from table, " +

Review Comment:
   can we use INIT_OFFSET when `getLatestOffset` is empty? I mean `getLatestOffset.getOrElse(INIT_OFFSET)`.
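
   A self-contained sketch of what that suggestion would look like, with stand-in definitions for the Hudi types referenced in the diff above (the sentinel value "000" for INIT_OFFSET is illustrative, not the project's actual constant):

   ```scala
   object StartOffsetSketch {
     sealed trait HoodieOffsetRangeLimit
     case object HoodieEarliestOffsetRangeLimit extends HoodieOffsetRangeLimit
     case object HoodieLatestOffsetRangeLimit extends HoodieOffsetRangeLimit

     final case class HoodieSourceOffset(commitTime: String)

     // Stand-in for the "start from the beginning of the table" sentinel.
     val INIT_OFFSET: HoodieSourceOffset = HoodieSourceOffset("000")

     def resolveStartOffset(limit: HoodieOffsetRangeLimit,
                            latestOffset: Option[HoodieSourceOffset]): HoodieSourceOffset =
       limit match {
         case HoodieEarliestOffsetRangeLimit =>
           INIT_OFFSET
         case HoodieLatestOffsetRangeLimit =>
           // Reviewer's proposal: fall back to INIT_OFFSET when the table
           // has no completed commit yet, instead of throwing an exception.
           latestOffset.getOrElse(INIT_OFFSET)
       }
   }
   ```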


