Cody,

We are connecting to multiple clusters for each topic. I am not currently able to run a test against master. This morning I experimented both with simply moving to a single one of our clusters and with adding a cluster identifier to the group id (sketched below); neither was successful.
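A rough sketch of the group id change (the cluster identifier derivation here is illustrative, not our exact code):

// Sketch: fold a per-cluster identifier into the consumer group id so that each
// cluster's stream registers under a distinct consumer group.
val clusterId = brokers.split(',').head // stand-in for a real cluster name
val groupId = s"${applicationName}~${clusterId}~${topicsToUse.mkString("~")}"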
Regards,

Bryan Jeffrey

On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:
> I doubt that fix will get backported to 2.3.x
>
> Are you able to test against master? 2.4 with the fix you linked to
> is likely to hit code freeze soon.
>
> From a quick look at your code, I'm not sure why you're mapping over
> an array of brokers. It seems like that would result in different
> streams with the same group id, because broker isn't part of your
> group id string.
>
> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> > Hello, Spark Users.
> >
> > We have an application using Spark 2.3.0 and the 0.8 Kafka client. We have
> > a Spark Streaming job, and we're reading a reasonable amount of data from
> > Kafka (roughly 40 GB / minute). We would like to move to the Kafka 0.10
> > client to avoid requiring our (0.10.2.1) Kafka brokers to convert message
> > formats for the older client.
> >
> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'. I've tried
> > to work around it as follows:
> >
> > 1. Disabled consumer caching (the configuration is sketched just below
> > this list). This increased the total job time from ~1 minute per batch to
> > ~1.8 minutes per batch; that performance penalty is unacceptable for our
> > use case. We also saw some partitions stop receiving data for an extended
> > period of time, although I was unable to get a simple repro of that effect.
> > 2. Disabled speculation and multiple-job concurrency, and added caching
> > for the stream directly after reading from Kafka as well as caching of
> > offsets. This approach works well for simple examples (read from a Kafka
> > topic, write to another topic). However, when we move through more complex
> > logic we continue to see this type of error, despite only creating the
> > stream for a given topic a single time. We validated that we create the
> > stream for a given topic / partition a single time by logging on stream
> > creation, caching the stream and (eventually) calling 'runJob' to actually
> > go and fetch the data. Nonetheless, with multiple outputs we see the
> > ConcurrentModificationException.
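> > For completeness, (1) amounted to setting a Spark configuration flag. The
> > snippet below is a sketch rather than our exact code; I believe the
> > relevant key for the 0.10 integration is
> > spark.streaming.kafka.consumer.cache.enabled:
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> >
> > // Sketch: disable the cached Kafka consumer pool for the kafka-0-10 direct stream.
> > val conf = new SparkConf()
> >   .setAppName(applicationName)
> >   .set("spark.streaming.kafka.consumer.cache.enabled", "false")
> > val ssc = new StreamingContext(conf, Seconds(batchSeconds))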
> > I've included some code down below. I would be happy if anyone had
> > debugging tips for the workaround. However, my main concern is to ensure
> > that the 2.4 version will have a bug fix that works for Spark Streaming
> > jobs in which multiple input topics map data to multiple outputs. I would
> > also like to understand whether the fix
> > (https://github.com/apache/spark/pull/20997) will be backported to Spark
> > 2.3.x.
> >
> > In our code, read looks like the following:
> >
> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >
> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >
> > // Given inputs, return a direct stream.
> > def createDirectStream(ssc: StreamingContext,
> >                        additionalKafkaParameters: Map[String, String],
> >                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
> >                        topicsToUse: Array[String],
> >                        applicationName: String,
> >                        persist: Option[PersistenceManager],
> >                        useOldestOffsets: Boolean,
> >                        maxRatePerPartition: Long,
> >                        batchSeconds: Int): DStream[DecodedData] = {
> >   val streams: Array[DStream[DecodedData]] =
> >     brokersToUse.map(brokers => {
> >       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >       val kafkaParameters: Map[String, String] =
> >         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
> >       logger.info(s"Kafka Params: ${kafkaParameters}")
> >       val topics = topicsToUse.toSet
> >       logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> >         s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
> >
> >       streamMap.getOrElse(StreamLookupKey(topics, brokers),
> >         createKafkaStream(ssc, applicationName, topics, brokers,
> >           maxRatePerPartition, batchSeconds, kafkaParameters))
> >     })
> >
> >   ssc.union(streams)
> > }
> >
> > private def createKafkaStream(ssc: StreamingContext, applicationName: String,
> >                               topics: Set[String], brokers: String,
> >                               maxRatePerPartition: Long, batchSeconds: Int,
> >                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
> >   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
> >     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
> >   try {
> >     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
> >     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
> >       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
> >         consumerStrategy = consumerStrategy)
> >
> >     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
> >     val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
> >     streamMap += StreamLookupKey(topics, brokers) -> result
> >     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
> >     result
> >   } catch ErrorHandling.safelyCatch {
> >     case e: Exception =>
> >       logger.error("Unable to create direct stream:")
> >       e.printStackTrace()
> >       throw KafkaDirectStreamException(topics.toArray, brokers, e)
> >   }
> > }
> >
> > def getKafkaParameters(brokers: String, useOldestOffsets: Boolean, applicationName: String): Map[String, String] =
> >   Map[String, String](
> >     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
> >     "enable.auto.commit" -> false.toString, // we'll commit these manually
> >     "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
> >     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
> >     "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
> >     "bootstrap.servers" -> brokers,
> >     "group.id" -> applicationName,
> >     "session.timeout.ms" -> 240000.toString,
> >     "request.timeout.ms" -> 300000.toString
> >   )
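> > For context on how this gets called: each element of brokersToUse is one
> > cluster's comma-separated broker list, so the map above produces one stream
> > per cluster. The call below is illustrative only (broker, topic and
> > application names are made up):
> >
> > // Sketch of an invocation; one entry in brokersToUse per Kafka cluster.
> > val brokersToUse = "broker1:9092,broker2:9092|broker3:9092,broker4:9092".split('|')
> > val stream = createDirectStream(ssc,
> >   additionalKafkaParameters = Map.empty,
> >   brokersToUse = brokersToUse,
> >   topicsToUse = Array("events"),
> >   applicationName = "exampleApplication",
> >   persist = None,
> >   useOldestOffsets = false,
> >   maxRatePerPartition = 10000L,
> >   batchSeconds = 60)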
> > Write code looks like the following:
> >
> > def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
> >                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
> >   val rddToWrite =
> >     if (numPartitions > 0) {
> >       rdd.repartition(numPartitions)
> >     } else {
> >       rdd
> >     }
> >
> >   // Get the session associated with the current thread.
> >   val session = SparkSession.builder().getOrCreate()
> >   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
> >     StructType(Array(StructField("value", BinaryType))))
> >   df.selectExpr("CAST('' AS STRING)", "value")
> >     .write
> >     .format("kafka")
> >     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
> >     .option("compression.type", "gzip")
> >     .option("retries", "3")
> >     .option("topic", topic)
> >     .save()
> > }
> >
> > Regards,
> >
> > Bryan Jeffrey