OK, this is the equivalent Python code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when
from pyspark.sql.types import StructType, StructField, LongType
from datetime import datetime

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("StreamingSparkPartitioned") \
    .getOrCreate()

expression = when(expr("value % 3 = 1"), "stupid_event") \
    .otherwise(when(expr("value % 3 = 2"), "smart_event").otherwise("neutral_event"))

# Define the schema to match the rate-micro-batch data source
schema = StructType([StructField("timestamp", LongType()), StructField("value", LongType())])
checkpoint_path = "file:///ssd/hduser/randomdata/chkpt"

# Convert human-readable timestamp to Unix timestamp in milliseconds
start_timestamp = int(datetime(2024, 1, 28).timestamp() * 1000)

streamingDF = spark.readStream \
    .format("rate-micro-batch") \
    .option("rowsPerBatch", "100") \
    .option("startTimestamp", start_timestamp) \
    .option("numPartitions", 1) \
    .load() \
    .withColumn("event_type", expression)

query = (
    streamingDF.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

query.awaitTermination()

This is the error I am getting:

  File "/home/hduser/dba/bin/python/DSBQ/src/StreamingSparkPartitioned.py", line 38, in <module>
    query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 201, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = db2fd1cc-cc72-439e-9dcb-8acfd2e9a61e, runId = f71cd26f-7e86-491c-bcf2-ab2e3dc63eca] terminated with exception: No usable value for offset
Did not find value which can be converted into long

Seems like there might be an issue with the *rate-micro-batch* source when using the
*startTimestamp* option. You can try using the socket source for testing purposes.

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Sun, 28 Jan 2024 at 22:00, Perfect Stranger <paulpaul1...@gmail.com> wrote:

> I described the issue here:
>
> https://stackoverflow.com/questions/77893939/how-does-starttimestamp-option-work-for-the-rate-micro-batch-format
>
> Could someone please respond?
>
> The rate-micro-batch format doesn't seem to respect the startTimestamp
> option.
>
> Thanks.
>
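
A side note on the start_timestamp line in the script above: datetime(2024, 1, 28) is a naive datetime, so .timestamp() interprets it in the local time zone of the machine and the resulting value can differ between hosts. This is not claimed to be the cause of the error shown. It is only a minimal sketch of one way to make the conversion reproducible. The helper name to_epoch_millis and the choice to treat naive values as UTC are assumptions made for illustration, not part of the original script.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt: datetime) -> int:
    """Return dt as Unix epoch milliseconds.

    Assumption of this sketch: a naive datetime (no tzinfo) is treated as UTC
    rather than as local time.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


# Same calendar date as in the script, but pinned to midnight UTC.
start_timestamp = to_epoch_millis(datetime(2024, 1, 28))
print(start_timestamp)  # 1706400000000
```

The value can then be passed to .option("startTimestamp", start_timestamp) exactly as in the script.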