Thanks guys! I am using SSS to backfill the past 3 months of data. I thought I could use SSS for both historical and new data, but I just realized that SSS is not appropriate for backfilling, since the watermark relies on receivedAt, which could be 3 months old. I will use a batch job for the backfill and use SSS (with a watermark and spark-states) for the real-time processing.
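For the real-time path, a minimal sketch of what that plan might look like, assuming receivedAt is a timestamp column and a 3-hour watermark threshold (the threshold and column names are illustrative, not prescribed by the thread). Because the watermark column is included in the dedup keys, Spark can evict dedup state older than the watermark instead of letting it grow without bound:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val query = df
  .withWatermark("receivedAt", "3 hours")            // bound how long dedup state is kept
  .dropDuplicates("Id", "receivedAt")                // watermark column in the key => old state evicted
  .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
  .writeStream
  .format("parquet")
  .partitionBy("availabilityDomain", timePartitionCol)
  .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
  .option("path", "/data")
  .option("checkpointLocation", "/data_checkpoint")
  .start()
```

Note that late records arriving more than 3 hours behind the watermark may no longer be deduplicated, which is the trade-off for bounded state.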
On Sun, Mar 10, 2019 at 2:40 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> The query makes state grow infinitely. Could you consider applying a
> watermark to "receivedAt" to let the unnecessary part of the state be
> cleared out? Other than a watermark, you could implement TTL-based
> eviction via flatMapGroupsWithState, though you'll need to implement your
> own custom "dropDuplicates".
>
> On Mon, Mar 11, 2019 at 5:59 AM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>
>> Use https://github.com/chermenin/spark-states instead
>>
>> On Sun, Mar 10, 2019 at 8:51 PM Arun Mahadevan <ar...@apache.org> wrote:
>>
>>> Read the link carefully:
>>>
>>> This solution is available (*only*) in Databricks Runtime.
>>>
>>> You can enable RocksDB-based state management by setting the following
>>> configuration in the SparkSession before starting the streaming query:
>>>
>>> spark.conf.set(
>>>   "spark.sql.streaming.stateStore.providerClass",
>>>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>>>
>>> On Sun, 10 Mar 2019 at 11:54, Lian Jiang <jiangok2...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a very simple SSS pipeline which does:
>>>>
>>>> val query = df
>>>>   .dropDuplicates(Array("Id", "receivedAt"))
>>>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>>>   .writeStream
>>>>   .format("parquet")
>>>>   .partitionBy("availabilityDomain", timePartitionCol)
>>>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>>>   .option("path", "/data")
>>>>   .option("checkpointLocation", "/data_checkpoint")
>>>>   .start()
>>>>
>>>> After ingesting 2T records, the state under the checkpoint folder on HDFS
>>>> (replication factor 2) has grown to 2T bytes. My cluster has only 2T bytes,
>>>> which means it can barely handle further data growth.
>>>>
>>>> The online Spark documentation
>>>> (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>>>> says using RocksDB helps an SSS job reduce JVM memory overhead. But I cannot
>>>> find any documentation on how to set up RocksDB for SSS. The Spark class
>>>> CheckpointReader seems to only handle HDFS.
>>>>
>>>> Any suggestions? Thanks!
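Jungtaek's alternative above (TTL-based eviction via flatMapGroupsWithState) could be sketched roughly as follows. This is an illustrative outline only: the Record case class, the 3-hour timeout, and the key format are assumptions, not anything specified in the thread. Each dedup key holds a small state entry that is dropped by a processing-time timeout, which bounds state size:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Illustrative record shape; adapt to the real schema.
case class Record(Id: String, receivedAt: java.sql.Timestamp)

// Emit only the first record seen for a key; keep a marker in state with a TTL.
def dedup(key: String,
          rows: Iterator[Record],
          state: GroupState[Boolean]): Iterator[Record] = {
  if (state.hasTimedOut) {
    state.remove()                       // TTL expired: free the state for this key
    Iterator.empty
  } else if (state.exists) {
    Iterator.empty                       // duplicate within the TTL window: drop
  } else {
    state.update(true)
    state.setTimeoutDuration("3 hours")  // assumed TTL; tune to the duplicate window
    rows.take(1)                         // first occurrence passes through
  }
}

val deduped = df.as[Record]
  .groupByKey(r => r.Id + "|" + r.receivedAt.toString)
  .flatMapGroupsWithState(
    OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(dedup)
```

Unlike the watermark approach, the TTL here is decoupled from event time, but the trade-off is the same: duplicates arriving after the TTL expires will not be caught.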