Cody,

We are connecting to multiple clusters for each topic.  This morning I
experimented both with adding a cluster identifier to the group id and
with moving to just one of our clusters.  Neither change was
successful.  I am not able to run a test against master right now.
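
For reference, the group id change in that experiment was roughly the
following (a sketch rather than the exact diff; the cluster identifier
is just the broker list with its separators replaced):

  // Fold a per-cluster identifier into the group id so each cluster's
  // stream registers under a distinct consumer group.
  def groupIdFor(applicationName: String, brokers: String,
                 topicsToUse: Array[String]): String = {
    val clusterId = brokers.replace(":", "-").replace(",", "_")
    s"${applicationName}~${clusterId}~${topicsToUse.mkString("~")}"
  }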

Regards,

Bryan Jeffrey




On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:

> I doubt that fix will get backported to 2.3.x
>
> Are you able to test against master?  2.4 with the fix you linked to
> is likely to hit code freeze soon.
>
> From a quick look at your code, I'm not sure why you're mapping over
> an array of brokers.  It seems like that would result in different
> streams with the same group id, because broker isn't part of your
> group id string.
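>
> For example (just sketching from the code you posted), with two
> clusters both elements of that map end up with the same group id:
>
>   val applicationName = "myApp"                  // assumed name
>   val topicsToUse = Array("topicA", "topicB")    // assumed topics
>   val brokersToUse = Array("broker1,broker2", "broker3,broker4")
>   // Both elements evaluate to "myApp~topicA~topicB".
>   val groupIds =
>     brokersToUse.map(_ => s"${applicationName}~${topicsToUse.mkString("~")}")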
>
> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
> > Hello, Spark Users.
> >
> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.
> > We're running a Spark streaming job, reading a reasonable amount of
> > data from Kafka (40 GB / minute or so).  We would like to move to the
> > Kafka 0.10 client so that our (0.10.2.1) Kafka brokers do not have to
> > convert message formats.
> >
> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> > tried to work around it as follows:
> >
> > 1. Disabled consumer caching.  This increased the total job time from
> > ~1 minute per batch to ~1.8 minutes per batch.  That performance
> > penalty is unacceptable for our use case.  We also saw some partitions
> > stop receiving data for extended periods of time, though I was unable
> > to get a simple repro of that effect.
> > 2. Disabled speculation and multiple-job concurrency, and added caching
> > for the stream directly after reading from Kafka & caching offsets.
> > This approach works well for simple examples (read from a Kafka topic,
> > write to another topic).  However, once we move through more complex
> > logic we continue to see this type of error - despite only creating the
> > stream for a given topic a single time.  We validated that we're
> > creating the stream for a given topic / partition a single time by
> > logging on stream creation, caching the stream, and (eventually)
> > calling 'runJob' to actually go and fetch the data.  Nonetheless, with
> > multiple outputs we see the ConcurrentModificationException.  (The
> > relevant settings for both workarounds are sketched below.)
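> >
> > For reference, the settings for those two workarounds were roughly the
> > following (a sketch, not our full configuration):
> >
> >   import org.apache.spark.SparkConf
> >
> >   // Workaround 1: disable the cached Kafka consumers entirely.
> >   val conf1 = new SparkConf()
> >     .set("spark.streaming.kafka.consumer.cache.enabled", "false")
> >
> >   // Workaround 2: keep the consumer cache, but disable speculation and
> >   // multiple-job concurrency so cached consumers are not shared.
> >   val conf2 = new SparkConf()
> >     .set("spark.speculation", "false")
> >     .set("spark.streaming.concurrentJobs", "1")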
> >
> > I've included some code down below.  I would be happy if anyone had
> > debugging tips for the workaround.  However, my main concern is to
> > ensure that the 2.4 version will have a bug fix that works for Spark
> > Streaming jobs in which multiple input topics map data to multiple
> > outputs.  I would also like to understand whether the fix
> > (https://github.com/apache/spark/pull/20997) will be backported to
> > Spark 2.3.x.
> >
> > In our code, read looks like the following:
> >
> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >
> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >
> > // Given inputs return a direct stream.
> > def createDirectStream(ssc: StreamingContext,
> >                        additionalKafkaParameters: Map[String, String],
> >                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
> >                        topicsToUse: Array[String],
> >                        applicationName: String,
> >                        persist: Option[PersistenceManager],
> >                        useOldestOffsets: Boolean,
> >                        maxRatePerPartition: Long,
> >                        batchSeconds: Int
> >                       ): DStream[DecodedData] = {
> >   val streams: Array[DStream[DecodedData]] =
> >     brokersToUse.map(brokers => {
> >       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >       val kafkaParameters: Map[String, String] =
> >         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
> >       logger.info(s"Kafka Params: ${kafkaParameters}")
> >       val topics = topicsToUse.toSet
> >       logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> >         s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
> >
> >       streamMap.getOrElse(StreamLookupKey(topics, brokers),
> >         createKafkaStream(ssc, applicationName, topics, brokers,
> >           maxRatePerPartition, batchSeconds, kafkaParameters))
> >     })
> >
> >   ssc.union(streams)
> > }
> >
> > private def createKafkaStream(ssc: StreamingContext, applicationName: String,
> >                               topics: Set[String], brokers: String,
> >                               maxRatePerPartition: Long, batchSeconds: Int,
> >                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
> >   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
> >     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
> >   try {
> >     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
> >     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
> >       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
> >         consumerStrategy = consumerStrategy)
> >
> >     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream,
> >       maxRatePerPartition, batchSeconds)
> >     val result =
> >       stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
> >     streamMap += StreamLookupKey(topics, brokers) -> result
> >     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
> >     result
> >   } catch ErrorHandling.safelyCatch {
> >     case e: Exception =>
> >       logger.error("Unable to create direct stream:")
> >       e.printStackTrace()
> >       throw KafkaDirectStreamException(topics.toArray, brokers, e)
> >   }
> > }
> >
> > def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
> >                        applicationName: String): Map[String, String] =
> >   Map[String, String](
> >     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
> >     "enable.auto.commit" -> false.toString, // we'll commit these manually
> >     "key.deserializer" ->
> >       classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
> >     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
> >     "partition.assignment.strategy" ->
> >       classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
> >     "bootstrap.servers" -> brokers,
> >     "group.id" -> applicationName,
> >     "session.timeout.ms" -> 240000.toString,
> >     "request.timeout.ms" -> 300000.toString
> >   )
> >
> > Write code looks like the following:
> >
> > def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
> >                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
> >   val rddToWrite =
> >     if (numPartitions > 0) {
> >       rdd.repartition(numPartitions)
> >     } else {
> >       rdd
> >     }
> >
> >   // Get the session from the current thread's session
> >   val session = SparkSession.builder().getOrCreate()
> >   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
> >     StructType(Array(StructField("value", BinaryType))))
> >   df.selectExpr("CAST('' AS STRING)", "value")
> >     .write
> >     .format("kafka")
> >     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
> >     .option("compression.type", "gzip")
> >     .option("retries", "3")
> >     .option("topic", topic)
> >     .save()
> > }
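> >
> > For context, the stream returned by createDirectStream feeds multiple
> > outputs roughly like this (a rough sketch, not our actual job; the
> > serializer, rates, and topic names below are placeholders):
> >
> >   val stream: DStream[DecodedData] =
> >     createDirectStream(ssc, Map.empty[String, String], brokersToUse,
> >       topicsToUse, applicationName, None, useOldestOffsets = true,
> >       maxRatePerPartition = 10000, batchSeconds = 60)
> >
> >   // Placeholder for our real record serializer.
> >   val parse: DecodedData => Array[Byte] = record => Array.empty[Byte]
> >
> >   // Several outputs are driven from the same cached stream; each
> >   // foreachRDD writes the batch out to a different Kafka topic.
> >   stream.foreachRDD(rdd => write(rdd, "outputTopicA", brokersToUse, parse, 0))
> >   stream.foreachRDD(rdd => write(rdd, "outputTopicB", brokersToUse, parse, 0))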
> >
> > Regards,
> >
> > Bryan Jeffrey
>
