We are using the assign option of the Kafka source to do batch work
with Spark and Kafka. What I'm seeing is that the Spark executor keeps
working in the background, polling the same data over and over until
the main thread commits the offsets.

Is the read below a blocking operation?

  Dataset<Row> df = spark.read().format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      // "assign" takes a JSON map of topic -> partitions; a
      // comma-separated topic list belongs to "subscribe"
      .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
      .option("startingOffsets",
          "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
      .option("endingOffsets",
          "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
      .load();
  df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
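
For reference, here is a minimal sketch of how the startingOffsets and
endingOffsets strings could be built for each batch instead of being
hard-coded. KafkaOffsetJson is a hypothetical helper name of mine, not
a Spark API. It assumes topic names need no JSON escaping (Kafka
restricts them to letters, digits, '.', '_' and '-').

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Spark): builds the JSON string used for
// the Kafka source's "startingOffsets" / "endingOffsets" options.
final class KafkaOffsetJson {
    private KafkaOffsetJson() {}

    // offsets: topic -> (partition -> offset).
    // In these options -2 stands for earliest and -1 for latest.
    static String build(Map<String, Map<Integer, Long>> offsets) {
        StringJoiner topics = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Map<Integer, Long>> topic : offsets.entrySet()) {
            StringJoiner partitions = new StringJoiner(",", "{", "}");
            for (Map.Entry<Integer, Long> p : topic.getValue().entrySet()) {
                // Partition ids are JSON object keys, so they are quoted.
                partitions.add("\"" + p.getKey() + "\":" + p.getValue());
            }
            topics.add("\"" + topic.getKey() + "\":" + partitions);
        }
        return topics.toString();
    }

    public static void main(String[] args) {
        // Next batch for our test topic starts at the offset we just committed.
        Map<Integer, Long> partitions = new LinkedHashMap<>();
        partitions.put(0, 20L);
        Map<String, Map<Integer, Long>> start = new LinkedHashMap<>();
        start.put("compacted-gap-message", partitions);
        System.out.println(build(start)); // {"compacted-gap-message":{"0":20}}
    }
}
```

The result would be passed as .option("startingOffsets", ...) on the
next read, so each batch starts where the previous commit ended.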


###########################################

Here is an example. Our desired batch is 20 records per commit.
Due to the segment size (this is a test), each poll returns 12
records. Spark gets to offset 20 and our program starts to
filter/process/commit, but the Spark polling starts again in the
background at offset -2, since nothing has been committed yet.
This suggests that the .read().load() above is non-blocking.
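
To make the numbers concrete, here is a small sketch of the arithmetic
behind the two polls in the log below. PollMath is a hypothetical name.
It assumes every poll returns exactly 12 records and that offsets are
contiguous, which may not hold on a compacted topic.

```java
// Hypothetical helper: how many polls a batch needs and how far the
// consumer's fetch position moves, given a fixed number of records per poll.
final class PollMath {
    private PollMath() {}

    // Ceiling division: polls needed to cover batchSize records.
    static long pollsNeeded(long batchSize, long recordsPerPoll) {
        return (batchSize + recordsPerPoll - 1) / recordsPerPoll;
    }

    // Fetch position after those polls, starting from startOffset.
    static long fetchedUpTo(long startOffset, long batchSize, long recordsPerPoll) {
        return startOffset + pollsNeeded(batchSize, recordsPerPoll) * recordsPerPoll;
    }

    public static void main(String[] args) {
        long polls = pollsNeeded(20, 12);
        long fetched = fetchedUpTo(0, 20, 12);
        // prints "polls=2 fetchedUpTo=24"
        System.out.println("polls=" + polls + " fetchedUpTo=" + fetched);
    }
}
```

This matches the "Offset changed from 0 to 12" and "from 12 to 24"
lines, even though only offsets 0 through 19 are requested for the
batch and 20 is the offset we commit.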


2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 12 requested 12

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 12 to 24 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 13 requested 13
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 14 requested 14
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 15 requested 15
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 16 requested 16
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 17 requested 17
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 18 requested 18
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 19 requested 19

2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:255 -
Offset to commit from actually pulled data matches our range for
partition 0
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:264 -
Original offsets to commit for partitions:
{compacted-gap-message-0=20}
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:265 -
Modified offsets to commit for partitions:
{compacted-gap-message-0=20}

###################### Main thread is working to commit offset 20, but
Spark has moved on and is reading from offset 0 again, since nothing
has been committed yet
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Get
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
compacted-gap-message-0 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling
