Hi Jacek,

Thank you for responding.

I have tried the memory sink; below is what I did:

val fetchValue = debeziumRecords.selectExpr("value")
    .withColumn("tableName",
      functions.get_json_object($"value".cast(StringType), "$.schema.name"))
    .withColumn("operation",
      functions.get_json_object($"value".cast(StringType), "$.payload.op"))
    .withColumn("payloadAfterValue",
      split(substring_index(debeziumRecords("value"), "\"after\":", -1),
        ",\"source\"").getItem(0))
    .drop("tableName").drop("operation").drop("value")
    .as[String]
    .writeStream
    .outputMode(OutputMode.Append())
    .queryName("record")
    .format("memory")
    .start()

spark.sql("select * from record").show(truncate = false)

I was expecting to be able to use the record table to read the JSON string,
but the table is empty on the first call, and I do not see any DataFrame
output after that.
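
One likely reason for the empty first result is that start() returns as soon
as the query is launched, while the micro-batches run on a background thread,
so the table can be queried before anything has been written to it. A minimal
sketch of waiting for the sink first, assuming a streaming Dataset[String]
like the one built above. MemorySinkRead and startAndDrain are hypothetical
names used only for illustration, and processAllAvailable() is documented as
intended for testing. With a Kafka source reading from the latest offset it
may return with an empty table if no new messages have arrived yet.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Hypothetical helper, not part of Spark.
object MemorySinkRead {
  // Starts a memory-sink query registered under `tableName`, then blocks
  // until the data currently available at the source has been processed
  // and committed to the in-memory table.
  def startAndDrain(jsonStrings: Dataset[String], tableName: String): StreamingQuery = {
    val query = jsonStrings.writeStream
      .outputMode(OutputMode.Append())
      .queryName(tableName)
      .format("memory")
      .start() // returns immediately; batches run in the background

    query.processAllAvailable() // may block forever if data never stops arriving
    query
  }
}
```

Usage would look like `val q = MemorySinkRead.startAndDrain(ds, "record")`
followed by the `spark.sql("select * from record")` call, where `ds` stands
for the Dataset[String] produced by the transformations above.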

*The above steps do work well in spark-shell, and I can do what I need to
there. The problem is when I try to code this in IntelliJ: the streaming
query keeps running, and I am not sure how to identify and stop it and then
use the record memory table.*
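
A minimal sketch of one way to identify a running query from application
code, assuming it was started with queryName("record") as above. StopByName
and stopQuery are hypothetical names; spark.streams.active, name and stop()
are part of the StreamingQueryManager and StreamingQuery APIs. Keeping the
StreamingQuery value returned by start() and calling stop() on it directly
would work just as well.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of Spark.
object StopByName {
  // Stops every active streaming query whose name equals `name`
  // and returns how many queries were stopped.
  def stopQuery(spark: SparkSession, name: String): Int = {
    val matching = spark.streams.active.filter(_.name == name)
    matching.foreach(_.stop()) // stop() blocks until the query has terminated
    matching.length
  }
}
```

For example, `StopByName.stopQuery(spark, "record")` could be called once the
record table is known to hold data.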

So I would like to stop the streaming query once I know there is some data in
the record memory table (is there a way to do that?), and then use the memory
table to fetch my record.
Any help on how to approach this programmatically, or pointers to examples,
would be highly appreciated.
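
A rough sketch of the whole "wait for data, then stop" flow, under stated
assumptions: the input is a streaming Dataset[String] of JSON payloads, the
sample is small enough to collect to the driver, and a simple polling loop
with a timeout is acceptable. SchemaFromSample, inferSchema, tableName and
timeoutMs are hypothetical names. spark.read.json on a Dataset[String] needs
Spark 2.2 or later.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of Spark.
object SchemaFromSample {
  // Runs a memory-sink query until at least one row has arrived (or the
  // timeout expires), infers a schema from the collected JSON strings,
  // and always stops the query before returning.
  def inferSchema(spark: SparkSession, jsonStrings: Dataset[String],
                  tableName: String, timeoutMs: Long): Option[StructType] = {
    import spark.implicits._

    val query: StreamingQuery = jsonStrings.writeStream
      .outputMode(OutputMode.Append())
      .queryName(tableName)
      .format("memory")
      .start()

    try {
      val deadline = System.currentTimeMillis() + timeoutMs
      // Poll the in-memory table until a row shows up, the query dies,
      // or the deadline passes.
      while (query.isActive &&
             spark.table(tableName).head(1).isEmpty &&
             System.currentTimeMillis() < deadline) {
        Thread.sleep(200)
      }
      // Collect the sample while the query is still registered.
      val sample = spark.table(tableName).as[String].collect()
      if (sample.isEmpty) None
      else Some(spark.read.json(spark.createDataset(sample.toSeq)).schema)
    } finally {
      query.stop()
    }
  }
}
```

The returned StructType could then be passed to from_json in a second
readStream that starts from the earliest offset.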

Regards,
Satyajit.



On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about memory sink? That could work.
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to infer a JSON schema from a sample of the data that I receive
>> from Kafka Streams (a specific topic). I have to infer the schema because I am
>> going to receive random JSON strings with a different schema for each topic,
>> so I chose to go ahead with the steps below:
>>
>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>> b. Somehow store the JSON string into a val and infer the schema.
>> c. Stop the stream.
>> d. Create a new readStream (smallest offset) and use the inferred schema
>> to process the JSON using Spark's JSON support, like from_json,
>> json_object and others, and run my actual business logic.
>>
>> Now I am not sure how to be successful with step (b). Any help would be
>> appreciated.
>> I would also like to know if there is a better approach.
>>
>> Regards,
>> Satyajit.
>>
>
>
