Hi Davide,

Please see the doc:
*Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming instead of DStreams?
If you need to stay with DStreams, you can use the spark-streaming-kafka-0-10
connector instead, though note that it supports only Scala and Java, not Python.
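
For reference, here is a minimal sketch of what the Structured Streaming
version of your script might look like. It assumes the job is submitted with
the spark-sql-kafka-0-10 package matching your Spark and Scala versions, and it
reuses the broker address and topic from your mail. The helper names
(kafka_source_options, run) are only illustrative, and I have not run this
against your cluster.

```python
# Sketch only. Assumes submission along the lines of:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 job.py


def kafka_source_options(bootstrap_servers, topic, starting_offsets="earliest"):
    """Build the options for the Structured Streaming Kafka source.

    "startingOffsets": "earliest" plays the role that
    "auto.offset.reset": "smallest" played in the DStream script.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def run(bootstrap_servers="172.31.71.104:9092", topic="test"):
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("PythonStructuredStreamingKafka")
             .getOrCreate())

    # Kafka keys and values arrive as binary; cast them to strings for printing.
    messages = (spark.readStream
                .format("kafka")
                .options(**kafka_source_options(bootstrap_servers, topic))
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

    # The console sink is the rough equivalent of pprint() on a DStream.
    query = (messages.writeStream
             .format("console")
             .outputMode("append")
             .start())
    query.awaitTermination()
```

Calling run() at the end of the submitted script starts the query. The broker
address still has to be reachable from the worker machines, not only from the
driver.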

BR,
G


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio <davide.cur...@live.com>
wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>                        "auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> Although it works locally, it crashes on a cluster running in Standalone
> mode. I have a cluster with 4 machines:
>
> 1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when I had the Kafka Producer, Broker and
> Zookeeper on the same machine as the driver, it worked both locally and
> in the cluster. But for the sake of scalability and modularity
> I'd like to use the current configuration.
>
> I'm using Spark 2.4.6, the Kafka streaming connector is
> "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version that
> I'm currently using is kafka_2.11-2.4.1.
>
> The result is the following:
>
> 2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in
> stage 0.0 (TID 0, 172.31.69.185, executor 0):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> 2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1
> in stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in
> stage 0.0 (TID 1) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 1]
> 2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2
> in stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in
> stage 0.0 (TID 2) on 172.31.69.185, executor 0:
> java.nio.channels.ClosedChannelException (null) [duplicate 2]
> 2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3
> in stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time
> 1595584106000 ms
> 2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added
> broadcast_0_piece0 in memory on 172.31.79.221:44371 (size: 4.0 KB, free:
> 366.3 MB)
> 2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time
> 1595584107000 ms
> 2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in
> stage 0.0 (TID 3) on 172.31.79.221, executor 1:
> java.nio.channels.ClosedChannelException (null) [duplicate 3]
> 2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage
> 0.0 failed 4 times; aborting job
> 2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
> 0.0, whose tasks have all completed, from pool
> 2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage
> 0
> 2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all
> running tasks in stage 0: Stage cancelled
> 2020-07-24 09:48:27,174 INFO scheduler.DAGScheduler: ResultStage 0 (runJob
> at PythonRDD.scala:153) failed in 2.943 s due to Job aborted due to stage
> failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task
> 0.3 in stage 0.0 (TID 3, 172.31.79.221, executor 1):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> Driver stacktrace:
> 2020-07-24 09:48:27,179 INFO scheduler.DAGScheduler: Job 0 failed: runJob
> at PythonRDD.scala:153, took 3.010820 s
> 2020-07-24 09:48:27,190 INFO scheduler.JobScheduler: Finished job
> streaming job 1595584104000 ms.0 from job set of time 1595584104000 ms
> 2020-07-24 09:48:27,191 INFO scheduler.JobScheduler: Starting job
> streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms
> 2020-07-24 09:48:27,193 ERROR scheduler.JobScheduler: Error running job
> streaming job 1595584104000 ms.0
> org.apache.spark.SparkException: An exception was raised by Python:
> Traceback (most recent call last):
>   File
> "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line
> 68, in call
>     r = self.func(t, *rdds)
>   File
> "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py",
> line 173, in takeAndPrint
>     taken = rdd.take(num + 1)
>   File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line
> 1360, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py",
> line 1069, in runJob
>     sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(),
> mappedRDD._jrdd, partitions)
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
> 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 3, 172.31.79.221, executor 1):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
> at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
>
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2020-07-24 09:48:27,211 INFO spark.SparkContext: Starting job: runJob at
> PythonRDD.scala:153
> 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Got job 1 (runJob at
> PythonRDD.scala:153) with 1 output partitions
> 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Final stage:
> ResultStage 1 (runJob at PythonRDD.scala:153)
> 2020-07-24 09:48:27,215 INFO scheduler.DAGScheduler: Parents of final
> stage: List()
> 2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Missing parents:
> List()
> 2020-07-24 09:48:27,216 INFO scheduler.DAGScheduler: Submitting
> ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53), which has no
> missing parents
> 2020-07-24 09:48:27,220 INFO memory.MemoryStore: Block broadcast_1 stored
> as values in memory (estimated size 6.9 KB, free 366.3 MB)
> 2020-07-24 09:48:27,223 INFO memory.MemoryStore: Block broadcast_1_piece0
> stored as bytes in memory (estimated size 4.0 KB, free 366.3 MB)
> 2020-07-24 09:48:27,225 INFO storage.BlockManagerInfo: Added
> broadcast_1_piece0 in memory on ip-172-31-69-46.ec2.internal:41579 (size:
> 4.0 KB, free: 366.3 MB)
> 2020-07-24 09:48:27,226 INFO spark.SparkContext: Created broadcast 1 from
> broadcast at DAGScheduler.scala:1163
> 2020-07-24 09:48:27,227 INFO scheduler.DAGScheduler: Submitting 1 missing
> tasks from ResultStage 1 (PythonRDD[9] at RDD at PythonRDD.scala:53) (first
> 15 tasks are for partitions Vector(0))
> 2020-07-24 09:48:27,229 INFO scheduler.TaskSchedulerImpl: Adding task set
> 1.0 with 1 tasks
> 2020-07-24 09:48:27,230 INFO scheduler.TaskSetManager: Starting task 0.0
> in stage 1.0 (TID 4, 172.31.69.185, executor 0, partition 0, ANY, 7785
> bytes)
> 2020-07-24 09:48:27,248 INFO storage.BlockManagerInfo: Added
> broadcast_1_piece0 in memory on 172.31.69.185:44675 (size: 4.0 KB, free:
> 366.3 MB)
> Traceback (most recent call last):
>   File "/home/ubuntu/./prova2.py", line 22, in <module>
>     ssc.awaitTermination()
>   File
> "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/context.py",
> line 192, in awaitTermination
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
> 328, in get_return_value
> 2020-07-24 09:48:27,315 INFO scheduler.TaskSetManager: Finished task 0.0
> in stage 1.0 (TID 4) in 85 ms on 172.31.69.185 (executor 0) (1/1)
> 2020-07-24 09:48:27,316 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
> 1.0, whose tasks have all completed, from pool
> 2020-07-24 09:48:27,321 INFO python.PythonAccumulatorV2: Connected to
> AccumulatorServer at host: 127.0.0.1 port: 34673
> 2020-07-24 09:48:27,324 INFO scheduler.DAGScheduler: ResultStage 1 (runJob
> at PythonRDD.scala:153) finished in 0.106 s
> 2020-07-24 09:48:27,325 INFO scheduler.DAGScheduler: Job 1 finished:
> runJob at PythonRDD.scala:153, took 0.113169 s
> 2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 20
> 2020-07-24 09:48:27,448 INFO spark.ContextCleaner: Cleaned accumulator 13
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 3
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 8
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 7
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 10
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 4
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 6
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 11
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 5
> 2020-07-24 09:48:27,449 INFO spark.ContextCleaner: Cleaned accumulator 25
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o23.awaitTermination.
> : org.apache.spark.SparkException: An exception was raised by Python:
> Traceback (most recent call last):
>   File
> "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line
> 68, in call
>     r = self.func(t, *rdds)
>   File
> "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py",
> line 173, in takeAndPrint
>     taken = rdd.take(num + 1)
>   File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/rdd.py", line
> 1360, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p)
>   File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/context.py",
> line 1069, in runJob
>     sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(),
> mappedRDD._jrdd, partitions)
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File
> "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
> 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 3, 172.31.79.221, executor 1):
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
> at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
>
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> 2020-07-24 09:48:27,475 INFO storage.BlockManagerInfo: Removed
> broadcast_1_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size:
> 4.0 KB, free: 366.3 MB)
> 2020-07-24 09:48:27,477 INFO storage.BlockManagerInfo: Removed
> broadcast_1_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free:
> 366.3 MB)
> 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 15
> 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 23
> 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 21
> 2020-07-24 09:48:27,509 INFO spark.ContextCleaner: Cleaned accumulator 12
> 2020-07-24 09:48:27,511 INFO spark.ContextCleaner: Cleaned accumulator 2
> 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 18
> 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 1
> 2020-07-24 09:48:27,514 INFO spark.ContextCleaner: Cleaned accumulator 14
> 2020-07-24 09:48:27,514 INFO streaming.StreamingContext: Invoking
> stop(stopGracefully=false) from shutdown hook
> 2020-07-24 09:48:27,511 INFO scheduler.JobScheduler: Finished job
> streaming job 1595584105000 ms.0 from job set of time 1595584105000 ms
> 2020-07-24 09:48:27,523 INFO scheduler.ReceiverTracker: ReceiverTracker
> stopped
> 2020-07-24 09:48:27,523 INFO scheduler.JobGenerator: Stopping JobGenerator
> immediately
> 2020-07-24 09:48:27,524 INFO scheduler.JobScheduler: Starting job
> streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
> 2020-07-24 09:48:27,527 INFO scheduler.JobScheduler: Finished job
> streaming job 1595584106000 ms.0 from job set of time 1595584106000 ms
> 2020-07-24 09:48:27,528 INFO storage.BlockManagerInfo: Removed
> broadcast_0_piece0 on 172.31.69.185:44675 in memory (size: 4.0 KB, free:
> 366.3 MB)
> 2020-07-24 09:48:27,529 INFO storage.BlockManagerInfo: Removed
> broadcast_0_piece0 on 172.31.79.221:44371 in memory (size: 4.0 KB, free:
> 366.3 MB)
> 2020-07-24 09:48:27,530 INFO storage.BlockManagerInfo: Removed
> broadcast_0_piece0 on ip-172-31-69-46.ec2.internal:41579 in memory (size:
> 4.0 KB, free: 366.3 MB)
> 2020-07-24 09:48:27,531 INFO scheduler.JobScheduler: Starting job
> streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
> 2020-07-24 09:48:27,532 INFO scheduler.JobScheduler: Finished job
> streaming job 1595584107000 ms.0 from job set of time 1595584107000 ms
> 2020-07-24 09:48:27,532 ERROR scheduler.JobScheduler: Error running job
> streaming job 1595584105000 ms.0
> py4j.Py4JException: Error while sending a command.
> at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
> at com.sun.proxy.$Proxy18.call(Unknown Source)
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: py4j.Py4JNetworkException: Error while sending a command: null
> response: c
> p2
> call
> L1595584105000
> lo96
> e
>
> at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
> ... 24 more
> 2020-07-24 09:48:27,534 ERROR scheduler.JobScheduler: Error running job
> streaming job 1595584106000 ms.0
> py4j.Py4JException: Error while sending a command.
> at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
> at com.sun.proxy.$Proxy18.call(Unknown Source)
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: py4j.Py4JNetworkException: Error while sending a command: null
> response: c
> p2
> call
> L1595584106000
> lo113
> e
>
> at py4j.CallbackConnection.sendCommand(CallbackConnection.java:158)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
> ... 24 more
> 2020-07-24 09:48:27,535 ERROR scheduler.JobScheduler: Error running job
> streaming job 1595584107000 ms.0
> py4j.Py4JException: Cannot obtain a new communication channel
> at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
> at com.sun.proxy.$Proxy18.call(Unknown Source)
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2020-07-24 09:48:27,534 INFO util.RecurringTimer: Stopped timer for
> JobGenerator after time 1595584107000
> 2020-07-24 09:48:27,540 ERROR python.PythonDStream$$anon$1: Cannot connect
> to Python process. It's probably dead. Stopping StreamingContext.
> py4j.Py4JException: Cannot obtain a new communication channel
> at py4j.CallbackClient.sendCommand(CallbackClient.java:380)
> at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
> at com.sun.proxy.$Proxy18.call(Unknown Source)
> at
> org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
> at
> org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> 2020-07-24 09:48:27,544 INFO scheduler.JobGenerator: Stopped JobGenerator
> 2020-07-24 09:48:27,551 INFO scheduler.JobScheduler: Stopped JobScheduler
> 2020-07-24 09:48:27,556 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@6ad44bd5{/streaming,null,UNAVAILABLE,@Spark}
> 2020-07-24 09:48:27,557 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@68ac401f
> {/streaming/json,null,UNAVAILABLE,@Spark}
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 16
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 9
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 19
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 22
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 17
> 2020-07-24 09:48:27,558 INFO spark.ContextCleaner: Cleaned accumulator 24
> 2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@11792245
> {/streaming/batch,null,UNAVAILABLE,@Spark}
> 2020-07-24 09:48:27,559 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@2e5d35e4
> {/streaming/batch/json,null,UNAVAILABLE,@Spark}
> 2020-07-24 09:48:27,560 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@2dedce2c
> {/static/streaming,null,UNAVAILABLE,@Spark}
> 2020-07-24 09:48:27,562 INFO streaming.StreamingContext: StreamingContext
> stopped successfully
> 2020-07-24 09:48:27,562 WARN streaming.StreamingContext: StreamingContext
> has already been stopped
> 2020-07-24 09:48:27,562 INFO spark.SparkContext: Invoking stop() from
> shutdown hook
> 2020-07-24 09:48:27,569 INFO server.AbstractConnector: Stopped
> Spark@4c8c11ce{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
> 2020-07-24 09:48:27,572 INFO ui.SparkUI: Stopped Spark web UI at
> http://ip-172-31-69-46.ec2.internal:4040
> 2020-07-24 09:48:27,575 INFO cluster.StandaloneSchedulerBackend: Shutting
> down all executors
> 2020-07-24 09:48:27,576 INFO
> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor
> to shut down
> 2020-07-24 09:48:27,592 INFO spark.MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 2020-07-24 09:48:27,600 INFO memory.MemoryStore: MemoryStore cleared
> 2020-07-24 09:48:27,600 INFO storage.BlockManager: BlockManager stopped
> 2020-07-24 09:48:27,604 INFO storage.BlockManagerMaster:
> BlockManagerMaster stopped
> 2020-07-24 09:48:27,607 INFO
> scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> OutputCommitCoordinator stopped!
> 2020-07-24 09:48:27,613 INFO spark.SparkContext: Successfully stopped
> SparkContext
> 2020-07-24 09:48:27,614 INFO util.ShutdownHookManager: Shutdown hook called
> 2020-07-24 09:48:27,615 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-85b5d4dc-0524-4148-aff4-8a77deb6ccca
> 2020-07-24 09:48:27,617 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7
> 2020-07-24 09:48:27,620 INFO util.ShutdownHookManager: Deleting directory
> /tmp/spark-49e4b426-544b-4cb4-a3e2-c0d98985e7b7/pyspark-c0db4268-2afd-4336-bf77-9dc7257213d2
>
> I have debugged everything, but I have no idea how to solve this
> problem. Do you have any suggestions? Could it be a Kafka configuration
> problem?
> Thanks in advance,
>
> Davide
>
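
On the Kafka configuration question: the ClosedChannelException is reported
by an executor (172.31.69.185), so it may be worth checking whether each
worker can open a TCP connection to the broker address given in
metadata.broker.list (172.31.71.104:9092). Below is a minimal sketch,
assuming plain Python 3 is available on the workers. The helper name
can_reach is hypothetical and is not part of Spark or Kafka.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration only. It checks network
    reachability and nothing about the Kafka protocol itself.
    """
    try:
        # create_connection resolves the host and attempts a TCP connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Example, to be run on each worker (address taken from the script above):
# print(can_reach("172.31.71.104", 9092))
```

A successful connection here does not rule out a configuration problem. The
broker may advertise a different host name in its metadata (see
advertised.listeners in the broker configuration), and the workers would
need to resolve and reach that name as well.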
