Hurshal Patel created SPARK-13941:
-------------------------------------

             Summary: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
                 Key: SPARK-13941
                 URL: https://issues.apache.org/jira/browse/SPARK-13941
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.5.1
            Reporter: Hurshal Patel


I am connecting to a Kafka cluster with the following (anonymized) code:

{code:scala}
  var stream = KafkaUtils.createDirectStreamFromZookeeper[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topics)
  stream.foreachRDD { rdd =>
    val df = sqlContext.createDataFrame(rdd.map(bytesToString), stringSchema)
    df.foreachPartition { partition => 
      val targetNode = chooseTarget(TaskContext.getPartitionId())
      loadPartition(targetNode, partition)
    }
  }
{code}
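
({{createDirectStreamFromZookeeper}} is our own anonymized helper, not a Spark API; as far as I can tell it is just a thin wrapper that looks up the broker list from ZooKeeper and then calls the standard direct-stream API. A minimal sketch of the underlying call, with placeholder brokers and topic, would be:)

{code:scala}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch of the underlying call; broker list and topic name are placeholders,
// not our real configuration.
val ssc = new StreamingContext(new SparkConf().setAppName("kafka-loader"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("some_topic")

val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topics)
{code}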

I am using Kafka 0.8.2.0-1.kafka1.2.0.p0.2 (Cloudera CDH 5.3.1), and with Spark 1.4.1 this works fine.

After upgrading to Spark 1.5.1, my tasks fail with the stack trace below. Were there any notable changes to the Kafka direct stream or KafkaRDD between 1.4.1 and 1.5.1 that would cause this, or does Cloudera's Kafka distribution have known compatibility issues with 1.5.1?
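
My current suspicion is a Kafka classpath mismatch: the Spark 1.5.1 Kafka module is, I believe, built against stock Kafka 0.8.2.1, while our executors pick up the CDH Kafka jars, and mixing the two could plausibly produce a cast failure between {{BrokerEndPoint}} and {{Broker}}. As a quick check (sketch only; {{sc}} is the underlying SparkContext), I can print which jar actually provides {{kafka.cluster.Broker}} on the driver and on the executors:

{code:scala}
// Sketch: print which jar provides kafka.cluster.Broker on the driver and on
// the executors, to rule out mixed Kafka versions on the classpath.
def kafkaBrokerJar(): String =
  Class.forName("kafka.cluster.Broker")
    .getProtectionDomain.getCodeSource.getLocation.toString

println(s"driver:   ${kafkaBrokerJar()}")

// Run the same lookup inside tasks so it reflects the executor classpath.
val executorJars = sc.parallelize(1 to 100, 20)
  .map(_ => Class.forName("kafka.cluster.Broker")
    .getProtectionDomain.getCodeSource.getLocation.toString)
  .distinct()
  .collect()

executorJars.foreach(jar => println(s"executor: $jar"))
{code}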

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 12407.0 failed 4 times, most recent failure: Lost task 5.3 in stage 12407.0 (TID 55638, 172.18.203.25): org.apache.spark.SparkException: Couldn't connect to leader for topic XXX: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply$mcV$sp(DataFrame.scala:1369)
at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1368)
at com.foobar.kafka.KafkaLoader.load(KafkaLoader.scala:35)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Couldn't connect to leader for topic XXX: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
{code}


