I recommend using Structured Streaming, as it has a patch that can work around this issue: https://issues.apache.org/jira/browse/SPARK-26267
Best Regards,
Ryan

On Tue, Apr 30, 2019 at 3:34 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> There is a known issue where Kafka may return a wrong offset even if no
> reset is happening: https://issues.apache.org/jira/browse/KAFKA-7703
>
> Best Regards,
> Ryan
>
> On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver <aus...@flyrlabs.com> wrote:
>
>> @deng - There was a short erroneous period where two streams reading from
>> the same topic with the same group id were running at the same time. We
>> saw errors during this and stopped the extra stream. That said, I would
>> think auto.offset.reset would kick in regardless, since the documentation
>> says it applies when there is no existing current offset or the current
>> offset no longer exists on the Kafka topic. Moreover, that doesn't explain
>> why Spark logs that it is on one offset for that partition (5553330) and
>> then immediately errors out trying to read the old offset (4544296) that
>> no longer exists.
>>
>> @Akshay - I am using Spark Streaming (DStreams). Here is a snippet of the
>> Kafka consumer configuration I am using (some fields redacted):
>>
>> kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>> kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
>> kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>> kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>> kakaConsumerProperties.put("auto.offset.reset", "earliest");
>> kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
>> kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
>> kakaConsumerProperties.put("security.protocol", "");
>>
>> and I'm using LocationStrategies.PreferConsistent()
>>
>> Thanks
>>
>> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> Are you using Spark Streaming or Structured Streaming?
>>>
>>> For better understanding, could you also provide sample code/config
>>> params for your Spark-Kafka connector for the said streaming job?
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver <aus...@flyrlabs.com> wrote:
>>>
>>>> Hey guys, relatively new Spark dev here, and I'm seeing some Kafka
>>>> offset issues and was wondering if you could help me out.
>>>>
>>>> I am currently running a Spark job on Dataproc and am getting errors
>>>> trying to re-join a group and read data from a Kafka topic. I have done
>>>> some digging and am not sure what the issue is.
>>>>
>>>> I have auto.offset.reset set to earliest, so it should begin reading
>>>> from the earliest available non-committed offset, and initially my
>>>> Spark logs look like this:
>>>>
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-11 to offset 5553330.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-2 to offset 5555553.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-3 to offset 5555484.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-4 to offset 5555586.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-5 to offset 5555502.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-6 to offset 5555561.
>>>> 19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-7 to offset 5555542.
>>>>
>>>> But then on the very next line I get an error trying to read from a
>>>> nonexistent offset on the server (you can see that the offset for the
>>>> partition differs from the one listed above, so I have no idea why it
>>>> would be attempting to read from that offset). Here is the error on
>>>> the next line:
>>>>
>>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {demo.topic-11=4544296}
>>>>
>>>> Any ideas why my Spark job keeps going back to this offset (4544296)
>>>> and not the one it logs originally (5553330)?
>>>>
>>>> It seems to be contradicting itself with a) the actual offset it says
>>>> it's on versus the one it attempts to read, and b) saying there is no
>>>> configured reset policy.
>>>>
>>>> --
>>>> Austin Weaver
>>>> Software Engineer
>>>> FLYR, Inc. www.flyrlabs.com
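[Editor's note: for readers following along, the consumer configuration Austin pasted earlier in the thread can be collected into a small helper. This is only a sketch: it uses the plain string config keys rather than the `ConsumerConfig` constants so it compiles with just the JDK, and the broker address, group id, and SASL/security values are placeholders, not values from the thread.]

```java
import java.util.Properties;

// Sketch of the consumer configuration discussed in the thread, using plain
// string keys so no kafka-clients jar is needed to compile it. All concrete
// values (brokers, group id) are placeholders supplied by the caller.
public class ConsumerPropsSketch {
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Spark manages offsets itself, so the Kafka client must not auto-commit.
        props.put("enable.auto.commit", "false");
        // Start from the earliest retained offset when the group has no valid
        // committed offset -- the behavior Austin expected to see.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```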
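[Editor's note: the mismatch Austin describes (positioned at 5553330 but erroring on 4544296) comes down to the broker's range check plus whichever reset policy the fetching consumer actually ends up with. The following is an illustrative model of that decision only, not Spark's or Kafka's internal code; all offset values are made up for the example.]

```java
// Illustrative model of the range check that produces
// OffsetOutOfRangeException and of how auto.offset.reset resolves it.
// Not actual Kafka/Spark code; offsets here are hypothetical.
public class OffsetRangeSketch {
    public static long resolveStart(long requested, long earliest,
                                    long latest, String resetPolicy) {
        if (requested >= earliest && requested <= latest) {
            return requested; // stored offset is still retained: use it
        }
        switch (resetPolicy) {
            case "earliest":
                return earliest; // jump to the oldest retained record
            case "latest":
                return latest;   // skip to the end of the log
            default:
                // "none" (or unset): the consumer gives up -- this is the
                // "no configured reset policy" error quoted in the thread
                throw new IllegalStateException(
                        "Offsets out of range with no configured reset policy");
        }
    }
}
```

Under "earliest", a stale stored offset like 4544296 would simply snap forward to the earliest retained offset; the fact that the job throws instead suggests the consumer doing the fetch is effectively running with "none". If I recall correctly, the DStream Kafka integration overrides some consumer parameters on the executors, which may be why a job configured with "earliest" still reports no configured reset policy; the SPARK-26267 / KAFKA-7703 tickets linked above are the place to check.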