Which Spark version are you using? The following app works with Spark 2.3.1,
and 200 partitions are stored for state.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{avg, max, min, window}
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    // conf, QueryListenerWriteProgressToFile and BenchmarkQueryHelper are
    // helper classes not shown in this snippet.
    val queryStatusFile = conf.queryStatusFile()
    val rateRowPerSecond = conf.rateRowPerSecond()
    val rateRampUpTimeSecond = conf.rateRampUpTimeSecond()

    val ss = SparkSession
      .builder()
      .master("local[3]")
      .appName("state coalesce test")
      .getOrCreate()

    ss.streams.addListener(
      new QueryListenerWriteProgressToFile(queryStatusFile))

    import ss.implicits._

    val df = ss.readStream
      .format("rate")
      .option("rowsPerSecond", rateRowPerSecond)
      .option("rampUpTime", s"${rateRampUpTimeSecond}s")
      .load()

    df.printSchema()

    val outDf = df.withWatermark("timestamp", "10 seconds")
      .selectExpr(
        "timestamp", "mod(value, 100) as mod", "value",
        BenchmarkQueryHelper.createCaseExprStr(
          "mod(CAST(RANDN(0) * 1000 as INTEGER), 50)", 50, 10) + " as word")
      .groupBy(
        window($"timestamp", "1 minute", "10 seconds"),
        $"mod", $"word")
      .agg(max("value").as("max_value"), min("value").as("min_value"),
        avg("value").as("avg_value"))
      .coalesce(8)

    val query = outDf.writeStream
      .format("memory")
      .option("queryName", "stateCoalesceTest")
      .option("checkpointLocation", "/tmp/state-coalesce-test")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .outputMode(OutputMode.Update())
      .start()

    query.awaitTermination()
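
The 200 comes from spark.sql.shuffle.partitions: a streaming aggregation
partitions its state by the shuffle partition number (200 by default), and one
state store directory per partition is created under the checkpoint's state/
directory. A minimal sketch, assuming you want fewer state partitions for a
local test (only the .config(...) line differs from the builder above):

    val ss = SparkSession
      .builder()
      .master("local[3]")
      .appName("state coalesce test")
      // fewer shuffle partitions -> fewer state store directories
      // under <checkpointLocation>/state/
      .config("spark.sql.shuffle.partitions", "8")
      .getOrCreate()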

-Jungtaek Lim (HeartSaVioR)


On Thu, Aug 9, 2018 at 8:38 PM, WangXiaolong <roland8...@163.com> wrote:

> Hi,
>
>    Lately, I encountered a problem while writing a structured streaming
> job to write things into OpenTSDB.
>   The write-stream part looks something like this:
>
>       outputDs
>           .coalesce(14)
>           .writeStream
>           .outputMode("append")
>           .trigger(Trigger.ProcessingTime(s"$triggerSeconds seconds"))
>           .option("checkpointLocation",s"$checkpointDir/$appName/tsdb")
>           .foreach {
>             TsdbWriter(
>               tsdbUrl,
>               MongoProp(mongoUrl, mongoPort, mongoUser, mongoPassword,
> mongoDatabase, mongoCollection,mongoAuthenticationDatabase)
>             )(createMetricBuilder(tsdbMetricPrefix))
>           }
>           .start()
>
>     And when I checked the checkpoint dir, I discovered that the
> "/checkpoint/state" dir is empty. I looked into the executor's log and
> found that the HDFSBackedStateStoreProvider didn't write anything to the
> checkpoint dir.
>
>    The strange thing is, when I replace the "coalesce" function with the
> "repartition" function, the problem is solved. Is there a difference
> between these two functions when using structured streaming?
>
>   Looking forward to your help, thanks.
>
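
Regarding the coalesce vs. repartition question: repartition(n) inserts a full
shuffle (an Exchange in the physical plan), while coalesce(n) is a narrow
transformation that merges existing partitions into the same stage, so it also
changes how the stage containing the stateful operator is executed. A minimal
batch-mode sketch (not from this thread) showing the planning difference:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("coalesce vs repartition")
      .getOrCreate()

    val df = spark.range(0, 1000).toDF("value")

    // Narrow: partitions are merged without a shuffle, so the operation
    // folds into the upstream stage. The plan shows Coalesce 2, no Exchange.
    df.coalesce(2).explain()

    // Wide: a full shuffle is added. The plan shows
    // Exchange RoundRobinPartitioning(2).
    df.repartition(2).explain()

    spark.stop()

Since the state store keeps one directory per partition of the stateful
operator, changing how that stage is executed may be related to the empty
state directory you observed with coalesce but not with repartition.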
