Hi,

Here is a little bit of background.

I've been using the stateless streaming APIs, such as JavaDStream, for a
while, and they have worked well. It has come to the point where we need to
do real-time stateful streaming based on event time and other things, but
for now I am just trying to get used to the Structured Streaming APIs by
running simple aggregations like count(). Naturally, I tend to think that
the concepts behind JavaDStream also apply to Structured Streaming (please
correct me if I am wrong). For example, I can do the following with
DStreams without writing to any text files.

// DStream version
jsonDStream.foreachRDD { rdd =>
  val jsonDF = spark.read.json(rdd)
  jsonDF.createOrReplaceTempView("dataframe")
  spark.sql("select count(*) from dataframe").show()
}
javaStreamingContext.start()

Alternatively, I can call javaDStream.count(), which emits the count at
every batch interval.
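Roughly, I mean something like the one-liner below (assuming jsonDStream is
the same DStream as in the snippet above):

// per-batch count with DStreams: prints the number of records in each batch interval
jsonDStream.count().print()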

I was expecting something similar with Structured Streaming, so I wrote the
code below to mimic the DStream version. It looks very similar to me, so I
am not sure what you mean by "For streaming queries, you have to let it run
in the background continuously by starting it using writeStream....start()."
DStreams are also unbounded, right? It's just that at every batch interval
the count() action gets invoked. So why can't I call .count() on a stream
of DataFrames in Structured Streaming, especially when it is possible with
a stream of RDDs like DStreams? I probably have a misconception somewhere.

// structured streaming version
val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
val foo = ds.select("*").count()
val query = foo.writeStream.outputMode("complete").format("console").start();
query.awaitTermination()

How should I change this code to do a simple count in Structured Streaming?
You can assume the schema is available ahead of time if that is really a
problem.
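
My current (untested) guess at what might be intended, reusing the
datasetRows variable from the snippet above, is a streaming aggregation
instead of the count() action:

// guess: groupBy().count() gives a streaming DataFrame with a running count
val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
val counts = ds.groupBy().count()
val query = counts.writeStream
  .outputMode("complete")   // emit the updated count on every trigger
  .format("console")
  .start()
query.awaitTermination()

Is that roughly the intended pattern, or am I off base?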

Since we want to do real-time structured streaming, we would like to avoid
any extra level of indirection, such as writing to text files. If the only
workaround for schema inference is writing to text files, I would rather
figure out how to get the schemas ahead of time. That is not ideal for our
case, but I can live with it.
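
If I do end up declaring the schema up front, I assume it would look
roughly like the sketch below (the field names are made up purely for
illustration; the real ones would come from our messages):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// hypothetical schema declared ahead of time (illustrative field names only)
val schema = new StructType()
  .add("id", StringType)
  .add("eventTime", TimestampType)
  .add("value", DoubleType)

val parsed = datasetRows
  .selectExpr("CAST(value AS STRING) AS json")       // Kafka value bytes -> string
  .select(from_json(col("json"), schema).as("data")) // parse with the declared schema
  .select("data.*")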

Thanks a lot!

On Sat, May 13, 2017 at 7:11 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> You can't do ".count()" directly on streaming DataFrames. This is because
> "count" is an Action (remember RDD actions) that executes and returns a
> result immediately, which can be done only when the data is bounded (e.g.
> batch/interactive queries). For streaming queries, you have to let it run
> in the background continuously by starting it using writeStream....start().
>
> And for streaming queries, you have to specify the schema up front so that
> it fails explicitly at runtime when the schema changes unexpectedly.
>
> In your case, what you can do is the following.
> - Run a streaming query that converts the binary data from Kafka to
> string and saves it as text files (i.e.
> writeStream.format("text").start("path")).
>
> - Then run a batch query on the saved text files with format json (i.e.
> read.format("json").load(path)) with schema inference, and get the schema
> from the resulting Dataset (i.e. Dataset.schema).
>
> - Then you can run the real streaming query with from_json and the learnt
> schema.
>
> Make sure that the generated text files have sufficient data to infer the
> full schema. Let me know if this works for you.
>
> TD
>
>
> On Sat, May 13, 2017 at 6:04 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi!
>>
>> Thanks for the response. It looks like from_json requires a schema ahead
>> of time. Is there any function I can use to infer the schema from the json
>> messages I am receiving through Kafka? I tried the code below; however, I
>> get the following exception.
>>
>> org.apache.spark.sql.AnalysisException: Queries with streaming sources
>> must be executed with writeStream.start()
>>
>>
>> // code: datasetRows is of type Dataset<Row> that I get from loading from Kafka
>> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
>> val foo = ds.select("*").count()
>> val query = foo.writeStream.outputMode("complete").format("console").start();
>> query.awaitTermination()
>>
>> I am just trying to parse JSON messages from Kafka into a DataFrame or
>> Dataset without requiring the schema, and then do a simple count.
>>
>> Thanks!
>>
>>
>>
>> On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> I understand the confusion. The "json" format is for json-encoded files
>>> being written in a directory. For Kafka, use the "kafka" format. Then, to
>>> decode the binary data as json, you can use the function "from_json" (Spark
>>> 2.1 and above). Here is our blog post on this.
>>>
>>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>
>>> And my talk also explains this.
>>>
>>> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>>>
>>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> What is the difference between sparkSession.readStream.format("kafka")
>>>> and sparkSession.readStream.format("json")? I am sending json-encoded
>>>> messages to Kafka and I am not sure which of the two I should use.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>
>>
>
