Hi Jacek,

Thank you for the response.
I have tried the memory sink; here is what I did:

val fetchValue = debeziumRecords.selectExpr("value")
  .withColumn("tableName",
    functions.get_json_object($"value".cast(StringType), "$.schema.name"))
  .withColumn("operation",
    functions.get_json_object($"value".cast(StringType), "$.payload.op"))
  .withColumn("payloadAfterValue",
    split(substring_index(debeziumRecords("value"), "\"after\":", -1), ",\"source\"").getItem(0))
  .drop("tableName").drop("operation").drop("value")
  .as[String]
  .writeStream
  .outputMode(OutputMode.Append())
  .queryName("record")
  .format("memory")
  .start()

spark.sql("select * from record").show(truncate = false)
// I was expecting to be able to read the JSON string from the record
// table, but the table is empty on the first call, and I do not see any
// DataFrame output after the first one.

The steps above work fine in spark-shell, and I can do what I need there. The problem is when I code this in IntelliJ: the streaming query keeps running, and I am not sure how to identify and stop it so that I can use the record memory table. I would like to stop the streaming query once I know there is some data in the record memory table (is there a way to do that?), and then fetch my record from the table. Any help on how to approach this programmatically, or any examples, would be highly appreciated.

Regards,
Satyajit.

On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about the memory sink? That could work.
>
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna
> <satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to infer the JSON schema from a sample of the data that I
>> receive from Kafka Streams (a specific topic). I have to infer the schema
>> because I will receive random JSON strings with a different schema for
>> each topic, so I chose the following steps:
>>
>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>> b. Somehow store the JSON string into a val and infer the schema.
>> c. Stop the stream.
>> d. Create a new readStream (smallest offset) and use the inferred
>> schema to process the JSON with Spark's built-in JSON support, like
>> from_json, get_json_object and others, and run my actual business logic.
>>
>> Now I am not sure how to succeed with step (b). Any help would be
>> appreciated, and I would also like to know if there is a better approach.
>>
>> Regards,
>> Satyajit.
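
For reference, a minimal sketch of the sample-then-stop approach discussed above, assuming Spark 2.2+ (spark.read.json accepts a Dataset[String] from that version on); the broker address localhost:9092 and the topic name "topic" are placeholders, not values from this thread. The key call is StreamingQuery.processAllAvailable(), which blocks until all data available in the source at that moment has been processed, so the in-memory table is populated before it is queried and the query can then be stopped:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder()
  .appName("json-schema-sampler")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// a. Sample the topic (latest offsets by default) into an in-memory
//    table named "record".
val sampleQuery = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic")                        // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .outputMode(OutputMode.Append())
  .queryName("record")
  .format("memory")
  .start()

// b. Block until everything currently available in the source has been
//    processed, so the memory table is non-empty before it is queried.
sampleQuery.processAllAvailable()
val sampleJson = spark.sql("select value from record").as[String]

// c. Stop the sampling query; the in-memory table remains queryable.
sampleQuery.stop()

// Infer the schema from the sampled JSON strings.
val inferredSchema = spark.read.json(sampleJson).schema

// d. Re-read from the earliest offsets and parse with the inferred schema.
val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json($"value".cast("string"), inferredSchema).as("json"))

One caveat: processAllAvailable() is intended for testing and can block for a long time if the source keeps receiving new data, so it fits a bounded sample better than a continuously hot topic.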