Cody,

Yes - I was able to verify that I am not seeing duplicate calls to
createDirectStream. If the spark-streaming-kafka-0-10 artifact built from
master will work on a 2.3 cluster, I can go ahead and give that a shot.

Regards,

Bryan Jeffrey
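(A minimal sketch of the kind of build change being discussed: an sbt assembly
that keeps Spark core/streaming at the cluster's 2.3 version as "provided" but
bundles the Kafka 0-10 integration built from master. The snapshot version
string and the locally published build are assumptions for illustration, not
details taken from the thread.)

    // build.sbt (sketch): cluster-provided Spark 2.3, application-bundled 0-10 integration
    val sparkVersion = "2.3.0"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
      "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
      // built from master and published locally (e.g. with `build/sbt publishLocal`);
      // the version string below is assumed
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0-SNAPSHOT"
    )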
On Fri, Aug 31, 2018 at 11:56 AM Cody Koeninger <c...@koeninger.org> wrote:

> Just to be 100% sure, when you're logging the group id in
> createDirectStream, you no longer see any duplicates?
>
> Regarding testing master, is the blocker that your spark cluster is on
> 2.3? There's at least a reasonable chance that building an application
> assembly jar that uses the master version just for the
> spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster.
>
> On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
> > Cody,
> >
> > We are connecting to multiple clusters for each topic. I experimented
> > this morning both with adding a cluster identifier to the group id and
> > with moving to only a single one of our clusters. Neither of these was
> > successful. I am not able to run a test against master right now.
> >
> > Regards,
> >
> > Bryan Jeffrey
> >
> > On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger <c...@koeninger.org> wrote:
> >
> >> I doubt that fix will get backported to 2.3.x.
> >>
> >> Are you able to test against master? 2.4 with the fix you linked to
> >> is likely to hit code freeze soon.
> >>
> >> From a quick look at your code, I'm not sure why you're mapping over
> >> an array of brokers. It seems like that would result in different
> >> streams with the same group id, because the broker isn't part of your
> >> group id string.
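(A minimal sketch of the change Cody is describing here, based on the
createDirectStream code quoted further down in the thread: fold a per-cluster
identifier into the group id so that each broker set gets its own consumer
group. The sanitization of the broker string is an assumption for
illustration; Bryan reports above that a variant of this did not resolve the
exception.)

    // Sketch only: give each brokers string its own group id inside the map over brokersToUse.
    val streams: Array[DStream[DecodedData]] = brokersToUse.map(brokers => {
      val clusterId = brokers.replaceAll("[^A-Za-z0-9]", "_") // assumed sanitization, e.g. "broker1_broker2"
      val groupId = s"${applicationName}~${clusterId}~${topicsToUse.mkString("~")}"
      val kafkaParameters: Map[String, String] =
        getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
      val topics = topicsToUse.toSet
      streamMap.getOrElse(StreamLookupKey(topics, brokers),
        createKafkaStream(ssc, applicationName, topics, brokers,
          maxRatePerPartition, batchSeconds, kafkaParameters))
    })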
> >> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> >>
> >> > Hello, Spark Users.
> >> >
> >> > We have an application using Spark 2.3.0 and the 0.8 Kafka client. We
> >> > have a Spark streaming job, and we're reading a reasonable amount of
> >> > data from Kafka (40 GB / minute or so). We would like to move to the
> >> > Kafka 0.10 client so that our (0.10.2.1) Kafka brokers are not
> >> > required to modify message formats.
> >> >
> >> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> >> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'. I've
> >> > tried to work around it as follows:
> >> >
> >> > 1. Disabled consumer caching (see the configuration sketch after this
> >> > list). This increased the total job time from ~1 minute per batch to
> >> > ~1.8 minutes per batch. This performance penalty is unacceptable for
> >> > our use-case. We also saw some partitions stop receiving for an
> >> > extended period of time - I was unable to get a simple repro for this
> >> > effect though.
> >> > 2. Disabled speculation and multiple-job concurrency, and added
> >> > caching for the stream directly after reading from Kafka, as well as
> >> > caching offsets. This approach seems to work well for simple examples
> >> > (read from a Kafka topic, write to another topic). However, when we
> >> > move through more complex logic we continue to see this type of error
> >> > - despite only creating the stream for a given topic a single time.
> >> > We validated that we're creating the stream from a given
> >> > topic/partition a single time by logging on stream creation, caching
> >> > the stream and (eventually) calling 'runJob' to actually go and fetch
> >> > the data. Nonetheless, with multiple outputs we see the
> >> > ConcurrentModificationException.
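(The two workarounds above correspond to standard Spark configuration
switches. The exact keys used in the original job are not shown in the thread,
so the following is an assumed sketch of the matching Spark 2.x properties.)

    import org.apache.spark.SparkConf

    // Assumed sketch: properties matching workarounds 1 and 2 above.
    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.cache.enabled", "false") // 1: disable cached Kafka consumers
      .set("spark.speculation", "false")                            // 2: disable speculative execution
      .set("spark.streaming.concurrentJobs", "1")                   // 2: run one streaming job at a time
                                                                    //    (undocumented but commonly used setting)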
> >> > I've included some code down below. I would be happy if anyone had
> >> > debugging tips for the workaround. However, my main concern is to
> >> > ensure that the 2.4 version will have a bug fix that will work for
> >> > Spark Streaming in which multiple input topics map data to multiple
> >> > outputs. I would also like to understand if the fix
> >> > (https://github.com/apache/spark/pull/20997) will be backported to
> >> > Spark 2.3.x.
> >> >
> >> > In our code, read looks like the following:
> >> >
> >> > case class StreamLookupKey(topic: Set[String], brokers: String)
> >> >
> >> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
> >> >
> >> > // Given inputs return a direct stream.
> >> > def createDirectStream(ssc: StreamingContext,
> >> >                        additionalKafkaParameters: Map[String, String],
> >> >                        brokersToUse: Array[String], // broker1,broker2|broker3,broker4
> >> >                        topicsToUse: Array[String],
> >> >                        applicationName: String,
> >> >                        persist: Option[PersistenceManager],
> >> >                        useOldestOffsets: Boolean,
> >> >                        maxRatePerPartition: Long,
> >> >                        batchSeconds: Int): DStream[DecodedData] = {
> >> >   val streams: Array[DStream[DecodedData]] = brokersToUse.map(brokers => {
> >> >     val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
> >> >     val kafkaParameters: Map[String, String] = getKafkaParameters(brokers,
> >> >       useOldestOffsets, groupId) ++ additionalKafkaParameters
> >> >     logger.info(s"Kafka Params: ${kafkaParameters}")
> >> >     val topics = topicsToUse.toSet
> >> >     logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> >> >       s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")
> >> >
> >> >     streamMap.getOrElse(StreamLookupKey(topics, brokers),
> >> >       createKafkaStream(ssc, applicationName, topics, brokers,
> >> >         maxRatePerPartition, batchSeconds, kafkaParameters))
> >> >   })
> >> >
> >> >   ssc.union(streams)
> >> > }
> >> >
> >> > private def createKafkaStream(ssc: StreamingContext, applicationName: String,
> >> >                               topics: Set[String], brokers: String,
> >> >                               maxRatePerPartition: Long, batchSeconds: Int,
> >> >                               kafkaParameters: Map[String, String]): DStream[DecodedData] = {
> >> >   logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
> >> >     s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
> >> >   try {
> >> >     val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
> >> >     val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
> >> >       KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers,
> >> >         consumerStrategy = consumerStrategy)
> >> >
> >> >     KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
> >> >     val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
> >> >     streamMap += StreamLookupKey(topics, brokers) -> result
> >> >     result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
> >> >     result
> >> >   } catch ErrorHandling.safelyCatch {
> >> >     case e: Exception =>
> >> >       logger.error("Unable to create direct stream:")
> >> >       e.printStackTrace()
> >> >       throw KafkaDirectStreamException(topics.toArray, brokers, e)
> >> >   }
> >> > }
> >> >
> >> > def getKafkaParameters(brokers: String, useOldestOffsets: Boolean,
> >> >                        applicationName: String): Map[String, String] =
> >> >   Map[String, String](
> >> >     "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
> >> >     "enable.auto.commit" -> false.toString, // we'll commit these manually
> >> >     "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
> >> >     "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
> >> >     "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
> >> >     "bootstrap.servers" -> brokers,
> >> >     "group.id" -> applicationName,
> >> >     "session.timeout.ms" -> 240000.toString,
> >> >     "request.timeout.ms" -> 300000.toString
> >> >   )
> >> >
> >> > Write code looks like the following:
> >> >
> >> > def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String],
> >> >                 conv: (T) => Array[Byte], numPartitions: Int): Unit = {
> >> >   val rddToWrite =
> >> >     if (numPartitions > 0) {
> >> >       rdd.repartition(numPartitions)
> >> >     } else {
> >> >       rdd
> >> >     }
> >> >
> >> >   // Get session from current threads session
> >> >   val session = SparkSession.builder().getOrCreate()
> >> >   val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))),
> >> >     StructType(Array(StructField("value", BinaryType))))
> >> >   df.selectExpr("CAST('' AS STRING)", "value")
> >> >     .write
> >> >     .format("kafka")
> >> >     .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
> >> >     .option("compression.type", "gzip")
> >> >     .option("retries", "3")
> >> >     .option("topic", topic)
> >> >     .save()
> >> > }
> >> >
> >> > Regards,
> >> >
> >> > Bryan Jeffrey
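(To round out the picture, a hedged sketch of the usage pattern the thread
describes: a single cached input stream feeding multiple Kafka outputs, which
is the case that still hits the ConcurrentModificationException. The output
topic names and the encodeA/encodeB converters are assumptions for
illustration; everything else reuses the functions quoted above.)

    // Sketch only: one direct stream created once, then written to several output topics.
    val stream: DStream[DecodedData] =
      createDirectStream(ssc, Map.empty, brokersToUse, topicsToUse, applicationName,
        persist = None, useOldestOffsets = true, maxRatePerPartition = 10000L, batchSeconds = 60)

    // Two independent outputs derived from the same cached stream; per the report
    // above, this is where the exception shows up with the 0.10 cached consumers.
    stream.foreachRDD(rdd => write(rdd, "output-topic-a", brokersToUse, encodeA, numPartitions = 0))
    stream.foreachRDD(rdd => write(rdd, "output-topic-b", brokersToUse, encodeB, numPartitions = 0))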