You can't do ".count()" directly on streaming DataFrames. This is because
"count" is an action (remember RDD actions) that executes and returns a
result immediately, which is possible only when the data is bounded (e.g.
batch/interactive queries). A streaming query has to run continuously in
the background, which you start with writeStream...start().
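
For example, a running count can be expressed as a streaming aggregation
instead of an action. A minimal sketch, assuming an existing SparkSession
named spark; the broker and topic are placeholders:

// Global streaming aggregation: a continuously updated count of all records.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "topic")
  .load()

val counts = df.groupBy().count()   // an aggregation, not the count() action

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()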

And for streaming queries, you have to specify the schema ahead of time, so
that at runtime the query fails explicitly if the schema changes
unexpectedly.
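
For example, a schema could be declared up front like this. A minimal
sketch; the field names here are made up:

import org.apache.spark.sql.types._

// Hypothetical schema for the incoming json messages.
val schema = new StructType()
  .add("id", StringType)
  .add("amount", DoubleType)
  .add("ts", TimestampType)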

In your case, what you can do is the following (sketched in code after the
list).
- Run a streaming query that converts the binary data from Kafka to string,
and saves it as text files (i.e. writeStream.format("text").start("path")).

- Then run a batch query on the saved text files with the json format
(i.e. read.format("json").load(path)) with schema inference, and get the
schema from the Dataset created (i.e. Dataset.schema).

- Then you can run the real streaming query with from_json and the learnt
schema.
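
Roughly, the three steps could look like this. A sketch only, assuming an
existing SparkSession named spark; the broker, topic, and paths are
placeholders:

import org.apache.spark.sql.functions.{col, from_json}

// Step 1: stream the Kafka values out as text files.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

raw.writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/chk")   // file sinks need a checkpoint
  .start("/tmp/json-sample")                  // let this run for a while

// Step 2: batch-read the saved files as json to infer the schema.
val inferredSchema = spark.read.format("json").load("/tmp/json-sample").schema

// Step 3: the real streaming query, parsing with the learnt schema.
val parsed = raw.select(from_json(col("value"), inferredSchema).as("data"))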

Make sure that the generated text files have sufficient data to infer the
full schema. Let me know if this works for you.

TD


On Sat, May 13, 2017 at 6:04 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi!
>
> Thanks for the response. Looks like from_json requires schema ahead of
> time. Is there any function I can use to infer schema from the json
> messages I am receiving through Kafka?  I tried with the code below however
> I get the following exception.
>
> org.apache.spark.sql.AnalysisException: Queries with streaming sources
> must be executed with writeStream.start()
>
>
> //code
> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
> // datasetRows is of type Dataset<Row> that I get from loading from Kafka
>
> val foo = ds.select("*").count()
> val query = foo.writeStream.outputMode("complete").format("console").start();
> query.awaitTermination()
>
> I am just trying to parse the json messages from Kafka into a DataFrame
> or Dataset without requiring the schema, and to do a simple count.
>
> Thanks!
>
>
>
> On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I understand the confusion. The "json" format is for json-encoded files
>> being written in a directory. For Kafka, use the "kafka" format. Then, to
>> decode the binary data as json, you can use the function "from_json"
>> (Spark 2.1 and above). Here is our blog post on this.
>>
>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
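>>
>> For example, a minimal sketch (assuming the Kafka stream "df" and a
>> StructType "schema" have already been defined):
>>
>> import org.apache.spark.sql.functions.{col, from_json}
>>
>> val parsed = df.selectExpr("CAST(value AS STRING) AS json")
>>   .select(from_json(col("json"), schema).as("data"))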
>>
>> And my talk also explains this.
>>
>> https://spark-summit.org/east-2017/events/making-structured-streaming-ready-for-production-updates-and-future-directions/
>>
>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> What is the difference between sparkSession.readStream.format("kafka")
>>> vs sparkSession.readStream.format("json")?
>>> I am sending json-encoded messages in Kafka and I am not sure which of
>>> the two I should use.
>>>
>>> Thanks!
>>>
>>>
>>
>
