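Hi Daniel,

If you use the state within the same app, call foreachRDD on the stateSnapshots DStream and either persist each RDD (rdd.persist; note that it caches in memory by default, so pass a StorageLevel such as DISK_ONLY to keep it on disk) or convert it to a DataFrame and call createOrReplaceTempView (registerTempTable on Spark 1.6, as in the code below).

Code from https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html: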

    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    // A snapshot of the state for the current batch. This dstream contains one entry per key.
    val stateSnapshotStream = wordCountStateStream.stateSnapshots()
    stateSnapshotStream.foreachRDD { rdd =>
      rdd.toDF("word", "count").registerTempTable("batch_word_count")
    }

Hope it helps,
Victor

On Sun, Nov 6, 2016 at 12:53 PM, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:

> Hi,
> How can I utilize mapWithState and DataFrames?
> Right now I stream json messages from Kafka, update their state, output
> the updated state as json and compose a dataframe from it.
> It seems inefficient both in terms of processing and storage (a long
> string instead of a compact DF).
>
> Is there a way to manage state for DataFrame?
>
> Thank you,
> Daniel

--
Victor Shafran
VP R&D | Equalum
Mobile: +972-523854883 | Email: victor.shaf...@equalum.io
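
For reference, here is a minimal sketch of the same snapshot-to-view pattern on Spark 2.x, where createOrReplaceTempView replaces the deprecated registerTempTable. It is an illustration under assumptions, not code from the Databricks notebook: the object name StateSnapshotView, the method registerSnapshots and the word-count mapping function are hypothetical, and the caller is assumed to have created the StreamingContext and set a checkpoint directory (mapWithState requires one).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, for illustration only.
object StateSnapshotView {

  // Keeps a running count per word in the state and emits the updated pair.
  val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }

  // Assumes ssc.checkpoint(...) was already called on the owning StreamingContext.
  def registerSnapshots(wordStream: DStream[(String, Int)], spark: SparkSession): Unit = {
    import spark.implicits._ // needed for rdd.toDF

    val wordCountStateStream = wordStream.mapWithState(StateSpec.function(mappingFunc))

    // One (word, count) entry per key for every batch.
    wordCountStateStream.stateSnapshots().foreachRDD { rdd =>
      // Replaces the view on each batch, so queries always see the latest snapshot.
      rdd.toDF("word", "count").createOrReplaceTempView("batch_word_count")
    }
  }
}
```

Once the stream is running, the latest state can be queried from the same application with spark.sql("SELECT word, count FROM batch_word_count"). The view is scoped to that SparkSession, so this only covers the same-app case described above.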