Cody,

Yes - I was able to verify that I am not seeing duplicate calls to
createDirectStream.  If the spark-streaming-kafka-0-10 artifact from master
will work on a 2.3 cluster, I can go ahead and give that a shot.

Regards,

Bryan Jeffrey

On Fri, Aug 31, 2018 at 11:56 AM Cody Koeninger <c...@koeninger.org> wrote:

> Just to be 100% sure, when you're logging the group id in
> createDirectStream, you no longer see any duplicates?
>
> Regarding testing master, is the blocker that your spark cluster is on
> 2.3?  There's at least a reasonable chance that building an
> application assembly jar that uses the master version just for the
> spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster.
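>
> A minimal sketch of what that could look like in build.sbt (the version
> strings here are assumptions - use whatever you build and publish locally
> from master):
>
> // Cluster-provided Spark 2.3 artifacts stay "provided"; only the
> // kafka-0-10 integration built from master gets bundled into the assembly.
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core"      % "2.3.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
>   // hypothetical locally published build of master
>   "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0-SNAPSHOT"
> )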
>
> On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
> > Cody,
> >
> > We are connecting to multiple clusters for each topic.  I did experiment
> > this morning with both adding a cluster identifier to the group id, as
> > well as simply moving to use only a single one of our clusters.  Neither
> > of these was successful.  I am not able to run a test against master now.
> >
> > Regards,
> >
> > Bryan Jeffrey
> >
> >
> >
> >
> > On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> I doubt that fix will get backported to 2.3.x
> >>
> >> Are you able to test against master?  2.4 with the fix you linked to
> >> is likely to hit code freeze soon.
> >>
> >> From a quick look at your code, I'm not sure why you're mapping over
> >> an array of brokers.  It seems like that would result in different
> >> streams with the same group id, because broker isn't part of your
> >> group id string.
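> >>
> >> For illustration, a minimal sketch of what I mean (hypothetical helper;
> >> the names just mirror your snippet below):
> >>
> >> // Build a distinct group.id per broker set by folding the broker string
> >> // into the id, so the per-broker streams don't share a consumer group.
> >> def groupIdFor(applicationName: String, brokers: String, topicsToUse: Array[String]): String =
> >>   s"${applicationName}~${brokers.replace(',', '_')}~${topicsToUse.mkString("~")}"
> >>
> >> (i.e. call something like that inside brokersToUse.map in place of the
> >> current groupId line)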
> >>
> >> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> >> > Hello, Spark Users.
> >> >
> >> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.
> >> > We're running a Spark Streaming job, and we're reading a reasonable
> >> > amount of data from Kafka (40 GB / minute or so).  We would like to move
> >> > to using the Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka
> >> > brokers to modify formats.
> >> >
> >> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> >> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
> >> > tried to
> >> > work around it as follows:
> >> >
> >> > 1. Disabled consumer caching (see the configuration sketch after this
> >> > list).  This increased the total job time from ~1 minute per batch to
> >> > ~1.8 minutes per batch.  This performance penalty is unacceptable for
> >> > our use-case. We also saw some partitions stop receiving data for an
> >> > extended period of time - I was unable to get a simple repro for this
> >> > effect, though.
> >> > 2. Disabled speculation and multiple-job concurrency, and added caching
> >> > for the stream directly after reading from Kafka, as well as caching
> >> > offsets.  This approach seems to work well for simple examples (read
> >> > from a Kafka topic, write to another topic). However, when we move
> >> > through more complex logic we continue to see this type of error -
> >> > despite only creating the stream for a given topic a single time.  We
> >> > validated that we're creating the stream from a given topic / partition
> >> > a single time by logging on stream creation, caching the stream and
> >> > (eventually) calling 'runJob' to actually go and fetch the data.
> >> > Nonetheless, with multiple outputs we see the
> >> > ConcurrentModificationException.
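> >> >
> >> > For reference on (1), a minimal sketch of one way to disable the
> >> > consumer cache - the wiring below is illustrative, not our exact
> >> > production setup:
> >> >
> >> > import org.apache.spark.SparkConf
> >> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> >> >
> >> > val conf = new SparkConf()
> >> >   .setAppName("kafka-cache-test")
> >> >   // disable the per-executor CachedKafkaConsumer cache (default: true)
> >> >   .set("spark.streaming.kafka.consumer.cache.enabled", "false")
> >> > val ssc = new StreamingContext(conf, Seconds(60))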
> >> >
> >> > I've included some code down below.  I would be happy if anyone had
> >> > debugging tips for the workaround.  However, my main concern is to
> >> > ensure
> >> > that the 2.4 version will have a bug fix that will work for Spark
> >> > Streaming
> >> > in which multiple input topics map data to multiple outputs. I would
> >> > also
> >> > like to understand if the fix
> >> > (https://github.com/apache/spark/pull/20997)
> >> > will be backported to Spark 2.3.x.
> >> >
> >> > In our code, read looks like the following:
> >> >
> >> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >> >
> >> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >> >
> >> > // Given inputs return a direct stream.
> >> > def createDirectStream(ssc: StreamingContext,
> >> >                        additionalKafkaParameters: Map[String, String],
> >> >                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
> >> >                        topicsToUse: Array[String],
> >> >                        applicationName: String,
> >> >                        persist: Option[PersistenceManager],
> >> >                        useOldestOffsets: Boolean,
> >> >                        maxRatePerPartition: Long,
> >> >                        batchSeconds: Int
> >> >                       ): DStream[DecodedData] = {
> >> >   val streams: Array[DStream[DecodedData]] =
> >> >     brokersToUse.map(brokers => {
> >> >       val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >> >       val kafkaParameters: Map[String, String] =
> >> >         getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
> >> >       logger.info(s"Kafka Params: ${kafkaParameters}")
> >> >       val topics = topicsToUse.toSet
> >> >       logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> >> >         s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
> >> >
> >> >       streamMap.getOrElse(StreamLookupKey(topics, brokers),
> >> >         createKafkaStream(ssc, applicationName, topics, brokers,
> >> >           maxRatePerPartition, batchSeconds, kafkaParameters))
> >> >     })
> >> >
> >> >   ssc.union(streams)
> >> > }
> >> >
> >> > private def createKafkaStream(ssc: StreamingContext, applicationName: String,
> >> >                               topics: Set[String], brokers: String,
> >> >                               maxRatePerPartition: Long, batchSeconds: Int,
> >> >                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
> >> >   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
> >> >     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
> >> >   try {
> >> >     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
> >> >     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
> >> >       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
> >> >         consumerStrategy = consumerStrategy)
> >> >
> >> >     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
> >> >     val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
> >> >     streamMap += StreamLookupKey(topics, brokers) -> result
> >> >     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
> >> >     result
> >> >   } catch ErrorHandling.safelyCatch {
> >> >     case e: Exception =>
> >> >       logger.error("Unable to create direct stream:")
> >> >       e.printStackTrace()
> >> >       throw KafkaDirectStreamException(topics.toArray, brokers, e)
> >> >   }
> >> > }
> >> >
> >> > def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
> >> >                        applicationName: String): Map[String, String] =
> >> >   Map[String, String](
> >> >     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
> >> >     "enable.auto.commit" -> false.toString, // we'll commit these manually
> >> >     "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
> >> >     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
> >> >     "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
> >> >     "bootstrap.servers" -> brokers,
> >> >     "group.id" -> applicationName,
> >> >     "session.timeout.ms" -> 240000.toString,
> >> >     "request.timeout.ms" -> 300000.toString
> >> >   )
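> >> >
> >> > Since enable.auto.commit is false, offsets get committed from the
> >> > application itself.  A minimal sketch of the usual kafka-0-10 commit
> >> > pattern, purely for illustration (our actual offset handling is
> >> > elsewhere and not shown here):
> >> >
> >> > import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
> >> >
> >> > stream.foreachRDD { rdd =>
> >> >   // capture the offset ranges before any shuffle/repartition
> >> >   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> >   // ... process rdd ...
> >> >   // commit back to Kafka once the batch's work is done
> >> >   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> >> > }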
> >> >
> >> > Write code looks like the following:
> >> >
> >> > def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
> >> >                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
> >> >   val rddToWrite =
> >> >     if (numPartitions > 0) {
> >> >       rdd.repartition(numPartitions)
> >> >     } else {
> >> >       rdd
> >> >     }
> >> >
> >> >   // Get the SparkSession associated with the current thread
> >> >   val session = SparkSession.builder().getOrCreate()
> >> >   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
> >> >     StructType(Array(StructField("value", BinaryType))))
> >> >   df.selectExpr("CAST('' AS STRING)", "value")
> >> >     .write
> >> >     .format("kafka")
> >> >     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
> >> >     .option("compression.type", "gzip")
> >> >     .option("retries", "3")
> >> >     .option("topic", topic)
> >> >     .save()
> >> > }
> >> >
> >> > Regards,
> >> >
> >> > Bryan Jeffrey
>
