Re: mapWithState and DataFrames

2016-11-06 Thread Victor Shafran
Hi Daniel,

If you consume the state in the same app, use the foreachRDD method of the
stateSnapshots DStream to either persist the RDD (note rdd.persist defaults
to memory; pass StorageLevel.DISK_ONLY if you really want disk) or convert
it to a DataFrame and register it as a temp table (registerTempTable in
Spark 1.6, createOrReplaceTempView from 2.0 on).
Code from
https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html

val wordCountStateStream = wordStream.mapWithState(stateSpec)
wordCountStateStream.print()

// A snapshot of the state for the current batch.
// This DStream contains one entry per key.
val stateSnapshotStream = wordCountStateStream.stateSnapshots()
stateSnapshotStream.foreachRDD { rdd =>
  rdd.toDF("word", "count").registerTempTable("batch_word_count")
}
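For context, the stateSpec and wordStream in the snippet are defined elsewhere
in the Databricks example. As a rough sketch of what the per-key update logic
inside such a StateSpec does (names here are illustrative, and State[Int] is
modeled with a plain Option so the example runs without a Spark cluster):

```scala
// Hypothetical sketch of mapWithState's per-key update function.
// In Spark 1.6 the real signature is (KeyType, Option[ValueType], State[StateType]) => MappedType;
// here the State object is approximated by an Option[Int] in/out pair.
def updateWordCount(word: String, newCount: Option[Int], state: Option[Int]): (Int, Option[Int]) = {
  val updated = state.getOrElse(0) + newCount.getOrElse(0)
  (updated, Some(updated)) // emit the running total and the new state
}

// Folding one micro-batch of (word, 1) pairs through the update function
// mimics what mapWithState does per batch; the resulting Map plays the role
// of the stateSnapshots() output (one entry per key).
val batch = Seq("spark" -> 1, "state" -> 1, "spark" -> 1)
val snapshot = batch.foldLeft(Map.empty[String, Int]) { case (st, (w, n)) =>
  val (_, next) = updateWordCount(w, Some(n), st.get(w))
  st.updated(w, next.getOrElse(0))
}
println(snapshot) // Map(spark -> 2, state -> 1)
```

Converting that snapshot RDD to a DataFrame per batch (as in the example
above) keeps the state in a compact columnar form instead of round-tripping
through JSON strings.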

Hope it helps
Victor



On Sun, Nov 6, 2016 at 12:53 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> How can I utilize mapWithState and DataFrames?
> Right now I stream JSON messages from Kafka, update their state, output
> the updated state as JSON, and compose a DataFrame from it.
> It seems inefficient both in terms of processing and storage (a long
> string instead of a compact DF).
>
> Is there a way to manage state for DataFrame?
>
> Thank you,
> Daniel
>



-- 

Victor Shafran

VP R&D | Equalum

Mobile: +972-523854883 | Email: victor.shaf...@equalum.io


mapWithState and DataFrames

2016-11-06 Thread Daniel Haviv
Hi,
How can I utilize mapWithState and DataFrames?
Right now I stream JSON messages from Kafka, update their state, output the
updated state as JSON, and compose a DataFrame from it.
It seems inefficient both in terms of processing and storage (a long string
instead of a compact DF).

Is there a way to manage state for DataFrame?

Thank you,
Daniel