Hello All,

I have a long-running Apache Spark Structured Streaming job on GCP Dataproc, which reads data from Kafka every 10 minutes and does some processing. The Kafka topic has 3 partitions and a retention period of 3 days.

The issue I'm facing is that after a few hours, the program stops reading data from Kafka. If I delete the GCS bucket (which is the checkpoint directory) and then restart the streaming job, it starts consuming data again.
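For reference, the workaround is simply wiping the checkpoint bucket before resubmitting the job (bucket name taken from the checkpoint dump further down):

```
# delete the checkpoint bucket and everything under it, then restart the streaming job
gsutil -m rm -r gs://ss-checkpoint-10m-noconsumergrp
```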

Here is the code where I'm reading data from Kafka and using foreachBatch to call the function where the processing happens:

```

df_stream = spark.readStream.format('kafka') \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password) \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .option("kafka.metadata.max.age.ms", "1000") \
        .option("kafka.ssl.keystore.type", "PKCS12") \
        .option("kafka.ssl.truststore.type", "PKCS12") \
        .option("maxOffsetsPerTrigger", 100000) \
        .option("max.poll.records", 500) \
        .option("max.poll.interval.ms", 1000000) \
        .load()

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
        .writeStream \
        .outputMode("append") \
        .trigger(processingTime='3 minutes') \
        .option("truncate", "false") \
        .option("checkpointLocation", checkpoint) \
        .foreachBatch(convertToDictForEachBatch) \
        .start()

```
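One thing I noticed while re-reading the Kafka integration guide: Kafka consumer properties have to be prefixed with `kafka.` for the source to pass them through to the underlying consumer, so I suspect the bare `max.poll.records` / `max.poll.interval.ms` options above are being silently ignored. If that's correct, the overrides would look roughly like this (just a sketch of the change, not yet tested on my side, and I'm not sure whether the source overrides these internally anyway):

```
# same reader as above, with the consumer overrides prefixed with "kafka."
# so that the Kafka source actually forwards them to the consumer
# (SSL / keystore options unchanged from the original, omitted here for brevity)
df_stream = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .option("maxOffsetsPerTrigger", 100000) \
        .option("kafka.max.poll.records", "500") \
        .option("kafka.max.poll.interval.ms", "1000000") \
        .load()
```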

Log snippet showing the offsets being reset to latest and no data being read:

```

total time take, convertToDict :  0:00:00.006695
22/08/26 17:45:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Member consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-26755c4c-93d6-4ab6-8799-411439e310bc sending LeaveGroup request to coordinator 35.185.24.226:9094 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Giving away all assigned partitions as lost since generation has been reset, indicating that consumer is no longer part of the group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Lost previously assigned partitions syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Finished assignment for group at generation 1: {consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-307570e0-ca20-42d5-b4d1-255da4fca485=Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])}
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Successfully joined group with generation 1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Notifying assignor about the new Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Adding newly assigned partitions: syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 247969068.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 246383018.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248913006.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 248534038.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 248185455.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248990456.
 IN CONVERT TO DICT  84  currentTime  2022-08-26 17:50:03.995993  df ->  DataFrame[value: string, timestamp: timestamp, topic: string]
 before adding topic, count in batch  0
+-----+---------+-----+
|value|timestamp|topic|
+-----+---------+-----+
+-----+---------+-----+

```


Here are the oldest and latest offsets in the Kafka topic (from Prometheus):

```

Current Offset (metric : kafka_topic_partition_current_offset)
partition 0 : 249185343
partition 1 : 248380971
partition 2 : 248728475

Oldest Offset (metric : kafka_topic_partition_oldest_offset)
partition 0 : 248913006
partition 1 : 247969068
partition 2 : 248541752
```

What I see in the checkpoint bucket:

```

(base) Karans-MacBook-Pro:prometheus-yamls karanalang$ gsutil cat gs://ss-checkpoint-10m-noconsumergrp/offsets/96
v1
{"batchWatermarkMs":0,"batchTimestampMs":1661538780007,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"syslog.ueba-us4.v1.versa.demo3":{"2":246124174,"1":245765547,"0":246582707}}


```
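For what it's worth, this is roughly how I'm comparing the checkpointed offsets with the brokers' earliest retained offsets (a rough sketch using kafka-python; SSL settings omitted, and kafkaBrokers is the same broker list used above):

```
# compare the offsets recorded in the checkpoint (offsets/96 above) with the
# earliest offsets still retained on the brokers
from kafka import KafkaConsumer, TopicPartition

topic = "syslog.ueba-us4.v1.versa.demo3"
checkpointed = {0: 246582707, 1: 245765547, 2: 246124174}   # from the checkpoint file above

consumer = KafkaConsumer(bootstrap_servers=kafkaBrokers)    # plus the SSL options
partitions = [TopicPartition(topic, p) for p in checkpointed]
earliest = consumer.beginning_offsets(partitions)

for tp in partitions:
    gap = earliest[tp] - checkpointed[tp.partition]
    print("partition", tp.partition,
          "checkpoint:", checkpointed[tp.partition],
          "earliest on broker:", earliest[tp],
          "records already expired:", max(gap, 0))
```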

The offsets in the checkpoint are lower than the oldest retained offsets in the topic (e.g. partition 0: 246582707 in the checkpoint vs. 248913006 oldest on the broker) - is that the reason the data is not being read? What needs to be done to resolve this issue?

tia!


Here is the Stack Overflow link:

https://stackoverflow.com/questions/73505391/structured-streaming-data-not-being-read-offsets-not-getting-committed
