Hello, Spark Users. We have an application using Spark 2.3.0 and the 0.8 Kafka client. We have a Spark Streaming job, and we're reading a reasonable amount of data from Kafka (40 GB / minute or so). We would like to move to the Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to convert message formats.
We've run into https://issues.apache.org/jira/browse/SPARK-19185, 'ConcurrentModificationExceptions with CachedKafkaConsumers'. I've tried to work around it as follows: 1. Disabled consumer caching. This increased the total job time from ~1 minute per batch to ~1.8 minutes per batch, a performance penalty that is unacceptable for our use case. We also saw some partitions stop receiving data for an extended period of time, though I was unable to produce a simple repro for this effect. 2. Disabled speculation and multiple-job concurrency, and added caching of the stream directly after reading from Kafka and caching offsets. This approach seems to work well for simple examples (read from a Kafka topic, write to another topic). However, when we run more complex logic we continue to see this error, despite creating the stream for a given topic only once. We validated that we create the stream for a given topic / partition only once by logging on stream creation, caching the stream, and (eventually) calling 'runJob' to actually fetch the data. Nonetheless, with multiple outputs we still see the ConcurrentModificationException. I've included some code below. I would appreciate any debugging tips for the workaround. However, my main concern is to ensure that the 2.4 release will include a fix that works for Spark Streaming jobs in which multiple input topics map data to multiple outputs. I would also like to know whether the fix (https://github.com/apache/spark/pull/20997) will be backported to Spark 2.3.x. In our code, the read path looks like the following: case class StreamLookupKey(topic: Set[String], brokers: String) private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map() // Given inputs return a direct stream. 
/**
 * Returns one DStream over `topicsToUse`, built by unioning a Kafka 0.10 direct stream per
 * entry of `brokersToUse`.
 *
 * If `streamMap` already holds a stream for the same (topics, brokers) pair, that stream is
 * reused instead of opening a second direct stream over the same topic partitions.
 *
 * @param ssc                       streaming context the streams are created in
 * @param additionalKafkaParameters extra consumer settings; appended after the defaults from
 *                                  getKafkaParameters, so they override them
 * @param brokersToUse              one comma-separated broker list per cluster
 *                                  (e.g. "broker1,broker2" and "broker3,broker4")
 * @param topicsToUse               topics subscribed to on every cluster
 * @param applicationName           combined with the topic names to form the consumer group id
 * @param persist                   NOTE(review): not referenced in this method — confirm whether it is still needed
 * @param useOldestOffsets          true -> auto.offset.reset=earliest, otherwise latest
 * @param maxRatePerPartition       forwarded to KafkaStreamFactory.writeStreamOffsets
 * @param batchSeconds              forwarded to KafkaStreamFactory.writeStreamOffsets
 * @return the union of the per-cluster streams
 */
def createDirectStream(ssc: StreamingContext,
                       additionalKafkaParameters: Map[String, String],
                       brokersToUse: Array[String], // broker1,broker2|broker3,broker4
                       topicsToUse: Array[String],
                       applicationName: String,
                       persist: Option[PersistenceManager],
                       useOldestOffsets: Boolean,
                       maxRatePerPartition: Long,
                       batchSeconds: Int
                      ): DStream[DecodedData] = {
  val streams: Array[DStream[DecodedData]] = brokersToUse.map(brokers => {
    val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
    // `++` keeps the right-hand value on key collisions, so caller-supplied settings win.
    val kafkaParameters: Map[String, String] =
      getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
    logger.info(s"Kafka Params: ${kafkaParameters}")
    val topics = topicsToUse.toSet
    logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
      s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
    // Map.getOrElse takes its default by name, so createKafkaStream only runs on a cache miss;
    // createKafkaStream itself registers the new stream in streamMap.
    streamMap.getOrElse(StreamLookupKey(topics, brokers),
      createKafkaStream(ssc, applicationName, topics, brokers, maxRatePerPartition, batchSeconds, kafkaParameters))
  })
  ssc.union(streams)
}

