Executor heartbeats on Kubernetes
Settings like "spark.kubernetes.executor.missingPodDetectDelta" have made me wonder about heartbeats on Kubernetes. Do executors still send the traditional heartbeat to the driver when running on Kubernetes?

Thanks,
Kris
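To make the question concrete, here is a toy, stdlib-only model of the "traditional heartbeat" idea: the driver remembers when each executor last reported and treats it as lost after a period of silence. It is not Spark's implementation and does not settle whether the Kubernetes backend changes this. HeartbeatModel and its methods are hypothetical names, and the constants are assumptions that mirror the defaults of spark.executor.heartbeatInterval (10s) and spark.network.timeout (120s) as we understand them.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Spark source: the driver records each executor's last
// heartbeat and treats it as lost once it has been silent longer than the
// timeout. Constants are assumptions mirroring the defaults of
// spark.executor.heartbeatInterval (10s) and spark.network.timeout (120s).
final class HeartbeatModel {
    static final long HEARTBEAT_INTERVAL_MS = 10_000;
    static final long TIMEOUT_MS = 120_000;

    private final Map<String, Long> lastSeenMs = new HashMap<>();

    // Called when a heartbeat from executorId arrives at time nowMs.
    void onHeartbeat(String executorId, long nowMs) {
        lastSeenMs.put(executorId, nowMs);
    }

    // True only for a known executor that has been silent longer than TIMEOUT_MS.
    boolean isExpired(String executorId, long nowMs) {
        Long last = lastSeenMs.get(executorId);
        return last != null && nowMs - last > TIMEOUT_MS;
    }

    public static void main(String[] args) {
        HeartbeatModel driver = new HeartbeatModel();
        // The executor reports every interval from t=0 to t=50s, then goes silent.
        for (long t = 0; t <= 50_000; t += HEARTBEAT_INTERVAL_MS) {
            driver.onHeartbeat("exec-1", t);
        }
        System.out.println(driver.isExpired("exec-1", 170_000)); // false: silent exactly 120 s
        System.out.println(driver.isExpired("exec-1", 170_001)); // true: silent just over 120 s
    }
}
```

A setting like missingPodDetectDelta would be a separate liveness signal (the pod disappearing from the Kubernetes API's view) alongside, or instead of, this one; which applies is exactly what the question asks.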
Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets
I figured out the cause: we were not persisting the Dataset returned by .load(), so every action, such as count(), went back to Kafka and read the same data again.

On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane wrote: [original message reproduced below]
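In Spark itself, the fix described in this reply amounts to calling df.persist() (or df.cache()) on the Dataset returned by load() before running several actions, and df.unpersist() when finished. Because that needs a Spark runtime, the sketch below models the behavior in plain Java instead: LazyDatasetModel is a hypothetical class, and its Supplier stands in for the Kafka read. It assumes, as in Spark, that persisting only marks data for caching and the first action materializes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Stdlib-only model, not Spark: an unpersisted "dataset" re-runs its source on
// every action, while a persisted one materializes once and reuses the result.
final class LazyDatasetModel {
    private final Supplier<List<String>> source; // stands in for the Kafka read
    private boolean persisted = false;
    private List<String> cached;                 // filled by the first action after persist()

    LazyDatasetModel(Supplier<List<String>> source) {
        this.source = source;
    }

    // Like Dataset.persist(): only marks the data for caching; nothing is read yet.
    LazyDatasetModel persist() {
        persisted = true;
        return this;
    }

    private List<String> materialize() {
        if (!persisted) {
            return source.get();       // no cache: every action reads the source again
        }
        if (cached == null) {
            cached = source.get();     // first action after persist() fills the cache
        }
        return cached;
    }

    // Two "actions", analogous to count() and collect().
    long count() {
        return materialize().size();
    }

    List<String> collect() {
        return new ArrayList<>(materialize());
    }

    public static void main(String[] args) {
        int[] reads = {0};
        Supplier<List<String>> kafka = () -> {
            reads[0]++;
            return Arrays.asList("r0", "r1", "r2");
        };

        LazyDatasetModel plain = new LazyDatasetModel(kafka);
        plain.count();
        plain.collect();
        System.out.println("reads without persist: " + reads[0]); // 2

        reads[0] = 0;
        LazyDatasetModel cachedDs = new LazyDatasetModel(kafka).persist();
        cachedDs.count();
        cachedDs.collect();
        System.out.println("reads with persist: " + reads[0]);    // 1
    }
}
```

The model ignores one real-world caveat: Spark can still recompute cached partitions from the source if they are lost, so persisting reduces re-reads rather than strictly guaranteeing a single read.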
Spark 2.4 Structured Streaming Kafka assign API polling same offsets
We are using the assign API to do batch work with Spark and Kafka. What I'm seeing is the Spark executor work happening in the background, polling the same data over and over until the main thread commits the offsets. Is the following a blocking operation?

Dataset<Row> df = spark.read().format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
    .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
    .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
    .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

###

Here is an example. Our desired batch is 20 records to commit on. Due to segment size (this is a test), each poll returns 12 records. Spark reaches offset 20 and our program starts to filter/process/commit, but Spark polling continues in the background, starting again at offset -2 because nothing has been committed yet. This suggests that the .read().load() above is non-blocking.
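For reference, the Spark Kafka integration guide documents "assign" as a JSON object mapping each topic to a list of partitions (unlike "subscribe", which takes a comma-separated topic list), and "startingOffsets"/"endingOffsets" as JSON mapping topic to partition to offset, where -2 means earliest and -1 means latest. Hand-escaping these strings is error-prone, so here is a minimal stdlib-only sketch of a helper that builds them. KafkaOptionJson and its methods are hypothetical names, and topic names are assumed not to need JSON escaping.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds the JSON option strings for the Spark Kafka
// source. Assumes topic names contain no characters needing JSON escaping.
final class KafkaOptionJson {
    static final long EARLIEST = -2;  // "-2" in the offsets JSON
    static final long LATEST = -1;    // "-1" in the offsets JSON

    // topic -> partitions  =>  {"topic1":[0,1],"topic2":[0]}
    static String assign(Map<String, List<Integer>> partitions) {
        StringJoiner topics = new StringJoiner(",", "{", "}");
        partitions.forEach((topic, parts) -> {
            StringJoiner list = new StringJoiner(",", "[", "]");
            parts.forEach(p -> list.add(String.valueOf(p)));
            topics.add("\"" + topic + "\":" + list);
        });
        return topics.toString();
    }

    // topic -> (partition -> offset)  =>  {"topic1":{"0":23,"1":-2}}
    static String offsets(Map<String, Map<Integer, Long>> offsets) {
        StringJoiner topics = new StringJoiner(",", "{", "}");
        offsets.forEach((topic, byPartition) -> {
            StringJoiner entries = new StringJoiner(",", "{", "}");
            byPartition.forEach((p, off) -> entries.add("\"" + p + "\":" + off));
            topics.add("\"" + topic + "\":" + entries);
        });
        return topics.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, List<Integer>> parts = new LinkedHashMap<>();
        parts.put("topic1", Arrays.asList(0, 1));
        parts.put("topic2", Arrays.asList(0));
        System.out.println(assign(parts)); // {"topic1":[0,1],"topic2":[0]}

        Map<Integer, Long> t1 = new LinkedHashMap<>();
        t1.put(0, 23L);
        t1.put(1, EARLIEST);
        Map<Integer, Long> t2 = new LinkedHashMap<>();
        t2.put(0, EARLIEST);
        Map<String, Map<Integer, Long>> start = new LinkedHashMap<>();
        start.put("topic1", t1);
        start.put("topic2", t2);
        System.out.println(offsets(start)); // {"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}
    }
}
```

The resulting strings would be passed to .option("assign", ...), .option("startingOffsets", ...) and .option("endingOffsets", ...). Per the same guide, batch queries reject -1 (latest) in startingOffsets and -2 (earliest) in endingOffsets; the offsets in the question respect both rules.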
2019-03-01 09:21:41 INFO [THREAD ID=main] RawHdfsFlowType:50 - Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor [compacted-gap-message-0] 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after polling
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Get spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 nextOffset 12 requested 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Seeking to spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for task 0] InternalKafkaConsumer:58 - Polled
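One way to read the DEBUG log above: a single seek and poll at offset 0 returns 12 records, individual Get calls are then served from that batch, and the request for offset 12 forces another seek and poll. The stdlib-only sketch below reproduces that pattern and, under its simplifying assumptions, shows why a second unpersisted action would start again at offset 0. BufferedFetchModel is a hypothetical class, not Spark's InternalKafkaConsumer, and it assumes every poll returns a full batch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the fetch pattern in the log: serve a requested offset
// from the last polled batch when it is the next expected offset, otherwise
// seek to it and poll a new batch. Not Spark's InternalKafkaConsumer.
final class BufferedFetchModel {
    private final int recordsPerPoll;
    private long nextOffset = -2;  // no position yet, like "nextOffset -2" in the log
    private long bufferEnd = 0;    // exclusive end of the last polled batch
    int polls = 0;
    final List<String> seeks = new ArrayList<>();

    BufferedFetchModel(int recordsPerPoll) {
        this.recordsPerPoll = recordsPerPoll;
    }

    // Fetches one offset, seeking and polling only when the buffer cannot serve it.
    void get(long offset) {
        boolean inBuffer = offset == nextOffset && offset < bufferEnd;
        if (!inBuffer) {
            seeks.add("seek " + offset);
            bufferEnd = offset + recordsPerPoll;  // assume a full batch per poll
            polls++;
        }
        nextOffset = offset + 1;
    }

    // Reads [from, until) one offset at a time, as a task does for its partition.
    void readRange(long from, long until) {
        for (long o = from; o < until; o++) {
            get(o);
        }
    }

    public static void main(String[] args) {
        BufferedFetchModel consumer = new BufferedFetchModel(12);
        consumer.readRange(0, 20);  // seek 0 (polls 0..11), then seek 12 (polls 12..23)
        System.out.println(consumer.seeks + " polls=" + consumer.polls);
        consumer.readRange(0, 20);  // a second, unpersisted action starts over at 0
        System.out.println(consumer.seeks + " polls=" + consumer.polls);
    }
}
```

With 12 records per poll, covering 20 offsets takes two polls per pass, so every extra action over the same unpersisted Dataset adds another two in this model.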