Re: Discontinuous offsets in Kafka will cause Spark Streaming failure
When you say the patch is not suitable, can you clarify why? It is probably best to get the various findings centralized on https://issues.apache.org/jira/browse/SPARK-17147. Happy to help with getting the patch up to date and working.

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood wrote:
> It seems this patch is not suitable for our problem.
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
> ----- Original Message -----
> From: namesuperwood
> To: Justin Miller
> Cc: user; Cody Koeninger
> Date: Wed, Jan 24, 2018, 14:45
> Subject: Re: Discontinuous offsets in Kafka will cause Spark Streaming failure
>
> Yes. My Spark Streaming application works with an uncompacted topic. I will
> check the patch.
>
> wood.super
>
> ----- Original Message -----
> From: Justin Miller
> To: namesuperwood
> Cc: user; Cody Koeninger
> Date: Wed, Jan 24, 2018, 14:23
> Subject: Re: Discontinuous offsets in Kafka will cause Spark Streaming failure
>
> We appear to be kindred spirits; I’ve recently run into the same issue. Are
> you running compacted topics? I’ve run into this issue on non-compacted
> topics as well; it happens rarely but is still a pain. You might check out
> this patch and the related Spark Streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on a somewhat large-scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood wrote:
>
> Hi all,
>
> Kafka version: kafka_2.11-0.11.0.2
> Spark version: 2.0.1
>
> The topic-partition "adn-tracking,15" in Kafka has earliest offset
> 1255644602 and latest offset 1271253441.
>
> When we started a Spark Streaming job to process data from this topic, we
> got an exception: "Got wrong record even after seeking to offset
> 1266921577".
> [ (earliest offset) 1255644602 < 1266921577 < 1271253441 (latest offset) ]
>
> Finally, I found the following source code in the class CachedKafkaConsumer
> in spark-streaming-kafka. The failure is clearly caused by the offset
> returned by the consumer's poll() not being equal to the offset the
> consumer seeked to.
>
> Here is the relevant code from CachedKafkaConsumer.scala:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
>
> I reproduced this problem and found that the offsets within one
> topic-partition are not continuous in Kafka. I think this is a bug that
> needs to be fixed.
>
> I implemented a simple project that uses a consumer to seek to offset
> 1266921577, but it returned the record at offset 1266921578.
> Seeking to 1266921576 instead returns the record at offset 1266921576 exactly.
>
> Here is the code:
>
> public class consumerDemo {
>   public static void main(String[] argv) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "172.31.29.31:9091");
>     props.put("group.id", "consumer-tutorial-demo");
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>     Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>     collection.add(tp);
>     consumer.assign(collection);
>     consumer.seek(tp, 1266921576);
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>     List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>     Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>     ConsumerRecord<String, String> record = iter.next();
>     System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
>   }
> }
>
> wood.super
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
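The failure mode described in the quoted thread can be sketched without a broker. The following is a minimal, self-contained simulation; the class and method names and the tiny in-memory "log" are illustrative, not the real Spark or Kafka classes. It shows that when a requested offset is absent from the log (as after compaction), a seek-then-poll lands on the next existing offset, so a strict equality check like the one in CachedKafkaConsumer.get must fail:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetGapDemo {

    // In-memory stand-in for one topic-partition's log: offset -> value.
    // Offset 12 is intentionally absent, as if removed by log compaction.
    static final NavigableMap<Long, String> LOG = new TreeMap<>();
    static {
        LOG.put(10L, "a");
        LOG.put(11L, "b");
        // offset 12 is missing
        LOG.put(13L, "c");
        LOG.put(14L, "d");
    }

    // Mimics seek(offset) followed by poll(): a consumer positioned at a
    // missing offset returns the next existing record instead.
    static long pollAfterSeek(long offset) {
        Long next = LOG.ceilingKey(offset);
        if (next == null) {
            throw new IllegalStateException("no records at or after " + offset);
        }
        return next;
    }

    // Mimics the strict check in CachedKafkaConsumer.get: the returned
    // record must carry exactly the requested offset, or the job fails.
    static String getExact(long offset) {
        long got = pollAfterSeek(offset);
        if (got != offset) {
            throw new IllegalStateException(
                "Got wrong record even after seeking to offset " + offset
                + " (got " + got + ")");
        }
        return LOG.get(got);
    }

    public static void main(String[] args) {
        System.out.println(getExact(11));       // offset 11 exists: prints "b"
        System.out.println(pollAfterSeek(12));  // consumer silently lands on 13
        try {
            getExact(12);                       // strict check fails, like Spark's assert
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, the thread's observation is exactly reproduced: seeking to an existing offset (1266921576 above, 11 here) succeeds, while seeking to a gap (1266921577, here 12) yields the next record and trips the assertion.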