For that simple count, you don't actually have to even parse the JSON
data.  You can just do a count.  The following code assumes you are
running Spark
2.2
<http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html>
.

df.groupBy().count().writeStream.outputMode("co
mplete").format("console").start()

If you want to do something more complicated, you will need specify the
schema at least for the columns that you want Spark to understand.  We need
to know the names and types of the column so that we know how to extract
them from the JSON.  Its okay however to omit columns that you don't care
about.

df.select(from_json($"value".cast("string"), "name STRING, age INT") as
'message).groupBy($"message.name").agg(avg($"age"))

If you are not sure what json looks like, you can ask spark to infer it
based on a sample.

spark.read.json(spark.read.format("kafka").option(...).load().limit(1000).select($"value".as[String])).printSchema()

On Sat, May 13, 2017 at 8:48 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> Here is a little bit of background.
>
> I've been using stateless streaming API's for a while like using
> JavaDstream and so on and they worked well. It's has come to a point where
> we need to do realtime stateful streaming based on event time and other
> things but for now I am just trying to get used to structured streaming
> API's by running simple aggregations like count(). so naturally, I tend to
> think that the concepts that are behind JavaDstream would also apply in
> StructuredStreaming as well (Please correct me if I am wrong). For example,
> I can do the following with Dstreams without writing to any text files.
>
> // dstreams version
> jsonDStream.foreachRDD{rdd =>
>     val jsonDF = spark.read.json(rdd)
>     jsonDF.createOrReplaceTempView("dataframe")
> }
> javaStreamingContext.start()
> select count(*) from dataframe;
>
> or I can also do javaDstream.count() such that at every batch interval it
> spits out the count.
>
> I was expecting something like this with Structured Streaming as well. so
> I thought of doing something like below to mimic the above version. It
> looks very similar to me so I am not sure what you mean by
>
> "For streaming queries, you have to let it run in the background
> continuously by starting it using writeStream....start()." Dstreams are
> also unbounded right? except at every batch interval the count() action
> gets invoked so why I can't call .count() on stream of dataframes in
> structured streaming (especially when it is possible with stream of RDD's
> like Dstreams)? I guess I probably have some misconception somewhere.
>
> //structured streaming version
> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON
> val foo = ds.select("*").count()
> val query = foo.writeStream.outputMode("complete").format("console").sta
> rt();
> query.awaitTermination()
>
> How should I change this code to do a simple count in structured
> streaming? you can assume there is schema ahead of time if thats really a
> problem.
>
> since we want to do real time structured streaming we would like to avoid
> any extra level of indirections such as writing to text files and so on but
> if I really have to do a workaround to infer schema like writing to text
> files I rather try and figure out how I can get schemas ahead of time which
> is not ideal for our case but I can try to survive.
>
> Thanks a lot!
>
>
>
>
>
>
>
>
>
>
>
>
> On Sat, May 13, 2017 at 7:11 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> You cant do ".count()" directly on streaming DataFrames. This is because
>> "count" is an Action (remember RDD actions) that executes and returns a
>> result immediately which can be done only when the data is bounded (e.g.
>> batch/interactive queries). For streaming queries, you have to let it run
>> in the background continuously by starting it using writeStream....start().
>>
>> And for streaming queries, you have specify schema from before so that at
>> runtime it explicitly fails when schema is incorrectly changed.
>>
>> In your case, what you can do is the following.
>> - Run a streaming query that converts the binary data from KAFka to
>> string, and saves as text files (i.e. 
>> *writeStream.format("text").start("path")
>> *)
>>
>> - Then run a batch query on the saved text files with format json (i.e.  
>> *read.format("json").load(path)
>> *)  with schema inference, and get the schema from the Dataset created
>> (i.e Dataset.schema ).
>>
>> - Then you can run the real streaming query with from_json and the learnt
>> schema.
>>
>> Make sure that the generated text file have sufficient data to infer the
>> full schema. Let me know if this works for you.
>>
>> TD
>>
>>
>> On Sat, May 13, 2017 at 6:04 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> Thanks for the response. Looks like from_json requires schema ahead of
>>> time. Is there any function I can use to infer schema from the json
>>> messages I am receiving through Kafka?  I tried with the code below however
>>> I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: Queries with streaming sources
>>> must be executed with writeStream.start()
>>>
>>>
>>> //code
>>> val ds = datasetRows.selectExpr("CAST(value AS STRING)").toJSON //
>>> datasetRows is of type DataSet<Row> that I get from loading from Kafka
>>>
>>> val foo = ds.select("*").count()
>>> val query = foo.writeStream.outputMode("complete").format("console").sta
>>> rt();
>>> query.awaitTermination()
>>>
>>> I am just trying to parse Json messages from Kafka put into Dataframe or
>>> Dataset without requiring the schema and doing the simple count.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Sat, May 13, 2017 at 3:29 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> I understand the confusing. "json" format is for json encoded files
>>>> being written in a directory. For Kafka, use "kafk" format. Then you decode
>>>> the binary data as a json, you can use the function "from_json" (spark 2.1
>>>> and above). Here is our blog post on this.
>>>>
>>>> https://databricks.com/blog/2017/04/26/processing-data-in-ap
>>>> ache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>>
>>>> And my talk also explains this.
>>>>
>>>> https://spark-summit.org/east-2017/events/making-structured-
>>>> streaming-ready-for-production-updates-and-future-directions/
>>>>
>>>> On Sat, May 13, 2017 at 3:42 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> HI All,
>>>>>
>>>>> What is the difference between sparkSession.readStream.format("kafka")
>>>>> vs sparkSession.readStream.format("json") ?
>>>>> I am sending json encoded messages in Kafka and I am not sure which
>>>>> one of the above I should use?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to