Hi Davide,

Please see the doc: *Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming rather than DStreams? If you really want to stay on DStreams, you can use the spark-streaming-kafka-0-10 connector instead.
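For reference, a minimal sketch of what the same read could look like with Structured Streaming, assuming the broker address and topic from your script (adjust both to your environment):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("PythonStructuredStreamingKafka")
         .getOrCreate())

# Kafka source; "startingOffsets" plays the role of the old "auto.offset.reset".
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.31.71.104:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load())

# Kafka records arrive as binary key/value columns; cast them to strings for the console sink.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())

query.awaitTermination()

You would submit it with the matching connector on the classpath, something like --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 (pick the version that matches your Spark build).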
BR,
G

On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio <davide.cur...@live.com> wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>                "auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> Although it works locally, it crashes on the cluster in standalone mode. I have a cluster with 4 machines:
>
> 1 machine with the Kafka producer, 1 broker and 1 ZooKeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when the Kafka producer, broker and ZooKeeper were on the same machine as the driver, it worked both locally and on the cluster. But obviously, for the sake of scalability and modularity, I'd like to use the current configuration.
>
> I'm using Spark 2.4.6, the Kafka streaming integration is "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm currently using is kafka_2.11-2.4.1.
>
> The result is the following:
>
> 2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in > stage 0.0 (TID 0, 172.31.69.185, executor 0): > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at >
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > 2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 > in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 > bytes) > 2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in > stage 0.0 (TID 1) on 172.31.69.185, executor 0: > java.nio.channels.ClosedChannelException (null) [duplicate 1] > 2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 > in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 > bytes) > 2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in > stage 0.0 (TID 2) on 172.31.69.185, executor 0: > java.nio.channels.ClosedChannelException (null) [duplicate 2] > 2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 > in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 > bytes) > 2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time > 1595584106000 ms > 2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added > broadcast_0_piece0 in memory on 172.31.79.221:44371 (size: 4.0 KB, free: > 366.3 MB) > 2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time > 1595584107000 ms > 2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in > stage 0.0 (TID 3) on 172.31.79.221, executor 1: > java.nio.channels.ClosedChannelException (null) [duplicate 3] > 2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage > 0.0 failed 4 times; aborting job > 2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet > 0.0, whose tasks have all completed, from pool > 2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage > 0 > 2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all > running tasks in stage 0: Stage cancelled > 2020-07-24 09:48:27,174 INFO scheduler.DAGScheduler: ResultStage 0 (runJob > at PythonRDD.scala:153) failed in 2.943 s due to Job aborted due to stage > failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task > 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1): > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at 
scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > Driver stacktrace: > 2020-07-24 09:48:27,179 INFO scheduler.DAGScheduler: Job 0 failed: runJob > at PythonRDD.scala:153, took 3.010820 s > 2020-07-24 09:48:27,190 INFO scheduler.JobScheduler: Finished job > streaming job 1595584104000 ms.0 from job set of time 1595584104000 ms > 2020-07-24 09:48:27,191 INFO scheduler.JobScheduler: Starting job > streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms > 2020-07-24 09:48:27,193 ERROR scheduler.JobScheduler: Error running job > streaming job 1595584104000 ms.0 > org.apache.spark.SparkException: An exception was raised by Python: > Traceback (most recent call last): > File > "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line > 68, in call > r = self.func(t, *rdds) > File > "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", > line 173, in takeAndPrint > taken = rdd.take(num + 1) > File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line > 1360, in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", > line 1069, in runJob > sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), > mappedRDD._jrdd, partitions) > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 1257, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line > 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task > 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 0.0 (TID 3, 172.31.79.221, executor 1): > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) > at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) > at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153) > at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 2020-07-24 09:48:27,211 INFO spark.SparkContext: Starting job: runJob at > PythonRDD.scala:153 > 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Got job 1 (runJob at > PythonRDD.scala:153) with 1 output partitions > 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Final stage: > ResultStage 1 (runJob at PythonRDD.scala:153) > 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Parents of final > stage: List() > 2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Missing parents: > List() > 2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Submitting > ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53), which has no > missing parents > 2020-07-24 09:48:27,220 INFO memory.MemoryStore: Block broadcast_1 stored > as values in memory (estimated size 6.9 KB, free 366.3 MB) > 2020-07-24 09:48:27,223 INFO memory.MemoryStore: Block broadcast_1_piece0 > stored as bytes in memory (estimated size 4.0 KB, free 366.3 MB) > 2020-07-24 09:48:27,225 INFO storage.BlockManagerInfo: Added > broadcast_1_piece0 in memory on ip-172-31-69-46.ec2.internal:41579 (size: > 4.0 KB, free: 366.3 MB) > 2020-07-24 09:48:27,226 INFO spark.SparkContext: Created broadcast 1 from > broadcast at DAGScheduler.scala:1163 > 2020-07-24 09:48:27,227 INFO scheduler.DAGScheduler: Submitting 1 missing > tasks from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53) (first > 15 tasks are for partitions Vector(0)) > 2020-07-24 09:48:27,229 INFO scheduler.TaskSchedulerImpl: Adding task set > 1.0 with 1 tasks > 2020-07-24 09:48:27,230 INFO scheduler.TaskSetManager: Starting task 0.0 > in stage 1.0 (TID 4, 172.31.69.185, executor 0, partition 0, ANY, 7785 > bytes) > 2020-07-24 09:48:27,248 INFO storage.BlockManagerInfo: Added > broadcast_1_piece0 in memory on 172.31.69.185:44675 (size: 4.0 KB, free: > 366.3 MB) > Traceback (most recent call last): > File "/home/ubuntu/./prova2.py", line 22, in <module> > ssc.awaitTermination() > File > 
"/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", > line 192, in awaitTermination > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 1257, in __call__ > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line > 328, in get_return_value > 2020-07-24 09:48:27,315 INFO scheduler.TaskSetManager: Finished task 0.0 > in stage 1.0 (TID 4) in 85 ms on 172.31.69.185 (executor 0) (1/1) > 2020-07-24 09:48:27,316 INFO scheduler.TaskSchedulerImpl: Removed TaskSet > 1.0, whose tasks have all completed, from pool > 2020-07-24 09:48:27,321 INFO python.PythonAccumulatorV2: Connected to > AccumulatorServer at host: 127.0.0.1 port: 34673 > 2020-07-24 09:48:27,324 INFO scheduler.DAGScheduler: ResultStage 1 (runJob > at PythonRDD.scala:153) finished in 0.106 s > 2020-07-24 09:48:27,325 INFO scheduler.DAGScheduler: Job 1 finished: > runJob at PythonRDD.scala:153, took 0.113169 s > 2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 20 > 2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 13 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 3 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 8 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 7 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 10 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 4 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 6 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 11 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 5 > 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 25 > py4j.protocol.Py4JJavaError: An error occurred while calling > o23.awaitTermination. > : org.apache.spark.SparkException: An exception was raised by Python: > Traceback (most recent call last): > File > "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line > 68, in call > r = self.func(t, *rdds) > File > "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", > line 173, in takeAndPrint > taken = rdd.take(num + 1) > File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line > 1360, in take > res = self.context.runJob(self, takeUpToNumLeft, p) > File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py", > line 1069, in runJob > sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), > mappedRDD._jrdd, partitions) > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 1257, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line > 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task > 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 0.0 (TID 3, 172.31.79.221, executor 1): > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) > at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) > at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153) > at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) > at > org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224) > at > org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) > at > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195) > > > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > 2020-07-24 09:48:27,475 INFO storage.BlockManagerInfo: Removed > broadcast_1_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: > 4.0 KB, free: 366.3 MB) > 2020-07-24 09:48:27,477 INFO storage.BlockManagerInfo: Removed > broadcast_1_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: > 366.3 MB) > 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 15 > 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 23 > 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 21 > 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 12 > 2020-07-24 09:48:27,511 INFO spark.ContextCleaner: Cleaned accumulator 2 > 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 18 > 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 1 > 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 14 > 2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking > stop(stopGracefully=false) from shutdown hook > 2020-07-24 09:48:27,511 INFO scheduler.JobScheduler: Finished job > streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms > 2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker > stopped > 2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator > immediately > 2020-07-24 09:48:27,524 INFO scheduler.JobScheduler: Starting job > streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms > 2020-07-24 09:48:27,527 INFO scheduler.JobScheduler: Finished job > streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms > 2020-07-24 09:48:27,528 INFO storage.BlockManagerInfo: Removed > broadcast_0_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free: > 366.3 MB) > 2020-07-24 09:48:27,529 INFO storage.BlockManagerInfo: Removed > broadcast_0_piece0 on 172.31.79.221:44371 in memory (size: 4.0 KB, free: > 366.3 MB) > 
2020-07-24 09:48:27,530 INFO storage.BlockManagerInfo: Removed > broadcast_0_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size: > 4.0 KB, free: 366.3 MB) > 2020-07-24 09:48:27,531 INFO scheduler.JobScheduler: Starting job > streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms > 2020-07-24 09:48:27,532 INFO scheduler.JobScheduler: Finished job > streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms > 2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job > streaming job 1595584105000 ms.0 > py4j.Py4JException: Error while sending a command. > at py4j.CallbackClient.sendCommand(CallbackClient.java:397) > at py4j.CallbackClient.sendCommand(CallbackClient.java:356) > at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106) > at com.sun.proxy.$Proxy18.call(Unknown Source) > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: py4j.Py4JNetworkException: Error while sending a command: null > response: c > p2 > call > L1595584105000 > lo96 > e > > at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158) > at py4j.CallbackClient.sendCommand(CallbackClient.java:384) > ... 24 more > 2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job > streaming job 1595584106000 ms.0 > py4j.Py4JException: Error while sending a command. 
> at py4j.CallbackClient.sendCommand(CallbackClient.java:397) > at py4j.CallbackClient.sendCommand(CallbackClient.java:356) > at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106) > at com.sun.proxy.$Proxy18.call(Unknown Source) > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: py4j.Py4JNetworkException: Error while sending a command: null > response: c > p2 > call > L1595584106000 > lo113 > e > > at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158) > at py4j.CallbackClient.sendCommand(CallbackClient.java:384) > ... 
24 more > 2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job > streaming job 1595584107000 ms.0 > py4j.Py4JException: Cannot obtain a new communication channel > at py4j.CallbackClient.sendCommand(CallbackClient.java:380) > at py4j.CallbackClient.sendCommand(CallbackClient.java:356) > at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106) > at com.sun.proxy.$Proxy18.call(Unknown Source) > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 2020-07-24 09:48:27,534 INFO util.RecurringTimer: Stopped timer for > JobGenerator after time 1595584107000 > 2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect > to Python process. It's probably dead. Stopping StreamingContext. 
> py4j.Py4JException: Cannot obtain a new communication channel > at py4j.CallbackClient.sendCommand(CallbackClient.java:380) > at py4j.CallbackClient.sendCommand(CallbackClient.java:356) > at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106) > at com.sun.proxy.$Proxy18.call(Unknown Source) > at > org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92) > at > org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator > 2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler > 2020-07-24 09:48:27,556 INFO handler.ContextHandler: Stopped > o.s.j.s.ServletContextHandler@6ad44bd5{/streaming,null,UNAVAILABLE,@Spark} > 2020-07-24 09:48:27,557 INFO handler.ContextHandler: Stopped > o.s.j.s.ServletContextHandler@68ac401f > {/streaming/json,null,UNAVAILABLE,@Spark} > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 16 > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 9 > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 19 > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 22 > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 17 > 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 24 > 2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped > o.s.j.s.ServletContextHandler@11792245 > {/streaming/batch,null,UNAVAILABLE,@Spark} > 2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped > o.s.j.s.ServletContextHandler@2e5d35e4 > 
{/streaming/batch/json,null,UNAVAILABLE,@Spark} > 2020-07-24 09:48:27,560 INFO handler.ContextHandler: Stopped > o.s.j.s.ServletContextHandler@2dedce2c > {/static/streaming,null,UNAVAILABLE,@Spark} > 2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext > stopped successfully > 2020-07-24 09:48:27,562 WARN streaming.StreamingContext: StreamingContext > has already been stopped > 2020-07-24 09:48:27,562 INFO spark.SparkContext: Invoking stop() from > shutdown hook > 2020-07-24 09:48:27,569 INFO server.AbstractConnector: Stopped > Spark@4c8c11ce{HTTP/1.1,[http/1.1]}{0.0.0.0:4040} > 2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at > http://ip-172-31-69-46.ec2.internal:4040 > 2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting > down all executors > 2020-07-24 09:48:27,576 INFO > cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor > to shut down > 2020-07-24 09:48:27,592 INFO spark.MapOutputTrackerMasterEndpoint: > MapOutputTrackerMasterEndpoint stopped! > 2020-07-24 09:48:27,600 INFO memory.MemoryStore: MemoryStore cleared > 2020-07-24 09:48:27,600 INFO storage.BlockManager: BlockManager stopped > 2020-07-24 09:48:27,604 INFO storage.BlockManagerMaster: > BlockManagerMaster stopped > 2020-07-24 09:48:27,607 INFO > scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: > OutputCommitCoordinator stopped! > 2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped > SparkContext > 2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called > 2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory > /tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca > 2020-07-24 09:48:27,617 INFO util.ShutdownHookManager: Deleting directory > /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7 > 2020-07-24 09:48:27,620 INFO util.ShutdownHookManager: Deleting directory > /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7/pyspark-c0db4268-2afd-4336-bf77-9dc7257213d2
>
> I've debugged everything, but I have no idea how to solve this problem. Do you have any suggestions? Could it be a Kafka configuration problem?
>
> Thanks in advance,
>
> Davide