HeartSaVioR commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1156348922
########## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ########## @@ -3038,6 +3025,107 @@ class Dataset[T] private[sql]( dropDuplicates(colNames) } + /** + * Returns a new Dataset with duplicates rows removed, within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. + * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(): Dataset[T] = { + dropDuplicatesWithinWatermark(this.columns) + } + + /** + * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, + * within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. + * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = { Review Comment: E.g. Your suggestion passes the new test addition in sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java but had to change (`: _*` as you mentioned) Scala test suite to pass. Having the same set of overloaded methods makes all tests pass. (Here I meant able to compile as pass.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org