[ 
https://issues.apache.org/jira/browse/SPARK-30553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bettermouse updated SPARK-30553:
--------------------------------
    Description: 
[http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking]

I write code according to this by java and scala.

java
{code:java}
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = 
SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();        Dataset<Row> lines = 
spark.readStream().format("socket")
                .option("host", "skynet")
                .option("includeTimestamp", true)
                .option("port", 8888).load();
        Dataset<Row> words = lines.select("timestamp", "value");
        Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
                .groupBy(functions.window(words.col("timestamp"), "10 seconds", 
"10 seconds")
                        , words.col("value")).count();
        StreamingQuery start = count.writeStream()
                .outputMode("update")
                .format("console").start();
        start.awaitTermination();    }
{code}
scala

 
{code:java}
 def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("test").
      master("local[*]").
      config("spark.sql.shuffle.partitions", 1)
      .getOrCreate
    import spark.implicits._
    val lines = spark.readStream.format("socket").
      option("host", "skynet").option("includeTimestamp", true).
      option("port", 8888).load
    val words = lines.select("timestamp", "value")
    val count = words.withWatermark("timestamp", "10 seconds").
      groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
      .count()
    val start = count.writeStream.outputMode("update").format("console").start
    start.awaitTermination()
  }
{code}
This is according to official documents. written in Java I found metrics 
"stateOnCurrentVersionSizeBytes" always increase .but scala is ok.

 

