Hi Daniel,

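If you use the state in the same app, call foreachRDD on the
stateSnapshots DStream and, inside it, either persist the RDD
(rdd.persist; the default level is memory-only, so pass a disk-backed
storage level if it must go to disk) or convert it to a DataFrame and
call createOrReplaceTempView (registerTempTable before Spark 2.0).
The code below is from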
https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html

  val wordCountStateStream = wordStream.mapWithState(stateSpec)
  wordCountStateStream.print()

  // A snapshot of the state for the current batch. This DStream contains
  // one entry per key.
  val stateSnapshotStream = wordCountStateStream.stateSnapshots()
  stateSnapshotStream.foreachRDD { rdd =>
    rdd.toDF("word", "count").registerTempTable("batch_word_count")
  }
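
On Spark 2.x the same idea could be sketched roughly as below. This is
only an illustration, not code from the linked page: publishSnapshots
is a hypothetical helper name, the (String, Int) element type assumes
the word-count state above, and MEMORY_AND_DISK is just one possible
storage level.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: exposes each batch's state snapshot as a temp view.
// `stateSnapshotStream` is assumed to be the result of stateSnapshots().
def publishSnapshots(stateSnapshotStream: DStream[(String, Int)]): Unit = {
  stateSnapshotStream.foreachRDD { rdd =>
    // Reuse (or lazily create) the session for this RDD's SparkContext.
    val spark = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    import spark.implicits._

    // Optional: keep the snapshot cached, spilling to disk if needed.
    // Assumes the RDD has not already been persisted at another level.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

    // Spark 2.x replacement for registerTempTable; the view is
    // overwritten on every batch.
    rdd.toDF("word", "count").createOrReplaceTempView("batch_word_count")
  }
}
```

After calling publishSnapshots(stateSnapshotStream) and starting the
streaming context, the latest snapshot should be queryable with
spark.sql("select * from batch_word_count") from the same application.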

Hope it helps,
Victor



On Sun, Nov 6, 2016 at 12:53 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> How can I utilize mapWithState and DataFrames?
> Right now I stream json messages from Kafka, update their state, output
> the updated state as json and compose a dataframe from it.
> It seems inefficient both in terms of processing and storage (a long
> string instead of a compact DF).
>
> Is there a way to manage state for DataFrame?
>
> Thank you,
> Daniel
>



-- 

Victor Shafran

VP R&D| Equalum

Mobile: +972-523854883 | Email: victor.shaf...@equalum.io
