I figured out why.  We are not persisting the Dataset returned by
.load().  Thus, every action on it, such as count(), goes back to
Kafka and reads the data again.
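
To illustrate the effect, here is a minimal sketch in plain Java with
no Spark dependency.  It is only a model of the behaviour, not Spark
code: LazyReadDemo, lazyRead(), persisted() and the fetch counter are
all made-up names.  A Supplier stands in for the lazily evaluated
Dataset, and each get() stands in for an action such as count().  In
the real job the corresponding step would be calling persist() (or
cache()) on the Dataset before running more than one action on it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyReadDemo {
    // Counts how many times the simulated source (standing in for Kafka) is read.
    static final AtomicInteger fetches = new AtomicInteger();

    // Hypothetical stand-in for a lazy batch read: nothing is fetched until get() runs.
    static Supplier<List<String>> lazyRead() {
        return () -> {
            fetches.incrementAndGet();
            return Arrays.asList("a", "b", "c");
        };
    }

    // Keeps the first result and reuses it, which is the role persisting plays here.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean loaded;

            @Override
            public synchronized T get() {
                if (!loaded) {
                    cached = source.get();
                    loaded = true;
                }
                return cached;
            }
        };
    }

    // Runs two "actions" against the data and reports how often the source was read.
    static int fetchesForTwoActions(boolean persist) {
        fetches.set(0);
        Supplier<List<String>> df = persist ? persisted(lazyRead()) : lazyRead();
        int count = df.get().size();        // first action
        boolean empty = df.get().isEmpty(); // second action
        return fetches.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + fetchesForTwoActions(false)); // 2
        System.out.println("with persist:    " + fetchesForTwoActions(true));  // 1
    }
}
```

Without the wrapper each action triggers its own read of the source;
with it, the source is read once and later actions reuse the result.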

On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane <kkane.l...@gmail.com> wrote:
>
> We are using the assign API to do batch work with Spark and Kafka.
> What I'm seeing is the Spark executor work happening in the
> background, constantly polling the same data over and over until the
> main thread commits the offsets.
>
> Is the below a blocking operation?
>
>   Dataset<Row> df = spark.read().format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
>   .option("startingOffsets",
> "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
>   .option("endingOffsets",
> "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
>   .load();
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
>
> ###########################################
>
> Here is an example.  Our desired batch is 20 records to commit on.
> Due to segment size (this is a test) 12 records are returned in each
> poll. Spark gets to offset 20 and our program is working to
> filter/process/commit, but the Spark polling starts again in the
> background at offset -2 since it has not been committed yet.
> This suggests the above .read().load() is non-blocking.
>
>
> 2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
> Getting data from Kafka
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset -2 requested 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Seeking to
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Polled
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> [compacted-gap-message-0]  12
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
> polling
>
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 1 requested 1
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 2 requested 2
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 3 requested 3
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 4 requested 4
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 5 requested 5
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 6 requested 6
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 7 requested 7
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 8 requested 8
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 9 requested 9
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 10 requested 10
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 11 requested 11
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 12 requested 12
>
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Seeking to
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 12
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Polled
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> [compacted-gap-message-0]  12
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Offset changed from 12 to 24 after
> polling
>
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 13 requested 13
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 14 requested 14
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 15 requested 15
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 16 requested 16
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 17 requested 17
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 18 requested 18
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 19 requested 19
>
> 2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:255 -
> Offset to commit from actually pulled data matches our range for
> partition 0
> 2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:264 -
> Original offsets to commit for partitions:
> {compacted-gap-message-0=20}
> 2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:265 -
> Modified offsets to commit for partitions:
> {compacted-gap-message-0=20}
>
> ###################### Main thread is working to commit offset 20 but
> Spark is moving on and doing it over since
> 2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
> task 201] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
> compacted-gap-message-0 nextOffset -2 requested 0
> 2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
> task 201] InternalKafkaConsumer:58 - Seeking to
> spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
> compacted-gap-message-0 0
> 2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
> task 201] InternalKafkaConsumer:58 - Polled
> spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
> [compacted-gap-message-0]  12
> 2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
> task 201] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
> polling
