i checked my topic. it has 5 partitions, but all the data is written to a
single partition: wikipedia-2.
i turned on debug logging and i see this:

2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
wikipedia-1]. Seeking to the end.
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-0
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-4
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-3
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-2
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-1
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to latest offset.
2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
successful heartbeat response for group
spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=152908} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-1 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-1
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
wikipedia-3 -> 0, wikipedia-0 -> 0)
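
To compare the fetched values per partition, the Fetcher messages in a log like the one above can be paired up mechanically. The following is only a rough sketch in plain Scala: `OffsetLogSummary` and `summarize` are made-up names, and it assumes each log message has been rejoined onto a single line (the mail client wrapped them here).

```scala
object OffsetLogSummary {
  // Matches e.g. "Resetting offset for partition wikipedia-2 to latest offset."
  private val Reset =
    """Resetting offset for partition (\S+) to (latest|earliest) offset""".r.unanchored
  // Matches e.g. "Fetched {timestamp=-1, offset=152908} for partition wikipedia-2"
  private val Fetched =
    """Fetched \{timestamp=-?\d+, offset=(\d+)\} for partition (\S+)""".r.unanchored

  /** Maps (partition, "latest" | "earliest") to the offset fetched after that reset. */
  def summarize(lines: Seq[String]): Map[(String, String), Long] = {
    val start = (Map.empty[String, String], Map.empty[(String, String), Long])
    val (_, result) = lines.foldLeft(start) {
      case ((pending, acc), line) =>
        line match {
          // Remember which kind of reset was requested last for this partition
          case Reset(partition, kind) =>
            (pending + (partition -> kind), acc)
          // Attribute the fetched offset to the pending reset, if there is one
          case Fetched(offset, partition) =>
            pending.get(partition) match {
              case Some(kind) =>
                (pending - partition, acc + ((partition, kind) -> offset.toLong))
              case None => (pending, acc)
            }
          // Unrelated lines (heartbeats, seeks, ...) are skipped
          case _ => (pending, acc)
        }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // The four wikipedia-2 messages from the log, one message per line
    val sample = Seq(
      "DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.",
      "DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2",
      "DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.",
      "DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2"
    )
    summarize(sample).foreach { case ((partition, kind), offset) =>
      println(s"$partition $kind -> $offset")
    }
  }
}
```

For the wikipedia-2 messages in this log, the latest reset is followed by a fetch of 152908 and the earliest reset by a fetch of 0, so the 0 that KafkaSource reports equals the value of the earliest fetch. That is only an observation about the log output, not a diagnosis.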

what is confusing to me is this:
Resetting offset for partition wikipedia-2 to latest offset.
Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

why does it find latest offset 152908 for wikipedia-2 but then set the latest
offset to 0 for that partition? or am i misunderstanding?
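
One way to cross-check the end offsets outside of Spark is to ask a plain Kafka consumer for its position after seeking to the end of each partition. This is a minimal sketch, not what KafkaSource does internally: it assumes a kafka-clients 0.10.0 or later jar on the classpath (where `assign` and `seekToEnd` take a collection), reuses the broker address and topic name from this thread, and `EndOffsetCheck` and `consumerProps` are made-up names.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object EndOffsetCheck {
  // Consumer settings; partitions are assigned manually, so no group.id is set here
  def consumerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val topic = "wikipedia" // topic name taken from the thread
    val consumer =
      new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps("somenode:9092"))
    try {
      // Discover every partition of the topic and assign all of them to this consumer
      val partitions = consumer.partitionsFor(topic).asScala
        .map(info => new TopicPartition(topic, info.partition()))
      consumer.assign(partitions.asJava)

      // Seek to the end, then read back the position of each partition
      consumer.seekToEnd(partitions.asJava)
      partitions.sortBy(_.partition()).foreach { tp =>
        println(s"$tp -> ${consumer.position(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}
```

If this prints a non-zero position for wikipedia-2 while the Spark job keeps reporting 0, the difference would point at how the offsets are obtained inside the job rather than at the topic itself.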

On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:

> code:
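>       import org.apache.spark.sql.functions.col
>       import org.apache.spark.sql.streaming.OutputMode
>       import org.apache.spark.sql.types.StringType
>
>       // spark is an existing SparkSession; read the "wikipedia" topic
>       // and print each message value as a string on the console
>       val query = spark.readStream
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "somenode:9092")
>         .option("subscribe", "wikipedia")
>         .load()
>         .select(col("value").cast(StringType))
>         .writeStream
>         .format("console")
>         .outputMode(OutputMode.Append())
>         .start()
>
>       // print the most recent progress report every 10 seconds
>       while (true) {
>         Thread.sleep(10000)
>         println(query.lastProgress)
>       }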
>
> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com>
> wrote:
>
>> lets see the code...
>>
>> Alonso Isidoro Roman
>>
>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>
>>> my little program prints out query.lastProgress every 10 seconds, and
>>> this is what it shows:
>>>
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 9,
>>>     "triggerExecution" : 10
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 5,
>>>     "triggerExecution" : 5
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 5,
>>>     "triggerExecution" : 5
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 4,
>>>     "triggerExecution" : 4
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 4,
>>>     "triggerExecution" : 4
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>>     "getOffset" : 4,
>>>     "triggerExecution" : 4
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>     "startOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "wikipedia" : {
>>>         "2" : 0,
>>>         "4" : 0,
>>>         "1" : 0,
>>>         "3" : 0,
>>>         "0" : 0
>>>       }
>>>     },
>>>     "numInputRows" : 0,
>>>     "inputRowsPerSecond" : 0.0,
>>>     "processedRowsPerSecond" : 0.0
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>   }
>>> }
>>>
>>>
>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> hey,
>>>> i am just getting started with kafka + spark structured streaming. so
>>>> this is probably a pretty dumb mistake.
>>>>
>>>> i wrote a little program in spark to read messages from a kafka topic
>>>> and display them in the console, using the kafka source and console sink. i
>>>> run it in spark local mode.
>>>>
>>>> i hooked it up to a test topic that i send messages to using the kafka
>>>> console producer, and everything works great. i type a message in the
>>>> console producer, and it pops up in my spark program. very neat!
>>>>
>>>> next i point it at another topic instead, one to which a kafka-connect
>>>> program is writing lots of irc messages. i can see kafka connect to the
>>>> topic successfully, the partitions are discovered etc., and then...
>>>> nothing. it just stays stuck at offset 0 for all partitions. at the same
>>>> time, in another terminal, i can see messages coming in just fine using the
>>>> kafka console consumer.
>>>>
>>>> i don't get it. why doesn't kafka want to consume from this topic in
>>>> spark structured streaming?
>>>>
>>>> thanks! koert
>>>>
>>>>
>>>>
>>>
>>
>
