Hi Mich,

the issue was related to an incorrect import - the name window was resolving to a module rather than to the pyspark.sql.functions.window function (hence the "TypeError: 'module' object is not callable" in the traceback below) - and it is now resolved.
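In case it helps anyone who hits the same error, here is a minimal, hypothetical reconstruction of the mix-up (the exact imports in my code differ, but the pattern is the same):

from pyspark.sql import SparkSession
# wrong: this binds the name "window" to the pyspark.sql.window module,
# so a later call to window(...) fails with
# "TypeError: 'module' object is not callable"
#   from pyspark.sql import window
# right: import the window() function from pyspark.sql.functions
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("window-import-demo").getOrCreate()

df = spark.createDataFrame([("2022-02-16 00:00:00", 1)], ["ts", "cnt"]) \
    .select(col("ts").cast("timestamp").alias("timestamp"), "cnt")

# with the correct import this runs; with the shadowed name it raises the TypeError
df.groupBy(window(col("timestamp"), "10 minutes")).count().show(truncate=False)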
However, wrt your comment - 'OK sounds like your watermark is done outside of your processing.'

In my use-case, which primarily deals with syslogs, the syslog is a string that needs to be parsed (with defensive coding built in to ensure records are in the correct format) before it is fed to 3 different classes (AlarmProc being one of them), where there is additional parsing + aggregation for specific types of logs.

The way I'm handling this is by using foreachBatch(convertToDict) in the writeStream method, and the parsing + aggregation happens per microbatch. foreachBatch waits for the parsing and aggregation to complete for the current microbatch, and then proceeds to do the same with the next one. Since it involves a lot of parsing + aggregation, it requires more than a df.select() - hence the approach above.

From what I understand, the watermark is done within the processing, since it is applied per microbatch pulled with each trigger - a rough sketch of the structure is shown below. Pls let me know if you have comments/suggestions on this approach.
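For clarity, here is a stripped-down sketch of the structure (the broker address, topic name, checkpoint path and the body of convertToDict are simplified placeholders - the real convertToDict does the full defensive parsing and feeds the 3 classes):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("syslog-stream").getOrCreate()

# raw syslog strings from Kafka; the Kafka source also supplies a
# "timestamp" column for each message
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "syslog") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS syslog", "timestamp")

def convertToDict(batch_df, batch_id):
    # all parsing + aggregation happens here, per microbatch; foreachBatch
    # does not pull the next microbatch until this function returns
    parsed = batch_df.filter(col("syslog").isNotNull())  # defensive checks go here
    agg = parsed \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(window(col("timestamp"), "10 minutes")) \
        .count()
    agg.show(truncate=False)  # stand-in for AlarmProc and the other classes

query = raw_df.writeStream \
    .foreachBatch(convertToDict) \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

query.awaitTermination()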
thanks,
Karan Alang

On Wed, Feb 16, 2022 at 12:52 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> OK sounds like your watermark is done outside of your processing.
>
> Check this
>
> # construct a streaming dataframe streamingDataFrame that
> # subscribes to topic temperature
> streamingDataFrame = self.spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>     .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>     .option("group.id", config['common']['appName']) \
>     .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>     .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>     .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>     .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>     .option("subscribe", "temperature") \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "latest") \
>     .load() \
>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
> resultM = streamingDataFrame.select(
>     col("parsed_value.rowkey").alias("rowkey"),
>     col("parsed_value.timestamp").alias("timestamp"),
>     col("parsed_value.temperature").alias("temperature"))
>
> result = resultM. \
>     withWatermark("timestamp", "5 minutes"). \
>     groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
>     avg('temperature'). \
>     writeStream. \
>     outputMode('complete'). \
>     option("numRows", 1000). \
>     option("truncate", "false"). \
>     format('console'). \
>     option('checkpointLocation', checkpoint_path). \
>     queryName("temperature"). \
>     start()
>
> HTH
>
> On Wed, 16 Feb 2022 at 06:37, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>>
>> I have a Structured Streaming pyspark program running on GCP Dataproc,
>> which reads data from Kafka, and does some data massaging and aggregation.
>> I'm trying to use withWatermark(), and it is giving an error:
>>
>> py4j.Py4JException: An exception was raised by the Python Proxy. Return
>> Message: Traceback (most recent call last):
>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
>>     return_value = getattr(self.pool[obj_id], method)(*params)
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
>>     raise e
>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
>>     self.func(DataFrame(jdf, self.sql_ctx), batch_id)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
>>     ap = Alarm(tdict, spark)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
>>     computeCount(l_alarm_df, l_alarm1_df)
>>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
>>     window(col("timestamp"), "10 minutes").alias("window")
>> TypeError: 'module' object is not callable
>>
>> Details are in the stackoverflow post below:
>> https://stackoverflow.com/questions/71137296/structuredstreaming-withwatermark-typeerror-module-object-is-not-callable
>>
>> Any ideas on how to debug/fix this?
>> tia !