I checked my topic. It has 5 partitions, but all the data is written to a single partition: wikipedia-2. I turned on debug logging, and this is what I see:
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2, wikipedia-1]. Seeking to the end.
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-0
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-4
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-3
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-2
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-1
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to latest offset.
2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-1 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-1
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

What is confusing to me is this:

Resetting offset for partition wikipedia-2 to latest offset.
Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

Why does it fetch latest offset 152908 for wikipedia-2, but then report the latest offset for that partition as 0? Or am I misunderstanding?

On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:

> code:
>
> val query = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "somenode:9092")
>   .option("subscribe", "wikipedia")
>   .load
>   .select(col("value") cast StringType)
>   .writeStream
>   .format("console")
>   .outputMode(OutputMode.Append)
>   .start()
>
> while (true) {
>   Thread.sleep(10000)
>   println(query.lastProgress)
> }
>
> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
>
>> Let's see the code...
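One thing worth noting about the query above: the Spark Kafka source defaults to starting from the latest offsets, so even a topic full of data produces no rows until new messages arrive. A minimal sketch of the same query with the starting position pinned to the beginning of the topic instead (`startingOffsets` is a documented option of the Spark Kafka source; the broker address and topic name are the ones from the thread). This does not explain the offset-0 readings in the log above, but it rules out the starting position as the cause:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StringType

// Same query as in the thread, but read the topic from the beginning
// instead of the default "latest", so records already in the topic are
// picked up in the first batch.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "somenode:9092")
  .option("subscribe", "wikipedia")
  .option("startingOffsets", "earliest") // default is "latest" for streaming queries
  .load
  .select(col("value") cast StringType)
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()
```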
>>
>> Alonso Isidoro Roman
>> about.me/alonso.isidoro.roman
>>
>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>
>>> my little program prints out query.lastProgress every 10 seconds, and this is what it shows:
>>>
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 9,
>>>     "triggerExecution" : 10
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>>
>>> [... five more lastProgress outputs, at 2017-01-26T22:54:55.745Z, 22:55:05.748Z, 22:55:15.758Z, 22:55:25.760Z, and 22:55:35.766Z, identical apart from the timestamp and durationMs; startOffset and endOffset stayed at 0 for every partition ...]
>>>
>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> hey,
>>>> I am just getting started with kafka + spark structured streaming.
>>>> So this is probably a pretty dumb mistake.
>>>>
>>>> I wrote a little program in Spark to read messages from a Kafka topic and display them in the console, using the kafka source and console sink. I run it in Spark local mode.
>>>>
>>>> I hooked it up to a test topic that I send messages to using the kafka console producer, and everything works great. I type a message in the console producer, and it pops up in my Spark program. Very neat!
>>>>
>>>> Next I pointed it at another topic instead, on which a kafka-connect program is writing lots of IRC messages. I can see it connect to the topic successfully, the partitions are discovered etc., and then... nothing. It just stays stuck at offset 0 for all partitions. At the same time, in another terminal, I can see messages coming in just fine using the kafka console consumer.
>>>>
>>>> I don't get it. Why doesn't Kafka want to consume from this topic in spark structured streaming?
>>>>
>>>> thanks! koert
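For anyone debugging a similar "stuck at offset 0" symptom: the broker's view of each partition's bounds can be cross-checked directly with the Kafka consumer API, independent of Spark. A minimal sketch, assuming a broker at somenode:9092 (the address used elsewhere in the thread) and Kafka clients 0.10.1 or later, where `KafkaConsumer.beginningOffsets` and `endOffsets` are available. A healthy topic with data should show latest > earliest on at least one partition:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "somenode:9092")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
try {
  // Ask the broker for the topic's partitions, then for the first and
  // last offset of each one.
  val partitions = consumer.partitionsFor("wikipedia").asScala
    .map(info => new TopicPartition(info.topic, info.partition)).asJava
  val earliest = consumer.beginningOffsets(partitions).asScala
  val latest   = consumer.endOffsets(partitions).asScala
  latest.toSeq.sortBy(_._1.partition).foreach { case (tp, end) =>
    println(s"$tp: earliest=${earliest(tp)} latest=$end")
  }
} finally {
  consumer.close()
}
```

If this standalone check reports a nonzero latest offset (as the 152908 in the debug log suggests it would) while the Spark source keeps reporting 0, that points at the Spark-side offset fetch rather than the topic itself.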