Hi Jacek,

For now, I am using Thread.sleep() on the driver to make sure my streaming
query receives some data, and then I stop it before control reaches the
query against the memory table.
Let me know if there is a better way of handling it.
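
One possible alternative to a fixed sleep, as a minimal sketch rather than a definitive answer: poll the memory table until it holds at least one row (or a deadline passes), copy the sample to the driver, and only then stop the query. Assumptions that are not from this thread: the built-in "rate" source stands in for the Kafka topic so the example runs without a broker, and the object name MemorySinkSample, the method inferSchema, the column name json, and the 30-second timeout are all hypothetical. The final step uses spark.read.json on a Dataset[String] (available from Spark 2.2) to infer the schema from the sampled strings.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

object MemorySinkSample {

  // Starts a streaming query into the "record" memory table, waits until the
  // table has data (or timeoutMs elapses), stops the query, and returns the
  // schema inferred from the sampled JSON strings.
  def inferSchema(spark: SparkSession, timeoutMs: Long): StructType = {
    import spark.implicits._

    // ASSUMPTION: the "rate" source replaces the Kafka source from the thread.
    // Each generated row is turned into a small JSON string such as {"id":0}.
    val jsonStrings = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(concat(lit("""{"id":"""), col("value").cast("string"), lit("}")).as("json"))

    val query = jsonStrings.writeStream
      .outputMode(OutputMode.Append())
      .queryName("record")
      .format("memory")
      .start()

    // True once at least one row has been committed to the memory table.
    def hasRows: Boolean = spark.table("record").head(1).nonEmpty

    val deadline = System.currentTimeMillis() + timeoutMs
    while (query.isActive && !hasRows && System.currentTimeMillis() < deadline) {
      // Waits up to 500 ms; returns earlier if the query terminates.
      query.awaitTermination(500L)
    }

    // Copy the sample to the driver before stopping the query.
    val sample: Array[String] = spark.table("record").as[String].collect()
    query.stop()

    // Batch-style schema inference over the sampled JSON strings.
    spark.read.json(sample.toSeq.toDS()).schema
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("memory-sink-sample")
      .getOrCreate()

    val schema = inferSchema(spark, timeoutMs = 30000L)
    println(schema.treeString)

    spark.stop()
  }
}
```

The handle returned by start() is what lets the driver stop the query from IntelliJ. StreamingQuery.processAllAvailable() is another option, although its documentation describes it as intended for testing.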

Regards,
Satyajit.

On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> Thank you for responding.
>
> I have tried the memory sink, and below is what I did:
>
>  val fetchValue = debeziumRecords.selectExpr("value")
>     .withColumn("tableName",
>       functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>     .withColumn("operation",
>       functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>     .withColumn("payloadAfterValue",
>       split(substring_index(debeziumRecords("value"), "\"after\":", -1),
>         ",\"source\"").getItem(0))
>     .drop("tableName").drop("operation").drop("value")
>     .as[String]
>     .writeStream
>     .outputMode(OutputMode.Append())
>     .queryName("record")
>     .format("memory")
>     .start()
>
> spark.sql("select * from record").show(truncate = false)
> // I was expecting to be able to use the record table to read the JSON
> // string, but the table is empty on the first call, and I do not see any
> // DataFrame output after the first one.
>
> *The above steps do work well in spark-shell, and I can do what I need
> there. The problem is when I write the code in IntelliJ: the streaming
> query keeps running, and I am not sure how to identify and stop it and
> then use the record memory table.*
>
> So I would like to stop the streaming query once I know I have some data
> in my record memory table (is there a way to do that?), so that I can then
> use the memory table to fetch my record.
> Any help on how to approach this programmatically, or pointers to
> examples, would be highly appreciated.
>
> Regards,
> Satyajit.
>
>
>
> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> What about memory sink? That could work.
>>
>> Regards,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I would like to infer a JSON schema from a sample of the data that I
>>> receive from a Kafka stream (a specific topic). I have to infer the
>>> schema because I am going to receive random JSON strings with a
>>> different schema for each topic, so I chose to go ahead with the steps
>>> below:
>>>
>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>> b. Somehow store the JSON string in a val and infer the schema.
>>> c. Stop the stream.
>>> d. Create a new readStream (smallest offset) and use the schema inferred
>>> above to process the JSON using Spark's JSON support, such as
>>> from_json, get_json_object and others, and run my actual business logic.
>>>
>>> Now I am not sure how to succeed with step (b). Any help would be
>>> appreciated.
>>> I would also like to know if there is a better approach.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>
>>
>
