Re: Kafka with Spark Streaming works locally but doesn't work in Standalone mode

2020-07-24 Thread Gabor Somogyi
Hi Davide,

Please see the doc:
*Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.*

Have you tried the same with Structured Streaming and not with DStreams?
If you still want to use DStreams, you can use the spark-streaming-kafka-0-10
connector instead.
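
For example, a minimal Structured Streaming sketch of the same pipeline could
look roughly like this (it assumes the same broker address and topic as in
your script, and needs the spark-sql-kafka-0-10 package on the classpath):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonStructuredStreamingKafka").getOrCreate()

# Subscribe to the "test" topic on the remote broker; "earliest" plays the
# same role as the old "smallest" auto.offset.reset setting.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.31.71.104:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load())

# Kafka records arrive as binary key/value columns; cast them to strings and
# print each micro-batch to the console.
query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())

query.awaitTermination()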

BR,
G


On Fri, Jul 24, 2020 at 12:08 PM Davide Curcio wrote:

> Hi,
>
> I'm trying to use Spark Streaming with a very simple script like this:
>
> from pyspark import SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> sc = SparkContext(appName="PythonSparkStreamingKafka")
>
>
> ssc = StreamingContext(sc, 1)
> kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
>"auto.offset.reset": "smallest"}
>
> training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
>
> training.pprint()
>
> ssc.start()
> ssc.awaitTermination()
>
> Although it works locally, it crashes when I run it on the cluster in
> standalone mode. I have a cluster with 4 machines:
>
> 1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
> 1 machine is the driver
> 2 machines are the workers.
>
> The strange thing is that when the Kafka producer, broker and ZooKeeper ran
> on the same machine as the driver, it worked both locally and on the
> cluster. But for the sake of scalability and modularity I'd like to use the
> current configuration.
>
> I'm using Spark 2.4.6, the Kafka streaming connector is
> "spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm
> currently using is kafka_2.11-2.4.1.

Kafka with Spark Streaming works locally but doesn't work in Standalone mode

2020-07-24 Thread Davide Curcio
Hi,

I'm trying to use Spark Streaming with a very simple script like this:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext(appName="PythonSparkStreamingKafka")


ssc = StreamingContext(sc, 1)
kafkaParams = {"metadata.broker.list": "172.31.71.104:9092",
   "auto.offset.reset": "smallest"}

training = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

training.pprint()

ssc.start()
ssc.awaitTermination()

Although it works locally, it crashes when I run it on the cluster in
standalone mode. I have a cluster with 4 machines:

1 machine with Kafka Producer, 1 Broker and 1 Zookeeper
1 machine is the driver
2 machines are the workers.

The strange thing is that when the Kafka producer, broker and ZooKeeper ran on
the same machine as the driver, it worked both locally and on the cluster. But
for the sake of scalability and modularity I'd like to use the current
configuration.

I'm using Spark 2.4.6, the Kafka streaming connector is
"spark-streaming-kafka-0-8-assembly_2.11-2.4.6", and the Kafka version I'm
currently using is kafka_2.11-2.4.1.
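
For reference, a minimal sketch of one way the assembly jar can be attached to
the job through the Spark configuration, so that the standalone executors also
receive the connector classes (the jar path below is just a placeholder):

from pyspark import SparkConf, SparkContext

# The jar path is only a placeholder; spark.jars ships the listed jars to the
# driver and executor classpaths together with the job.
conf = (SparkConf()
        .setAppName("PythonSparkStreamingKafka")
        .set("spark.jars", "/path/to/spark-streaming-kafka-0-8-assembly_2.11-2.4.6.jar"))

sc = SparkContext(conf=conf)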

The result is the following:

2020-07-24 09:48:25,869 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 
0.0 (TID 0, 172.31.69.185, executor 0): java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:197)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

2020-07-24 09:48:25,875 INFO scheduler.TaskSetManager: Starting task 0.1 in 
stage 0.0 (TID 1, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,950 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 
0.0 (TID 1) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 1]
2020-07-24 09:48:25,952 INFO scheduler.TaskSetManager: Starting task 0.2 in 
stage 0.0 (TID 2, 172.31.69.185, executor 0, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:25,984 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 
0.0 (TID 2) on 172.31.69.185, executor 0: 
java.nio.channels.ClosedChannelException (null) [duplicate 2]
2020-07-24 09:48:25,985 INFO scheduler.TaskSetManager: Starting task 0.3 in 
stage 0.0 (TID 3, 172.31.79.221, executor 1, partition 0, ANY, 7785 bytes)
2020-07-24 09:48:26,026 INFO scheduler.JobScheduler: Added jobs for time 
1595584106000 ms
2020-07-24 09:48:26,375 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 
in memory on 172.31.79.221:44371 (size: 4.0 KB, free: 366.3 MB)
2020-07-24 09:48:27,022 INFO scheduler.JobScheduler: Added jobs for time 
1595584107000 ms
2020-07-24 09:48:27,165 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 
0.0 (TID 3) on 172.31.79.221, executor 1: 
java.nio.channels.ClosedChannelException (null) [duplicate 3]
2020-07-24 09:48:27,167 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 
failed 4 times; aborting job
2020-07-24 09:48:27,171 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, 
whose tasks have all completed, from pool
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
2020-07-24 09:48:27,172 INFO scheduler.TaskSchedulerImpl: Killing all running 
tasks in stage 0: Stage cancelled