YannByron commented on code in PR #7138: URL: https://github.com/apache/hudi/pull/7138#discussion_r1025925016
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala: ########## @@ -72,57 +68,21 @@ class HoodieStreamSource( parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) && parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL) - @transient private var lastOffset: HoodieSourceOffset = _ - @transient private lazy val initialOffsets = { - val metadataLog = - new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) { - override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = { - val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write("v" + VERSION + "\n") - writer.write(metadata.json) - writer.flush() - } - - /** - * Deserialize the init offset from the metadata file. - * The format in the metadata file is like this: - * ---------------------------------------------- - * v1 -- The version info in the first line - * offsetJson -- The json string of HoodieSourceOffset in the rest of the file - * ----------------------------------------------- - * @param in - * @return - */ - override def deserialize(in: InputStream): HoodieSourceOffset = { - val content = FileIOUtils.readAsUTFString(in) - // Get version from the first line - val firstLineEnd = content.indexOf("\n") - if (firstLineEnd > 0) { - val version = getVersion(content.substring(0, firstLineEnd)) - if (version > VERSION) { - throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" + - s" current version is: $version") - } - // Get offset from the rest line in the file - HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1)) - } else { - throw new IllegalStateException(s"Bad metadata format, failed to find the version line.") - } - } - } + val metadataLog = new 
HoodieMetadataLog(sqlContext.sparkSession, metadataPath) metadataLog.get(0).getOrElse { - metadataLog.add(0, INIT_OFFSET) - INIT_OFFSET - } - } - - private def getVersion(versionLine: String): Int = { - if (versionLine.startsWith("v")) { - versionLine.substring(1).toInt - } else { - throw new IllegalStateException(s"Illegal version line: $versionLine " + - s"in the streaming metadata path") + val offset = offsetRangeLimit match { + case HoodieEarliestOffsetRangeLimit => + INIT_OFFSET + case HoodieLatestOffsetRangeLimit => + getLatestOffset.getOrElse(throw new HoodieException("Cannot fetch latest offset from table, " + Review Comment: Can we use INIT_OFFSET when `getLatestOffset` is empty? I mean `getLatestOffset.getOrElse(INIT_OFFSET)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org