there was also already an existing spark ticket for this: SPARK-18779 <https://issues.apache.org/jira/browse/SPARK-18779>
On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers <ko...@tresata.com> wrote:

it seems the bug is:
https://issues.apache.org/jira/browse/KAFKA-4547

i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or 0.10.1.1

On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:

in case anyone else runs into this:

the issue is that i was using kafka-clients 0.10.1.1

it works when i use kafka-clients 0.10.0.1 with spark structured streaming

my kafka server is 0.10.1.1
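a minimal sketch of forcing the working client in an sbt build (the spark-sql-kafka-0-10 artifact and the 2.1.0 version are assumptions about the setup in this thread; adjust to your own build):

    // build.sbt -- a sketch, assuming Spark 2.1.0 and the structured
    // streaming kafka connector
    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"

    // sbt evicts to the highest version by default, so merely adding
    // kafka-clients 0.10.0.1 would lose to a transitive 0.10.1.x;
    // an override forces the known-good client onto the classpath
    dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"

and to double check which client actually ended up on the classpath at runtime, kafka's own version reporter works:

    // prints the kafka-clients version that was actually loaded, e.g. 0.10.0.1
    import org.apache.kafka.common.utils.AppInfoParser
    println(s"kafka-clients version: ${AppInfoParser.getVersion}")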
On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:

i checked my topic. it has 5 partitions, but all the data is written to a single partition: wikipedia-2. i turned on debug logging and i see this:

2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2, wikipedia-1]. Seeking to the end.
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-0
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-4
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-3
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-2
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-1
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to latest offset.
2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-1 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-1
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

what is confusing to me is this:

Resetting offset for partition wikipedia-2 to latest offset.
Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

why does it find latest offset 152908 for wikipedia-2 but then set the latest offset to 0 for that partition? or am i misunderstanding?
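for context on those log lines: the source's "Seeking to the end" step is, roughly, the consumer's seekToEnd followed by position on each partition. a minimal standalone sketch of that call pattern (broker and topic are the ones from this thread; the group id is made up); on an affected 0.10.1.x client, position can come back 0 here even though the fetcher clearly saw offset 152908:

    import java.util.Properties

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object CheckEndOffsets {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "somenode:9092")
        props.put("group.id", "offset-check") // made-up group id, anything unused works
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try {
          val partitions = consumer.partitionsFor("wikipedia").asScala
            .map(p => new TopicPartition(p.topic, p.partition))
          consumer.assign(partitions.asJava)
          // the same call pattern as the "Seeking to the end" step in the logs:
          // seek to the end of every partition, then ask where we ended up
          consumer.seekToEnd(partitions.asJava)
          partitions.foreach { tp =>
            // a healthy client prints 152908 for wikipedia-2; an affected
            // 0.10.1.x client can print 0 even though data is clearly there
            println(s"$tp -> ${consumer.position(tp)}")
          }
        } finally consumer.close()
      }
    }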
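a side note on the polling loop in the code that follows: spark can also push a progress report once per completed trigger through a listener, so no sleep loop is needed. a minimal sketch, assuming spark 2.1's StreamingQueryListener api:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      // fires once per completed trigger, so no sleep loop is needed
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(event.progress.json)
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })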
On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:

code:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StringType

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "somenode:9092")
      .option("subscribe", "wikipedia")
      .load
      .select(col("value") cast StringType)
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append)
      .start()

    while (true) {
      Thread.sleep(10000)
      println(query.lastProgress)
    }

On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:

lets see the code...

2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:

my little program prints out query.lastProgress every 10 seconds, and this is what it shows:

{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:54:45.732Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 9,
    "triggerExecution" : 10
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[wikipedia]]",
    "startOffset" : {
      "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 }
    },
    "endOffset" : {
      "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
  }
}

[five more progress reports followed, identical except for the "timestamp" and "durationMs" values; numInputRows stayed 0 and every partition stayed at offset 0 throughout]

On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:

hey,
i am just getting started with kafka + spark structured streaming, so this is probably a pretty dumb mistake.

i wrote a little program in spark to read messages from a kafka topic and display them in the console, using the kafka source and console sink. i run it in spark local mode.

i hooked it up to a test topic that i send messages to using the kafka console producer, and everything works great. i type a message in the console producer, and it pops up in my spark program. very neat!

next i pointed it instead at another topic, on which a kafka-connect program is writing lots of irc messages. i can see it connect to the topic successfully, the partitions are discovered etc., and then... nothing. it just stays stuck at offset 0 for all partitions. at the same time, in another terminal, i can see messages coming in just fine using the kafka console consumer.

i dont get it. why doesnt the kafka source want to consume from this topic in spark structured streaming?

thanks! koert