In Spark 2.2, you can read from Kafka in batch mode, and then use the JSON
reader to infer the schema:

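import spark.implicits._  // needed for $"..." and .as[String]

val df = spark.read.format("kafka")...
  .select($"value".cast("string"))
  .as[String]  // spark.read.json expects a Dataset[String] here
val json = spark.read.json(df)
val schema = json.schema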

The above will be slow (since you're reading almost all of the data in
Kafka as a batch), but it would work.
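
To make the batch-then-stream idea concrete, here is a rough sketch of how
the inferred schema could then be handed to a streaming query via from_json.
The broker address, topic name, and console sink are placeholders I made up
for illustration, not details from this thread, so adjust them to your setup:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

object InferThenStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("infer-then-stream").getOrCreate()
    import spark.implicits._

    // Assumed values: replace with your own broker list and topic.
    val servers = "localhost:9092"
    val topic = "my-topic"

    // Batch read of what is currently in the topic, as plain JSON strings.
    val sample = spark.read.format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .select($"value".cast("string"))
      .as[String]

    // Let the JSON reader infer the schema from the sample.
    val schema: StructType = spark.read.json(sample).schema

    // Streaming read that parses each record with the inferred schema.
    val parsed = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      .select(from_json($"value".cast("string"), schema).as("data"))
      .select("data.*")

    // Console sink chosen only to keep the sketch short.
    val query = parsed.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```

Because the schema comes from a batch read, no streaming query has to be
started and stopped just to grab a sample.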

My question to you is, do you think it's worth it? Why are JSON records
with arbitrary schemas being written to your Kafka topic? Couldn't that
unpredictability break everything downstream the moment someone makes a
mistake? Not having fixed, known schemas with streaming data (or any data
for that matter) is dangerous for most purposes.
Just food for thought.
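
For comparison, here is a minimal sketch of what a fixed schema buys you.
The field names (id, op) and the sample records are hypothetical, made up
only for illustration. The check is on a required field rather than on the
whole struct, because what from_json returns for malformed input differs
between Spark versions:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object FixedSchemaCheck {
  // Hypothetical schema; the field names are illustrative only.
  val expected: StructType = new StructType()
    .add("id", LongType)
    .add("op", StringType)

  // Splits raw JSON strings into (rows with an id, rows without one).
  def split(spark: SparkSession, raw: Dataset[String]): (DataFrame, DataFrame) = {
    import spark.implicits._
    val parsed = raw.toDF("value")
      .select($"value", from_json($"value", expected).as("data"))
    (parsed.filter($"data.id".isNotNull), parsed.filter($"data.id".isNull))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("fixed-schema")
      .getOrCreate()
    import spark.implicits._

    // One well-formed record and one malformed one.
    val raw = Seq("""{"id": 1, "op": "c"}""", "not json").toDS()
    val (good, bad) = split(spark, raw)

    good.select("data.*").show(truncate = false)
    bad.select("value").show(truncate = false)
    spark.stop()
  }
}
```

With a known schema, bad records can be routed aside explicitly instead of
silently changing what the inferred schema looks like.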

Best,
Burak



On Mon, Dec 11, 2017 at 4:01 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about a custom streaming Sink that would stop the query after
> addBatch has been called?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> For now, I am using Thread.sleep() on the driver to make sure my streaming
>> query receives some data, and then stopping it before control reaches the
>> query against the memory table.
>> Let me know if there is any better way of handling it.
>>
>> Regards,
>> Satyajit.
>>
>> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi Jacek,
>>>
>>> Thank you for responding.
>>>
>>> I have tried the memory sink, and below is what I did:
>>>
>>> val fetchValue = debeziumRecords.selectExpr("value")
>>>   .withColumn("tableName",
>>>     functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>>   .withColumn("operation",
>>>     functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>>>   .withColumn("payloadAfterValue",
>>>     split(substring_index(debeziumRecords("value"), "\"after\":", -1),
>>>       ",\"source\"").getItem(0))
>>>   .drop("tableName").drop("operation").drop("value")
>>>   .as[String]
>>>   .writeStream
>>>   .outputMode(OutputMode.Append())
>>>   .queryName("record")
>>>   .format("memory")
>>>   .start()
>>>
>>> // I was expecting to be able to use the record table to read the JSON
>>> // string, but the table is empty on the first call, and I do not see any
>>> // DataFrame output after the first one.
>>> spark.sql("select * from record").show(truncate = false)
>>>
>>> *The above steps work well in spark-shell, and I can do what I need
>>> there. The problem is when I run the code from IntelliJ: the streaming
>>> query keeps running, and I am not sure how to identify and stop it so
>>> that I can use the record memory table.*
>>>
>>> So I would like to stop the streaming query once I know there is some
>>> data in my record memory table (is there a way to do that?), and then
>>> use the memory table to fetch my record.
>>> Any help on how to approach this programmatically, or any pointers to
>>> examples, would be highly appreciated.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>>
>>>
>>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> What about memory sink? That could work.
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> ----
>>>> https://about.me/JacekLaskowski
>>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>
>>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>>> satyajit.apas...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I would like to infer a JSON schema from a sample of the data that I
>>>>> receive from Kafka Streams (a specific topic). I have to infer the
>>>>> schema because I am going to receive random JSON strings with a
>>>>> different schema for each topic, so I chose to go ahead with the steps
>>>>> below:
>>>>>
>>>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>>>> b. Somehow store the JSON string in a val and infer the schema.
>>>>> c. Stop the stream.
>>>>> d. Create a new readStream (smallest offset) and use the inferred
>>>>> schema above to process the JSON using Spark's JSON support, such as
>>>>> from_json, json_object and others, and run my actual business logic.
>>>>>
>>>>> Now i am not sure how to be successful with step(b). Any help would be
>>>>> appreciated.
>>>>> And would also like to know if there is any better approach.
>>>>>
>>>>> Regards,
>>>>> Satyajit.
>>>>>
>>>>
>>>>
>>>
>>
>
