Re: Structured Streaming With Kafka - processing each event

2021-03-02 Thread Gourav Sengupta
Hi, Are you using Structured Streaming? Which Spark version and Kafka version, and where are you fetching the data from? Semantically speaking, if your data in Kafka represents an action to be performed, then it should really be in a queue like RabbitMQ or SQS. If it is simply data then it

Re: Structured Streaming With Kafka - processing each event

2021-03-02 Thread Sachit Murarka
Hi Mich, Thanks for the reply. Will check this out. Kind Regards, Sachit Murarka On Fri, Feb 26, 2021 at 2:14 AM Mich Talebzadeh wrote: > Hi Sachit, > > I managed to make mine work using the *foreachBatch* function in > writeStream. > > "foreach" performs custom write logic on each row and

Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
Hi Sachit, I managed to make mine work using the *foreachBatch* function in writeStream. "foreach" performs custom write logic on each row, and "foreachBatch" performs custom write logic on each micro-batch, here through the SendToBigQuery function. foreachBatch(SendToBigQuery) expects 2 parameters,
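For readers finding this thread in the archive: below is a minimal sketch of the wiring Mich describes. Because the original BigQuery code is truncated in this preview, the body of SendToBigQuery is replaced with a plain Parquet write, and the broker address, topic name, and output path are placeholders, not the original values.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka_foreachBatch").getOrCreate()

    # Kafka source; broker address and topic name are placeholders.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "md") \
        .load()

    def SendToBigQuery(batchDF, batchId):
        # foreachBatch hands this function two arguments: the micro-batch
        # DataFrame and a monotonically increasing batch id.
        # The original post writes to BigQuery; a plain batch write is
        # substituted here so the sketch stays self-contained.
        batchDF.write.mode("append").parquet("/tmp/md_batches")

    query = df.writeStream \
        .foreachBatch(SendToBigQuery) \
        .start()

    query.awaitTermination()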

Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
BTW, you intend to process these every 30 seconds? processingTime="30 seconds". So how many rows of data are sent in each micro-batch, and what is the interval at which you receive the data in batches from the producer?
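A sketch of how the 30-second trigger mentioned here fits into the query, reusing the streamingDataFrame and SendToBigQuery names from the other messages in this thread; the maxOffsetsPerTrigger option shown in the comment is one way to cap how many rows land in each micro-batch.

    # streamingDataFrame / SendToBigQuery are defined in the other messages.
    # A processing-time trigger starts a new micro-batch every 30 seconds,
    # once the previous micro-batch has finished.
    query = streamingDataFrame.writeStream \
        .foreachBatch(SendToBigQuery) \
        .trigger(processingTime="30 seconds") \
        .start()

    # On the read side, the Kafka source can cap rows per micro-batch with:
    #   .option("maxOffsetsPerTrigger", 10000)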

Re: Structured Streaming With Kafka - processing each event

2021-02-25 Thread Mich Talebzadeh
If you are receiving data from Kafka, wouldn't that be better in JSON format? try: # construct a streaming DataFrame streamingDataFrame that subscribes to topic config['MDVariables']['topic'] -> md (market data) streamingDataFrame = self.spark \
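The code in this message is cut off in the archive view; below is a minimal sketch of what such a subscription typically looks like when the Kafka value is JSON, with an invented two-field schema and placeholder broker and topic names standing in for config['MDVariables']['topic'].

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.appName("md_stream").getOrCreate()

    # Hypothetical schema for the JSON value; the real one depends on the producer.
    mdSchema = StructType([
        StructField("ticker", StringType(), True),
        StructField("price", StringType(), True),
    ])

    streamingDataFrame = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "md") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), mdSchema).alias("md")) \
        .select("md.*")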

Structured Streaming With Kafka - processing each event

2021-02-24 Thread Sachit Murarka
Hello Users, I am using Spark 3.0.1 Structured Streaming with PySpark. My use case: I get many records in Kafka (essentially some metadata with the location of the actual data). I have to take that metadata from Kafka and apply some processing. Processing includes: reading the actual data
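One common shape for this kind of pipeline, and roughly what the replies above suggest, is to handle each micro-batch of metadata with foreachBatch and read the actual data each record points to. Everything below (the 'location' column name, the Parquet format, the metadataStream name) is an illustrative assumption, not the poster's code.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def process_metadata_batch(batchDF, batchId):
        # Each metadata row is assumed to carry a 'location' column pointing
        # at the actual data. foreachBatch runs on the driver, so collecting
        # a small micro-batch of locations here is acceptable.
        for row in batchDF.select("location").distinct().collect():
            actual = spark.read.parquet(row["location"])  # format is an assumption
            # ... apply the per-record processing to 'actual' here ...

    # metadataStream would be the parsed Kafka stream from the other messages.
    query = metadataStream.writeStream \
        .foreachBatch(process_metadata_batch) \
        .start()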