Hello All, I have a Spark Structured Streaming job that reads from Kafka, does processing, and puts data into Mongo/Kafka/GCP buckets (i.e. it is processing-heavy).
I'm consistently seeing the following warning:

```
22/09/06 16:55:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1, groupId=spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0] Member consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1-604d740f-16d1-46b3-955c-502be5b02be1 sending LeaveGroup request to coordinator 35.237.40.54:9094 (id: 2147483645 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
```

I'm trying to change the values of the parameters max.poll.interval.ms and max.poll.records in spark.readStream, as shown below:

```
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .option("kafka.max.poll.interval.ms", 600000) \
    .option("kafka.max.poll.records", 7000) \
    .load()
```

The values that I see in the KafkaConsumer are:

- max.poll.interval.ms - 600000 (changed from 300000)
- max.poll.records - not getting changed; it is showing as 1

```
22/09/07 02:29:32 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [34.138.213.152:9094]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 600000
    max.poll.records = 1
    metadata.max.age.ms = 1000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = syslog-vani-noacl.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = versa-kafka-gke-ca.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
```

The Kafka documentation states that the default value of max.poll.records is 500. So, a few questions:

- How do I change the value of max.poll.records to the desired value?
- Why is the value showing as 1 instead of the default of 500?

Here is the Stack Overflow link as well: https://stackoverflow.com/questions/73630738/spark-structured-streaming-unable-to-change-max-poll-records-showing-as-1

tia!
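For completeness, here is a stripped-down sketch of the reader with just the two overrides isolated (the broker, topic, and app name are placeholders, and the SSL options from the real job are left out). My understanding from the Structured Streaming + Kafka integration guide is that any option prefixed with `kafka.` has the prefix stripped and is passed straight through to the underlying KafkaConsumer, which is why I expected `kafka.max.poll.records` to show up as `max.poll.records = 7000` in the ConsumerConfig dump above:

```
from pyspark.sql import SparkSession

# Assumes the spark-sql-kafka package is already on the classpath, as in the real job.
spark = SparkSession.builder.appName("kafka-poll-config-test").getOrCreate()

# Placeholder broker/topic values; the real job also sets the SSL options shown above.
kafka_options = {
    "kafka.bootstrap.servers": "broker-1:9094",
    "subscribe": "some-topic",               # Spark-level option, no "kafka." prefix
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
    # "kafka."-prefixed options should reach the consumer with the prefix stripped:
    "kafka.max.poll.interval.ms": "600000",  # shows up as max.poll.interval.ms = 600000
    "kafka.max.poll.records": "7000",        # still shows up as max.poll.records = 1
}

df_stream = spark.readStream.format("kafka").options(**kafka_options).load()
```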