We are using the assign API to do batch work with Spark and Kafka. What I'm seeing is the Spark executor work happening in the background, constantly polling the same data over and over until the main thread commits the offsets.
Is the below a blocking operation?

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // spark is an existing SparkSession.
    Dataset<Row> df = spark.read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        // "assign" expects a JSON map of topic -> partitions, not a comma-separated topic list
        .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
        // In the offset JSON, -2 means earliest and -1 means latest
        .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
        .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
        .load();
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

###########################################

Here is an example. Our desired batch is 20 records to commit on. Due to segment size (this is a test), 12 records are returned in each poll. Spark gets to offset 20 and our program is working to filter/process/commit, but the Spark polling starts again in the background at offset -2, since the offset has not been committed yet. This suggests that the read().load() call above is non-blocking.

2019-03-01 09:21:41 INFO [THREAD ID=main] RawHdfsFlowType:50 - Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor [compacted-gap-message-0] 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after polling
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 12 requested 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor [compacted-gap-message-0] 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Offset changed from 12 to 24 after polling
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 13 requested 13
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 14 requested 14
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 15 requested 15
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 16 requested 16
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 17 requested 17
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 18 requested 18
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 19 requested 19
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:255 - Offset to commit from actually pulled data matches our range for partition 0
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:264 - Original offsets to commit for partitions: {compacted-gap-message-0=20}
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:265 - Modified offsets to commit for partitions: {compacted-gap-message-0=20}

######################

The main thread is working to commit offset 20, but Spark is moving on and doing the read over again:

2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for task 201] InternalKafkaConsumer:58 - Get spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for task 201] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor compacted-gap-message-0 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for task 201] InternalKafkaConsumer:58 - Polled spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor [compacted-gap-message-0] 12
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for task 201] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after polling
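
To spell out the arithmetic in the example above, here is a minimal sketch in plain Java. It does not use Spark or Kafka; the class name PollSketch and the method pollsFor are hypothetical and exist only for illustration. It assumes every poll returns exactly 12 records, as in this test, and shows why covering a 20-record range takes two polls and moves the consumer's fetch position to 24 even though only offsets 0 through 19 are handed out:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark or Kafka: models fixed-size polls
// over an offset range [startOffset, endOffset).
class PollSketch {

    // Returns one "from->to" entry per simulated poll, where "to" is the
    // fetch position after the poll (it may overshoot endOffset).
    static List<String> pollsFor(long startOffset, long endOffset, int recordsPerPoll) {
        List<String> polls = new ArrayList<>();
        long fetched = startOffset;
        while (fetched < endOffset) {
            long next = fetched + recordsPerPoll;
            polls.add(fetched + "->" + next);
            fetched = next;
        }
        return polls;
    }

    public static void main(String[] args) {
        // Batch of 20 records starting at offset 0, 12 records per poll.
        System.out.println(pollsFor(0, 20, 12)); // prints [0->12, 12->24]
    }
}
```

The two entries line up with the "Offset changed from 0 to 12" and "Offset changed from 12 to 24" lines in the log. The sketch only models the poll sizes; it says nothing about why the second task starts again from offset 0.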