/**
 * Creates a Kafka 0.10 direct stream for `topics` on `brokers` and prepares it for reuse.
 *
 * The steps are:
 *  - subscribe with LocationStrategies.PreferBrokers;
 *  - hand the raw input stream to KafkaStreamFactory.writeStreamOffsets;
 *  - map each record through addConsumerRecordMetadata;
 *  - persist the result at GeneralConstants.defaultPersistenceLevel;
 *  - record it in streamMap;
 *  - register a no-op job per batch.
 *
 * @throws KafkaDirectStreamException wrapping any Exception raised while building the stream
 */
private def createKafkaStream(ssc: StreamingContext,
                              applicationName: String,
                              topics: Set[String],
                              brokers: String,
                              maxRatePerPartition: Long,
                              batchSeconds: Int,
                              kafkaParameters: Map[String, String]): DStream[DecodedData] = {
  logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
    s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
  try {
    val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
    val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
      KafkaUtils.createDirectStream(ssc,
        locationStrategy = LocationStrategies.PreferBrokers,
        consumerStrategy = consumerStrategy)
    // Offsets are handled on the raw input stream — presumably because only its RDDs expose
    // HasOffsetRanges; mapped RDDs do not.
    KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
    val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
    streamMap += StreamLookupKey(topics, brokers) -> result
    // No-op job per batch: computing the persisted RDD fetches the batch from Kafka once,
    // so later output operations can read the persisted copy.
    // NOTE(review): if persisted blocks are evicted (or the level does not store data), later
    // outputs recompute the partition and read from Kafka again from another task. That is a
    // plausible source of the ConcurrentModificationException on the cached consumer.
    result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
    result
  } catch ErrorHandling.safelyCatch {
    case e: Exception =>
      logger.error("Unable to create direct stream:")
      e.printStackTrace()
      throw KafkaDirectStreamException(topics.toArray, brokers, e)
  }
}

/**
 * Default consumer settings for a direct stream.
 *
 * @param brokers          comma-separated bootstrap servers
 * @param useOldestOffsets true -> "earliest", false -> "latest" for auto.offset.reset
 * @param applicationName  used verbatim as group.id (createDirectStream passes the full group id here)
 */
def getKafkaParameters(brokers: String, useOldestOffsets: Boolean, applicationName: String): Map[String, String] =
  Map[String, String](
    "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
    "enable.auto.commit" -> false.toString, // we'll commit these manually
    "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
    "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
    "bootstrap.servers" -> brokers,
    "group.id" -> applicationName,
    "session.timeout.ms" -> 240000.toString,
    "request.timeout.ms" -> 300000.toString
  )

The write path looks like the following:

/**
 * Writes `rdd` to a Kafka topic using Spark's batch Kafka sink.
 *
 * Each element is converted to bytes with `conv` and written as the record value.
 *
 * @param rdd           data to write
 * @param topic         destination topic
 * @param brokers       passed through getBrokersToUse to build kafka.bootstrap.servers
 * @param conv          serializer for a single element
 * @param numPartitions if > 0, the RDD is repartitioned to this many partitions before writing
 * @tparam A            NOTE(review): unused; kept so existing call sites still compile
 */
def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String], conv: (T) => Array[Byte], numPartitions: Int): Unit = {
  val rddToWrite = if (numPartitions > 0) rdd.repartition(numPartitions) else rdd
  // Returns the existing SparkSession if there is one, otherwise creates one.
  val session = SparkSession.builder().getOrCreate()
  val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
    StructType(Array(StructField("value", BinaryType))))
  // NOTE(review): this column is not named `key`, so the Kafka sink does not use it as the record
  // key, and records are written with null keys. Aliasing it to `key` would give every record the
  // same key ('') and route them all to a single partition.
  df.selectExpr("CAST('' AS STRING)", "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
    // Producer settings only reach the KafkaProducer when prefixed with `kafka.`; the previous
    // unprefixed "compression.type" / "retries" options were silently ignored.
    .option("kafka.compression.type", "gzip")
    .option("kafka.retries", "3")
    .option("topic", topic)
    .save()
}

Regards,
Bryan Jeffrey