java

 
{code:java}
== Physical Plan ==
WriteToDataSourceV2 
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
+- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)], 
output=[window#11, value#0, count#10L])
   +- StateStoreSave [window#11, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
 runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions 
= 1], Update, 1579274372624, 2
      +- *(3) HashAggregate(keys=[window#11, value#0], 
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
         +- StateStoreRestore [window#11, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
 runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions 
= 1], 2
            +- *(2) HashAggregate(keys=[window#11, value#0], 
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
               +- Exchange hashpartitioning(window#11, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11, value#0], 
functions=[partial_count(1)], output=[window#11, value#0, count#21L])
                     +- *(1) Project [named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as 
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, 
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as 
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, 
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, 
TimestampType)) AS window#11, value#0]
                        +- *(1) Filter isnotnull(timestamp#1)
                           +- EventTimeWatermark timestamp#1: timestamp, 
interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]

{code}
 

 

scala 

 

 
{code:java}
WriteToDataSourceV2 
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
+- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[count(1)], 
output=[window#6-T10000ms, value#0, count#10L])
   +- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
 runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions 
= 1], Update, 1579275214256, 2
      +- *(3) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
         +- StateStoreRestore [window#11-T10000ms, value#0], state info [ 
checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
 runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions 
= 1], 2
            +- *(2) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
               +- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
                     +- *(1) Project [named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType)) AS window#11-T10000ms, value#0]
                        +- *(1) Filter isnotnull(timestamp#1-T10000ms)
                           +- EventTimeWatermark timestamp#1: timestamp, 
interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]
{code}
 

 you also can debug in statefulOperators.scala  
{code:java}
  protected def removeKeysOlderThanWatermark(
      storeManager: StreamingAggregationStateManager,
      store: StateStore): Unit = {
    if (watermarkPredicateForKeys.nonEmpty) {
      storeManager.keys(store).foreach { keyRow =>
        if (watermarkPredicateForKeys.get.eval(keyRow)) {
          storeManager.remove(store, keyRow)  //this line
        }
      }
    }
  }
}

{code}
you will find java does not remove old state.

 I think java should write like this
{code:java}
        SparkSession spark = 
SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();        Dataset<Row> lines = 
spark.readStream().format("socket")
                .option("host", "skynet")
                .option("includeTimestamp",true)
                .option("port", 8888).load();
        Dataset<Row> words = lines.select("timestamp", "value");
        Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 
seconds");
        Dataset<Row> count = wordsWatermark
                .groupBy(functions.window(wordsWatermark.col("timestamp"), "10 
seconds", "10 seconds")
                        , wordsWatermark.col("value")).count();
        StreamingQuery start = count.writeStream()
                .outputMode("update")
                .format("console").start();
        start.awaitTermination();    }
{code}

  was:
[http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking]

I write code according to this by java and scala.

java
{code:java}
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = 
SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();        Dataset<Row> lines = 
spark.readStream().format("socket")
                .option("host", "skynet")
                .option("includeTimestamp",true)
                .option("port", 8888).load();
        Dataset<Row> words = lines.select("timestamp", "value");
        Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
                .groupBy(functions.window(words.col("timestamp"), "10 seconds", 
"10 seconds")
                        , words.col("value")).count();
        StreamingQuery start = count.writeStream()
                .outputMode("update")
                .format("console").start();
        start.awaitTermination();    }
{code}
scala

 
{code:java}
 def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("test").
      master("local[*]").
      config("spark.sql.shuffle.partitions", 1)
      .getOrCreate
    import spark.implicits._
    val lines = spark.readStream.format("socket").
      option("host", "skynet").option("includeTimestamp", true).
      option("port", 8888).load
    val words = lines.select("timestamp", "value")
    val count = words.withWatermark("timestamp", "10 seconds").
      groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
      .count()
    val start = count.writeStream.outputMode("update").format("console").start
    start.awaitTermination()
  }
{code}
This is according to official documents. written in Java I found metrics 
"stateOnCurrentVersionSizeBytes" always increase .but scala is ok.

 

java

 
{code:java}
== Physical Plan ==
== Physical Plan ==
WriteToDataSourceV2 
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
+- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)], 
output=[window#11, value#0, count#10L])
   +- StateStoreSave [window#11, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
 runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions 
= 1], Update, 1579274372624, 2
      +- *(3) HashAggregate(keys=[window#11, value#0], 
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
         +- StateStoreRestore [window#11, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
 runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, numPartitions 
= 1], 2
            +- *(2) HashAggregate(keys=[window#11, value#0], 
functions=[merge_count(1)], output=[window#11, value#0, count#21L])
               +- Exchange hashpartitioning(window#11, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11, value#0], 
functions=[partial_count(1)], output=[window#11, value#0, count#21L])
                     +- *(1) Project [named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as 
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, 
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as 
double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, 
TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 
0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, 
TimestampType)) AS window#11, value#0]
                        +- *(1) Filter isnotnull(timestamp#1)
                           +- EventTimeWatermark timestamp#1: timestamp, 
interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]

{code}
 

 

scala 

 

 
{code:java}
WriteToDataSourceV2 
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
+- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], functions=[count(1)], 
output=[window#6-T10000ms, value#0, count#10L])
   +- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
 runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions 
= 1], Update, 1579275214256, 2
      +- *(3) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
         +- StateStoreRestore [window#11-T10000ms, value#0], state info [ 
checkpoint = 
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
 runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, numPartitions 
= 1], 2
            +- *(2) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
               +- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
                  +- *(1) HashAggregate(keys=[window#11-T10000ms, value#0], 
functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
                     +- *(1) Project [named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType)) AS window#11-T10000ms, value#0]
                        +- *(1) Filter isnotnull(timestamp#1-T10000ms)
                           +- EventTimeWatermark timestamp#1: timestamp, 
interval 10 seconds
                              +- LocalTableScan <empty>, [timestamp#1, value#0]
{code}
 

 you also can debug in statefulOperators.scala  
{code:java}
  protected def removeKeysOlderThanWatermark(
      storeManager: StreamingAggregationStateManager,
      store: StateStore): Unit = {
    if (watermarkPredicateForKeys.nonEmpty) {
      storeManager.keys(store).foreach { keyRow =>
        if (watermarkPredicateForKeys.get.eval(keyRow)) {
          storeManager.remove(store, keyRow)  //this line
        }
      }
    }
  }
}

{code}
you will find java does not remove old state.

 I think java should write like this
{code:java}
        SparkSession spark = 
SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();        Dataset<Row> lines = 
spark.readStream().format("socket")
                .option("host", "skynet")
                .option("includeTimestamp",true)
                .option("port", 8888).load();
        Dataset<Row> words = lines.select("timestamp", "value");
        Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 
seconds");
        Dataset<Row> count = wordsWatermark
                .groupBy(functions.window(wordsWatermark.col("timestamp"), "10 
seconds", "10 seconds")
                        , wordsWatermark.col("value")).count();
        StreamingQuery start = count.writeStream()
                .outputMode("update")
                .format("console").start();
        start.awaitTermination();    }
{code}


> structured-streaming documentation  java   watermark group by
> -------------------------------------------------------------
>
>                 Key: SPARK-30553
>                 URL: https://issues.apache.org/jira/browse/SPARK-30553
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 2.4.4
>            Reporter: bettermouse
>            Priority: Trivial
>
> [http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking]
> I write code according to this by java and scala.
> java
> {code:java}
>     public static void main(String[] args) throws StreamingQueryException {
>         SparkSession spark = 
> SparkSession.builder().appName("test").master("local[*]")
>                 .config("spark.sql.shuffle.partitions", 1)
>                 .getOrCreate();        Dataset<Row> lines = 
> spark.readStream().format("socket")
>                 .option("host", "skynet")
>                 .option("includeTimestamp", true)
>                 .option("port", 8888).load();
>         Dataset<Row> words = lines.select("timestamp", "value");
>         Dataset<Row> count = words.withWatermark("timestamp", "10 seconds")
>                 .groupBy(functions.window(words.col("timestamp"), "10 
> seconds", "10 seconds")
>                         , words.col("value")).count();
>         StreamingQuery start = count.writeStream()
>                 .outputMode("update")
>                 .format("console").start();
>         start.awaitTermination();    }
> {code}
> scala
>  
> {code:java}
>  def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder.appName("test").
>       master("local[*]").
>       config("spark.sql.shuffle.partitions", 1)
>       .getOrCreate
>     import spark.implicits._
>     val lines = spark.readStream.format("socket").
>       option("host", "skynet").option("includeTimestamp", true).
>       option("port", 8888).load
>     val words = lines.select("timestamp", "value")
>     val count = words.withWatermark("timestamp", "10 seconds").
>       groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"value")
>       .count()
>     val start = count.writeStream.outputMode("update").format("console").start
>     start.awaitTermination()
>   }
> {code}
> This is according to official documents. written in Java I found metrics 
> "stateOnCurrentVersionSizeBytes" always increase .but scala is ok.
>  
> java
>  
> {code:java}
> == Physical Plan ==
> WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4176a001
> +- *(4) HashAggregate(keys=[window#11, value#0], functions=[count(1)], 
> output=[window#11, value#0, count#10L])
>    +- StateStoreSave [window#11, value#0], state info [ checkpoint = 
> file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
>  runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, 
> numPartitions = 1], Update, 1579274372624, 2
>       +- *(3) HashAggregate(keys=[window#11, value#0], 
> functions=[merge_count(1)], output=[window#11, value#0, count#21L])
>          +- StateStoreRestore [window#11, value#0], state info [ checkpoint = 
> file:/C:/Users/chenhao/AppData/Local/Temp/temporary-63acf9b1-9249-40db-ab33-9dcadf5736aa/state,
>  runId = d38b8fee-6cd0-441c-87da-a4e3660856a3, opId = 0, ver = 5, 
> numPartitions = 1], 2
>             +- *(2) HashAggregate(keys=[window#11, value#0], 
> functions=[merge_count(1)], output=[window#11, value#0, count#21L])
>                +- Exchange hashpartitioning(window#11, value#0, 1)
>                   +- *(1) HashAggregate(keys=[window#11, value#0], 
> functions=[partial_count(1)], output=[window#11, value#0, count#21L])
>                      +- *(1) Project [named_struct(start, 
> precisetimestampconversion(((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) as double) = 
> (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) 
> as double) / 1.0E7)) THEN 
> (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) 
> - 0) as double) / 1.0E7)) + 1) ELSE 
> CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) 
> - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
> TimestampType), end, precisetimestampconversion(((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) as double) = 
> (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) 
> as double) / 1.0E7)) THEN 
> (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) 
> - 0) as double) / 1.0E7)) + 1) ELSE 
> CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) 
> - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, 
> TimestampType)) AS window#11, value#0]
>                         +- *(1) Filter isnotnull(timestamp#1)
>                            +- EventTimeWatermark timestamp#1: timestamp, 
> interval 10 seconds
>                               +- LocalTableScan <empty>, [timestamp#1, 
> value#0]
> {code}
>  
>  
> scala 
>  
>  
> {code:java}
> WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4149892c
> +- *(4) HashAggregate(keys=[window#11-T10000ms, value#0], 
> functions=[count(1)], output=[window#6-T10000ms, value#0, count#10L])
>    +- StateStoreSave [window#11-T10000ms, value#0], state info [ checkpoint = 
> file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
>  runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, 
> numPartitions = 1], Update, 1579275214256, 2
>       +- *(3) HashAggregate(keys=[window#11-T10000ms, value#0], 
> functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
>          +- StateStoreRestore [window#11-T10000ms, value#0], state info [ 
> checkpoint = 
> file:/C:/Users/chenhao/AppData/Local/Temp/temporary-8b17f74b-0963-4fee-82cd-2c1e63a75a98/state,
>  runId = dac4413d-5a82-4d61-b134-c81bfab704d8, opId = 0, ver = 7, 
> numPartitions = 1], 2
>             +- *(2) HashAggregate(keys=[window#11-T10000ms, value#0], 
> functions=[merge_count(1)], output=[window#11-T10000ms, value#0, count#21L])
>                +- Exchange hashpartitioning(window#11-T10000ms, value#0, 1)
>                   +- *(1) HashAggregate(keys=[window#11-T10000ms, value#0], 
> functions=[partial_count(1)], output=[window#11-T10000ms, value#0, count#21L])
>                      +- *(1) Project [named_struct(start, 
> precisetimestampconversion(((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
> TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
> (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) THEN 
> (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
> CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
> TimestampType), end, precisetimestampconversion(((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, 
> TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
> (cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) THEN 
> (CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
> CEIL((cast((precisetimestampconversion(timestamp#1-T10000ms, TimestampType, 
> LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
> LongType, TimestampType)) AS window#11-T10000ms, value#0]
>                         +- *(1) Filter isnotnull(timestamp#1-T10000ms)
>                            +- EventTimeWatermark timestamp#1: timestamp, 
> interval 10 seconds
>                               +- LocalTableScan <empty>, [timestamp#1, 
> value#0]
> {code}
>  
>  you also can debug in statefulOperators.scala  
> {code:java}
>   protected def removeKeysOlderThanWatermark(
>       storeManager: StreamingAggregationStateManager,
>       store: StateStore): Unit = {
>     if (watermarkPredicateForKeys.nonEmpty) {
>       storeManager.keys(store).foreach { keyRow =>
>         if (watermarkPredicateForKeys.get.eval(keyRow)) {
>           storeManager.remove(store, keyRow)  //this line
>         }
>       }
>     }
>   }
> }
> {code}
> you will find java does not remove old state.
>  I think java should write like this
> {code:java}
>         SparkSession spark = 
> SparkSession.builder().appName("test").master("local[*]")
>                 .config("spark.sql.shuffle.partitions", 1)
>                 .getOrCreate();        Dataset<Row> lines = 
> spark.readStream().format("socket")
>                 .option("host", "skynet")
>                 .option("includeTimestamp",true)
>                 .option("port", 8888).load();
>         Dataset<Row> words = lines.select("timestamp", "value");
>         Dataset<Row> wordsWatermark = words.withWatermark("timestamp", "10 
> seconds");
>         Dataset<Row> count = wordsWatermark
>                 .groupBy(functions.window(wordsWatermark.col("timestamp"), 
> "10 seconds", "10 seconds")
>                         , wordsWatermark.col("value")).count();
>         StreamingQuery start = count.writeStream()
>                 .outputMode("update")
>                 .format("console").start();
>         start.awaitTermination();    }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to