Re: Kafka with Spark Streaming works locally but doesn't work in Standalone mode
Hi Davide,

Please see the doc: *Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming rather than with DStreams? If you insist on DStreams, you can use the spark-streaming-kafka-0-10 connector instead.

BR,
G

On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio wrote:
> Hi, I'm trying to use Spark Streaming with a very simple script like this: [...]
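For reference, the spark-streaming-kafka-0-10 connector suggested above uses the new Kafka consumer configuration names, so the parameters in the original script would need renaming. A minimal sketch of that translation — the `to_new_consumer_params` helper is hypothetical, purely to illustrate which names change:

```python
def to_new_consumer_params(old_params):
    """Translate 0-8 style (SimpleConsumer) Kafka params to the
    new-consumer names used by the 0-10 connector."""
    key_renames = {"metadata.broker.list": "bootstrap.servers"}
    value_renames = {"smallest": "earliest", "largest": "latest"}
    return {key_renames.get(k, k): value_renames.get(v, v)
            for k, v in old_params.items()}

# Parameters from the original script:
old_params = {"metadata.broker.list": "172.31.71.104:9092",
              "auto.offset.reset": "smallest"}
print(to_new_consumer_params(old_params))
# {'bootstrap.servers': '172.31.71.104:9092', 'auto.offset.reset': 'earliest'}
```

(Note that, as of Spark 2.4, the 0-10 DStream connector has no Python API, which is another reason to consider Structured Streaming from PySpark.)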
Kafka with Spark Streaming works locally but doesn't work in Standalone mode
Hi,

I'm trying to use Spark Streaming with a very simple script like this:

    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonSparkStreamingKafka")
    ssc = StreamingContext(sc, 1)

    kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
                   "auto.offset.reset": "smallest"}

    training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
    training.pprint()

    ssc.start()
    ssc.awaitTermination()

Although it works locally, it crashes when run on the cluster in Standalone mode. I have a cluster with 4 machines:

- 1 machine with the Kafka producer, 1 broker, and 1 ZooKeeper
- 1 machine as the driver
- 2 machines as workers

The strange thing is that when the Kafka producer, broker, and ZooKeeper were on the same machine as the driver, it worked both locally and on the cluster. But obviously, for the sake of scalability and modularity, I'd like to use the current configuration.
I'm using Spark 2.4.6, the Kafka streaming API is "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm currently using is kafka_2.11-2.4.1.

The result is the following:

    2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

    2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
    2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 1]
    2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
    2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on 172.31.69.185, executor 0: java.nio.channels.ClosedChannelException (null) [duplicate 2]
    2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
    2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 1595584106000 ms
    2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
    2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 1595584107000 ms
    2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on 172.31.79.221, executor 1: java.nio.channels.ClosedChannelException (null) [duplicate 3]
    2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
    2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
    2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
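Since the java.nio.channels.ClosedChannelException occurs only on the executor machines (172.31.69.185 and 172.31.79.221), one thing worth ruling out is that the workers can actually reach the broker at the host/port the Spark job uses. A generic reachability check, not from the original thread — the broker address is the one from the script:

```python
import socket

def can_connect(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Run this from each worker machine against the broker address used in
# the Spark script (172.31.71.104:9092 in the original post). If it
# prints False on a worker, a firewall rule or the broker's
# listeners/advertised.listeners settings are the first things to check.
print(can_connect("172.31.71.104", 9092, timeout=2.0))
```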