Thanks for reporting this. Which Spark version are you using? Could you provide the full log, please?
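In the meantime, you could cross-check what the broker itself reports for those partitions, independently of Spark. Below is a minimal sketch using a plain KafkaConsumer (not your Spark job): the broker address and topic name are copied from your code, the group id is a throwaway placeholder, and it assumes kafka-clients 0.10.1+ on the classpath for endOffsets.

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object CheckOffsets extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "somenode:9092") // same broker as in your stream
  props.put("group.id", "offset-check")           // throwaway group id, placeholder name
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer = new KafkaConsumer[String, String](props)
  try {
    // discover all partitions of the topic
    val partitions = consumer.partitionsFor("wikipedia").asScala
      .map(pi => new TopicPartition(pi.topic, pi.partition))
    // ask the broker directly for the log-end offset of each partition
    val ends = consumer.endOffsets(partitions.asJava).asScala
    ends.toSeq.sortBy(_._1.partition).foreach { case (tp, offset) =>
      println(s"$tp -> $offset")
    }
  } finally {
    consumer.close()
  }
}

If this prints a nonzero end offset for wikipedia-2 while the streaming query keeps reporting 0, the topic side is fine and the problem is in how the source fetches offsets.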
On Fri, Jan 27, 2017 at 10:24 AM, Koert Kuipers <ko...@tresata.com> wrote:

> i checked my topic. it has 5 partitions but all the data is written to a
> single partition: wikipedia-2
> i turned on debug logging and i see this:
>
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2, wikipedia-1]. Seeking to the end.
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-1 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> what is confusing to me is this:
> Resetting offset for partition wikipedia-2 to latest offset.
> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> why does it find latest offset 152908 for wikipedia-2 but then set the
> latest offset to 0 for that partition? or am i misunderstanding?
>
> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> code:
>>
>> val query = spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "somenode:9092")
>>   .option("subscribe", "wikipedia")
>>   .load
>>   .select(col("value") cast StringType)
>>   .writeStream
>>   .format("console")
>>   .outputMode(OutputMode.Append)
>>   .start()
>>
>> while (true) {
>>   Thread.sleep(10000)
>>   println(query.lastProgress)
>> }
>>
>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
>>
>>> let's see the code...
>>>
>>> Alonso Isidoro Roman
>>> https://about.me/alonso.isidoro.roman
>>>
>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>>
>>>> my little program prints out query.lastProgress every 10 seconds, and
>>>> this is what it shows:
>>>>
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 9,
>>>>     "triggerExecution" : 10
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 5,
>>>>     "triggerExecution" : 5
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 5,
>>>>     "triggerExecution" : 5
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 4,
>>>>     "triggerExecution" : 4
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 4,
>>>>     "triggerExecution" : 4
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>> {
>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>   "name" : "wiki",
>>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>>   "numInputRows" : 0,
>>>>   "inputRowsPerSecond" : 0.0,
>>>>   "processedRowsPerSecond" : 0.0,
>>>>   "durationMs" : {
>>>>     "getOffset" : 4,
>>>>     "triggerExecution" : 4
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>     "startOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "endOffset" : {
>>>>       "wikipedia" : {
>>>>         "2" : 0,
>>>>         "4" : 0,
>>>>         "1" : 0,
>>>>         "3" : 0,
>>>>         "0" : 0
>>>>       }
>>>>     },
>>>>     "numInputRows" : 0,
>>>>     "inputRowsPerSecond" : 0.0,
>>>>     "processedRowsPerSecond" : 0.0
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>   }
>>>> }
>>>>
>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>
>>>>> hey,
>>>>> i am just getting started with kafka + spark structured streaming. so
>>>>> this is probably a pretty dumb mistake.
>>>>>
>>>>> i wrote a little program in spark to read messages from a kafka topic
>>>>> and display them in the console, using the kafka source and console
>>>>> sink. i run it in spark local mode.
>>>>>
>>>>> i hooked it up to a test topic that i send messages to using the kafka
>>>>> console producer, and everything works great. i type a message in the
>>>>> console producer, and it pops up in my spark program. very neat!
>>>>>
>>>>> next i point it instead to another topic, on which a kafka-connect
>>>>> program is writing lots of irc messages. i can see kafka connect to
>>>>> the topic successfully, the partitions are discovered etc., and
>>>>> then... nothing. it just stays stuck at offset 0 for all partitions.
>>>>> at the same time, in another terminal, i can see messages coming in
>>>>> just fine using the kafka console consumer.
>>>>>
>>>>> i don't get it. why doesn't kafka want to consume from this topic in
>>>>> spark structured streaming?
>>>>>
>>>>> thanks! koert