[ 
https://issues.apache.org/jira/browse/SPARK-20695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen closed SPARK-20695.
-----------------------------

> Running multiple TCP socket streams in Spark Shell causes driver error
> ----------------------------------------------------------------------
>
>                 Key: SPARK-20695
>                 URL: https://issues.apache.org/jira/browse/SPARK-20695
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Spark Core, Spark Shell, Structured Streaming
>    Affects Versions: 2.0.2
>         Environment: DataStax DSE, 3-node Apache Cassandra cluster running with 
> analytics on RHEL 7.3 under Hyper-V on a Windows 10 laptop.
>            Reporter: Peter Mead
>            Priority: Blocker
>
> Whenever I include a second socket stream (lines02), the script errors even 
> when I am not trying to process its data. If I remove lines02, the script runs fine!
> script:
> val s_server01="192.168.1.10"
> val s_port01  = 8801
> val s_port02  = 8802
> import org.apache.spark.streaming._, 
> org.apache.spark.streaming.StreamingContext._
> import scala.util.Random
> import org.apache.spark._
> import org.apache.spark.storage._
> import org.apache.spark.streaming.receiver._
> import java.util.Date;
> import java.text.SimpleDateFormat;
> import java.util.Calendar;
> import sys.process._
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> sc.setLogLevel("ERROR")
> val df2 = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
> var processed:Long = 0
> var pdate=""
> case class t_row (card_number: String, event_date: Int, event_time: Int, 
> processed: Long, transport_type: String, card_credit: java.lang.Float, 
> transport_location: String, journey_type: Int,  journey_value: 
> java.lang.Float)
> var type2tot = 0
> var type5tot = 0
> var numb=0
> var total_secs:Double = 0
> val red    = "\033[0;31m"
> val green  = "\033[0;32m"
> val cyan   = "\033[0;36m"
> val yellow = "\033[0;33m"
> val nocolour = "\033[0;0m"
> var color = ""
> val t_int = 5
> var init = 0
> var tot_cnt:Long = 0
> val ssc = new StreamingContext(sc, Seconds(t_int))
> val lines01 = ssc.socketTextStream(s_server01, s_port01)
> val lines02 = ssc.socketTextStream(s_server01, s_port02)
> // val lines   = lines01.union(lines02)
> val line01 = lines01.foreachRDD( rdd => {
> println("\n------------line 01")
> if (init == 0) {"clear".!;init = 1}
> val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
> val processed = System.currentTimeMillis
> val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, 
> System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
> line(6).toInt, line(7).toFloat ))
> val cnt:Long = bb.count
> bb.saveToCassandra("transport", "card_data_input")
> })
> //val line02 = lines02.foreachRDD( rdd => {
> //println("------------line 02")
> //if (init == 0) {"clear".!;init = 1}
> //val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
> //xx.collect.foreach(println)
> //val processed = System.currentTimeMillis
> //val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, 
> System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
> //line(6).toInt, line(7).toFloat ))
> //val cnt:Long = bb.count
> //bb.saveToCassandra("transport", "card_data_input")
> //})
> ERROR:
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
>         at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)...etc



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to