Yeah, the KafkaRDD cannot be reused. It's better to document it. On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:
> Ok, I have split the KafkaRDD logic to each use their own group and bumped
> the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
> ends up with a timeout and exception, so I am still perplexed on that one.
> The new error I am getting now is a `ConcurrentModificationException`
> when Spark is trying to remove the CachedKafkaConsumer.
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>     at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>
> Here is the basic logic:
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They each use a distinct consumer
> group (s"$prefix-$topic"), each has its own topic, and each topic has
> 4 partitions. We routinely get timeout errors when polling for data when
> the poll.ms is less than 2 seconds. This occurs whether we process in
> parallel or sequentially.
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>     kafkaParams.asJava,
>     offsetRanges,
>     PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> I am not sure why the concurrency issue is there, as I have tried to debug
> and also looked at the KafkaConsumer code, but everything looks like it
> should not occur. The things to figure out are why this occurs when running
> in parallel and also why the timeouts still happen.
>
> Thanks,
>
> Ivan
>
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> There definitely is Kafka documentation indicating that you should use
>> a different consumer group for logically different subscribers; this
>> is really basic to Kafka:
>>
>> http://kafka.apache.org/documentation#intro_consumers
>>
>> As for your comment that "commit async after each RDD, which is not
>> really viable also", how is it not viable? Again, committing offsets
>> to Kafka doesn't give you reliable delivery semantics unless your
>> downstream data store is idempotent. If your downstream data store is
>> idempotent, then it shouldn't matter to you when offset commits
>> happen, as long as they happen within a reasonable time after the data
>> is written.
>>
>> Do you want to keep arguing with me, or follow my advice and proceed
>> with debugging any remaining issues after you make the changes I
>> suggested?
>>
>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
>> > With our stream version, we update the offsets for only the partition
>> > we are operating on. We even break the partition down into smaller
>> > batches and then update the offsets after each batch within the
>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>> > Sean pointed out, this is not necessarily a Spark issue, since Kafka no
>> > longer allows you to simply update the offsets for a given consumer group.
>> > You have to subscribe or assign partitions to even do so.
>> >
>> > As for storing the offsets in some other place like a DB, I don't find
>> > this useful, because you then can't use tools like Kafka Manager. In
>> > order to do so you would have to store them in a DB and then circle back
>> > and update Kafka afterwards. This means you have to keep two sources in
>> > sync, which is not really a good idea.
>> >
>> > It is a challenge in Spark to use the Kafka offsets, since the driver
>> > stays subscribed to the topic(s) and consumer group, while the executors
>> > prepend "spark-executor-" to the consumer group. The stream (driver)
>> > does allow you to commit async after each RDD, which is not really
>> > viable either. I have thought of implementing an Akka actor system on
>> > the driver and sending it messages from the executor code to update the
>> > offsets, but that is asynchronous as well, so not really a good solution.
>> >
>> > I have no idea why Kafka made this change, and also why in the parallel
>> > KafkaRDD application we would be advised to use different consumer
>> > groups for each RDD. It seems strange to me that different consumer
>> > groups would be required or advised. There is no Kafka documentation
>> > that I know of that states this. The biggest issue I see with the
>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>> > seconds and still get the issue. Something is not right here. As I
>> > mentioned with the streaming application, with Spark 1.6 and Kafka 0.8.x
>> > we never saw this issue. We have been running the same basic logic for
>> > over a year now without one hitch at all.
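[Editor's note: the transactional-DB approach debated here can be sketched with a minimal in-memory stand-in. Results and offsets commit together, and a batch whose starting offset does not match what the store last recorded is rejected. `OffsetStore` and `Batch` are hypothetical names for illustration, not Spark or Kafka API; a real implementation would use a database transaction.]

```scala
// Hypothetical sketch: an atomic (results + offsets) store. A real version
// would be a single DB transaction; here a synchronized in-memory map.
case class Batch(topic: String, partition: Int,
                 fromOffset: Long, untilOffset: Long, records: Seq[String])

class OffsetStore {
  // (topic, partition) -> next offset expected
  private var offsets = Map.empty[(String, Int), Long]
  private var results = Vector.empty[String]

  // Commit records and the new offset together; throw if the batch does
  // not start where the store left off (a replay or a gap).
  def commit(b: Batch): Unit = synchronized {
    val key = (b.topic, b.partition)
    val expected = offsets.getOrElse(key, 0L)
    if (b.fromOffset != expected)
      throw new IllegalStateException(s"expected offset $expected, got ${b.fromOffset}")
    results = results ++ b.records                 // both updates succeed
    offsets = offsets.updated(key, b.untilOffset)  // or fail as one unit
  }

  def nextOffset(topic: String, partition: Int): Long =
    offsets.getOrElse((topic, partition), 0L)
  def allResults: Seq[String] = results
}
```

The trade-off Ivan raises is real: offsets kept only in a DB are invisible to tools like Kafka Manager. One common compromise is to treat the DB as the source of truth and additionally commit to Kafka on a best-effort basis purely for monitoring.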
>> >
>> > Ivan
>> >
>> >
>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> >> general, not just kafka) have been progressing on to the next batch
>> >> after a given batch aborts for quite some time now. Yet another
>> >> reason I put offsets in my database transactionally. My jobs throw
>> >> exceptions if the offset in the DB isn't what I expected it to be.
>> >>
>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>> >> > using the "Kafka Stream" approach that he is using, except I'm storing
>> >> > my offsets manually from the driver to Zookeeper in the Kafka 8 format.
>> >> > I haven't yet implemented the KafkaRDD approach, and therefore don't
>> >> > have the concurrency issues, but a very similar use case is coming up
>> >> > for me soon; it's just been backburnered until I can get streaming to
>> >> > be more reliable (I will definitely ensure unique group IDs when I
>> >> > do). Offset commits are certainly more painful in Kafka 0.10, and that
>> >> > doesn't have anything to do with Spark.
>> >> >
>> >> > While I may be able to alleviate the timeout by just increasing it,
>> >> > I've noticed something else that is more worrying: when one task fails
>> >> > 4 times in a row (i.e. "Failed to get records for _ after polling for
>> >> > _"), Spark aborts the Stage and Job with "Job aborted due to stage
>> >> > failure: Task _ in stage _ failed 4 times". That's fine, and it's the
>> >> > behavior I want, but instead of stopping the Application there (as
>> >> > previous versions of Spark did) the next microbatch marches on and
>> >> > offsets are committed ahead of the failed microbatch. Suddenly my
>> >> > at-least-once app becomes more sometimes-at-least-once, which is no good.
>> >> > In order for Spark to display that failure, I must be propagating
>> >> > the errors up to Spark, but the behavior of marching forward with the
>> >> > next microbatch seems to be new, and a big potential for data loss in
>> >> > streaming applications.
>> >> >
>> >> > Am I perhaps missing a setting to stop the entire streaming
>> >> > application once spark.task.maxFailures is reached? Has anyone else
>> >> > seen this behavior of a streaming application skipping over failed
>> >> > microbatches?
>> >> >
>> >> > Thanks,
>> >> > Sean
>> >> >
>> >> >
>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>
>> >> >> So basically what I am saying is
>> >> >>
>> >> >> - increase poll.ms
>> >> >> - use a separate group id everywhere
>> >> >> - stop committing offsets under the covers
>> >> >>
>> >> >> That should eliminate all of those as possible causes, and then we
>> >> >> can see if there are still issues.
>> >> >>
>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> >> >> subscribe to a topic in order to update offsets, Kafka does. If you
>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>> >> >> consumer api should be usable with later brokers. As long as you
>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>> >> >> using it.
>> >> >>
>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>> >> >>> RDD uses a single distinct topic. For example, the group would be
>> >> >>> something like "storage-group", and the topics would be
>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
>> >> >>> KafkaConsumer is started and assigned its partitions, and then
>> >> >>> commit offsets are called after the RDD is processed.
>> >> >>> This should not interfere with the consumer group used by the
>> >> >>> executors, which would be "spark-executor-storage-group".
>> >> >>>
>> >> >>> In the streaming example there is a single topic ("client-events")
>> >> >>> and group ("processing-group"). A single stream is created, and
>> >> >>> offsets are manually updated from the executor after each partition
>> >> >>> is handled. This was a challenge, since Spark now requires one to
>> >> >>> assign or subscribe to a topic in order to even update the offsets.
>> >> >>> In 0.8.2.x you did not have to worry about that. This approach
>> >> >>> limits your exposure to duplicate data, since idempotent records are
>> >> >>> not entirely possible in our scenario. At least not without a lot of
>> >> >>> re-running of logic to de-dup.
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> Ivan
>> >> >>>
>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>>>
>> >> >>>> So just to be clear, the answers to my questions are
>> >> >>>>
>> >> >>>> - you are not using different group ids, you're using the same
>> >> >>>> group id everywhere
>> >> >>>>
>> >> >>>> - you are committing offsets manually
>> >> >>>>
>> >> >>>> Right?
>> >> >>>>
>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>> >> >>>> tune poll.ms upwards even higher.
>> >> >>>>
>> >> >>>> You must use different group ids for different rdds or streams.
>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
>> >> >>>> the same group id, and the consumer cache is keyed by group id. Yes,
>> >> >>>> the executor will tack "spark-executor-" on to the beginning, but if
>> >> >>>> you give it the same base group id, it will be the same. And the
>> >> >>>> driver will use the group id you gave it, unmodified.
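[Editor's note: Cody's point about the consumer cache can be illustrated with a small sketch, assuming the 0.10 integration's documented convention that executors prefix the configured group.id with "spark-executor-" while the driver uses it unmodified. `executorGroupId` is a hypothetical helper mirroring that convention, not Spark API.]

```scala
// Hypothetical helper mirroring the executor-side group.id convention.
def executorGroupId(baseGroupId: String): String =
  s"spark-executor-$baseGroupId"

// Two RDDs sharing one base group id map to the SAME executor group id,
// and so collide in the executor consumer cache (keyed by group id).
val shared = Seq("storage-channel1", "storage-channel2")
  .map(_ => executorGroupId("storage-group"))

// Per-topic base group ids keep the cached consumers distinct.
val perTopic = Seq("storage-channel1", "storage-channel2")
  .map(topic => executorGroupId(s"storage-group-$topic"))
```

Under this reading, Ivan's "each RDD uses a distinct topic" setup still shares one cached consumer per executor across all topics, which would explain the `ConcurrentModificationException` seen earlier in the thread.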
>> >> >>>> >> >> >>>> Finally, I really can't help you if you're manually writing your >> own >> >> >>>> code to commit offsets directly to Kafka. Trying to minimize >> >> >>>> duplicates that way doesn't really make sense, your system must be >> >> >>>> able to handle duplicates if you're using kafka as an offsets >> store, >> >> >>>> it can't do transactional exactly once. >> >> >>>> >> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote: >> >> >>>>> Here are some examples and details of the scenarios. The >> KafkaRDD is >> >> >>>>> the >> >> >>>>> most >> >> >>>>> error prone to polling >> >> >>>>> timeouts and concurrentm modification errors. >> >> >>>>> >> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes >> them >> >> >>>>> in >> >> >>>>> parallel using the KafkaRDD directly. they all use the same >> consumer >> >> >>>>> group >> >> >>>>> ('storage-group'), but each has it's own topic and each topic >> has 4 >> >> >>>>> partitions. We routinely get timeout errors when polling for >> data. >> >> >>>>> This >> >> >>>>> occurs whether we process in parallel or sequentially. 
>> >> >>>>>
>> >> >>>>> *Spark Kafka setting:*
>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>> >> >>>>>
>> >> >>>>> *Kafka Consumer Params:*
>> >> >>>>> metric.reporters = []
>> >> >>>>> metadata.max.age.ms = 300000
>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> >> >>>>> reconnect.backoff.ms = 50
>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >> >>>>> max.partition.fetch.bytes = 1048576
>> >> >>>>> bootstrap.servers = [somemachine:31000]
>> >> >>>>> ssl.keystore.type = JKS
>> >> >>>>> enable.auto.commit = false
>> >> >>>>> sasl.mechanism = GSSAPI
>> >> >>>>> interceptor.classes = null
>> >> >>>>> exclude.internal.topics = true
>> >> >>>>> ssl.truststore.password = null
>> >> >>>>> client.id =
>> >> >>>>> ssl.endpoint.identification.algorithm = null
>> >> >>>>> max.poll.records = 1000
>> >> >>>>> check.crcs = true
>> >> >>>>> request.timeout.ms = 40000
>> >> >>>>> heartbeat.interval.ms = 3000
>> >> >>>>> auto.commit.interval.ms = 5000
>> >> >>>>> receive.buffer.bytes = 65536
>> >> >>>>> ssl.truststore.type = JKS
>> >> >>>>> ssl.truststore.location = null
>> >> >>>>> ssl.keystore.password = null
>> >> >>>>> fetch.min.bytes = 1
>> >> >>>>> send.buffer.bytes = 131072
>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> >> >>>>> group.id = storage-group
>> >> >>>>> retry.backoff.ms = 100
>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >> >>>>> sasl.kerberos.service.name = null
>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>> >> >>>>> ssl.key.password = null
>> >> >>>>> fetch.max.wait.ms = 500
>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>> >> >>>>> connections.max.idle.ms = 540000
>> >> >>>>> session.timeout.ms = 30000
>> >> >>>>> metrics.num.samples = 2
>> >> >>>>> key.deserializer = class
>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>> >> >>>>> ssl.protocol = TLS
>> >> >>>>> ssl.provider = null
>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >> >>>>> ssl.keystore.location = null
>> >> >>>>> ssl.cipher.suites = null
>> >> >>>>> security.protocol = PLAINTEXT
>> >> >>>>> ssl.keymanager.algorithm = SunX509
>> >> >>>>> metrics.sample.window.ms = 30000
>> >> >>>>> auto.offset.reset = earliest
>> >> >>>>>
>> >> >>>>> *Example usage with KafkaRDD:*
>> >> >>>>> val channels = Seq("channel1", "channel2")
>> >> >>>>>
>> >> >>>>> channels.toParArray.foreach { channel =>
>> >> >>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >> >>>>>
>> >> >>>>>   // Get offsets for the given topic and the consumer group 'storage-group'
>> >> >>>>>   val offsetRanges = getOffsets("storage-group", channel)
>> >> >>>>>
>> >> >>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>> >> >>>>>     kafkaParams.asJava,
>> >> >>>>>     offsetRanges,
>> >> >>>>>     PreferConsistent).toDS[V]
>> >> >>>>>
>> >> >>>>>   // Do some aggregations
>> >> >>>>>   ds.agg(...)
>> >> >>>>>   // Save the data
>> >> >>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>> >> >>>>>   // Save offsets using a KafkaConsumer
>> >> >>>>>   consumer.commitSync(newOffsets.asJava)
>> >> >>>>>   consumer.close()
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> *Example usage with Kafka Stream:*
>> >> >>>>> This creates a stream and processes events in each partition. At
>> >> >>>>> the end of processing for each partition, we update the offsets for
>> >> >>>>> that partition. This is challenging to do, but is better than
>> >> >>>>> calling commitAsync on the stream, because that occurs after the
>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
>> >> >>>>> in an exactly-once environment.
>> >> >>>>> Since the executors use their own custom group
>> >> >>>>> "spark-executor-processor-group" and the commit is buried in
>> >> >>>>> private functions, we are unable to use the executor's cached
>> >> >>>>> consumer to update the offsets. This requires us to go through
>> >> >>>>> multiple steps to update the Kafka offsets accordingly.
>> >> >>>>>
>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>> >> >>>>>
>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>> >> >>>>>   PreferConsistent,
>> >> >>>>>   Subscribe[K, V](Seq("my-topic").asJavaCollection,
>> >> >>>>>     kafkaParams,
>> >> >>>>>     offsetRanges))
>> >> >>>>>
>> >> >>>>> stream.foreachRDD { rdd =>
>> >> >>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> >>>>>
>> >> >>>>>   // Transform our data
>> >> >>>>>   rdd.foreachPartition { events =>
>> >> >>>>>     // Establish a consumer in the executor so we can update
>> >> >>>>>     // offsets after each partition.
>> >> >>>>>     // This class is homegrown and uses the KafkaConsumer to help
>> >> >>>>>     // get/set offsets
>> >> >>>>>     val consumer = new ConsumerUtils(kafkaParams)
>> >> >>>>>     // do something with our data
>> >> >>>>>
>> >> >>>>>     // Write the offsets that were updated in this partition
>> >> >>>>>     kafkaConsumer.setConsumerOffsets("processor-group",
>> >> >>>>>       Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>> >> >>>>>   }
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> View this message in context:
>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
>> >> >>>>> Nabble.com.
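[Editor's note: the per-partition commit above needs one piece of bookkeeping that is easy to get wrong: turning the ranges Spark reports into a (topic, partition) -> end-offset map for whatever does the committing. A sketch, with a plain case class standing in for Spark's OffsetRange; `commitMap` is a hypothetical helper.]

```scala
// Stand-in for org.apache.spark.streaming.kafka010.OffsetRange.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

// The offset to commit is untilOffset (the NEXT offset to read),
// not the last offset actually processed.
def commitMap(ranges: Seq[OffsetRange]): Map[(String, Int), Long] =
  ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap
```

Committing `untilOffset - 1` instead is a classic off-by-one that reprocesses the final record of every batch on restart.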
>> >> >>>>>
>> >> >>>>> ---------------------------------------------------------------------
>> >> >>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >>>>>