Peter Mead created SPARK-20695:
----------------------------------

             Summary: Running multiple TCP socket streams in Spark Shell causes 
driver error
                 Key: SPARK-20695
                 URL: https://issues.apache.org/jira/browse/SPARK-20695
             Project: Spark
          Issue Type: Bug
          Components: DStreams, Spark Core, Spark Shell, Structured Streaming
    Affects Versions: 2.0.2
         Environment: DataStax DSE apache 3 node cassandra running with 
analytics on RHEL 7.3 on Hyper-V windows 10 laptop.
            Reporter: Peter Mead
            Priority: Blocker


Whenever I include a second socket stream (lines02), the script errors even 
though I am not processing any data from it. If I remove the lines02 stream, 
the script runs fine.
script:
val s_server01="192.168.1.10"
val s_port01  = 8801
val s_port02  = 8802
import org.apache.spark.streaming._, 
org.apache.spark.streaming.StreamingContext._
import scala.util.Random
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import sys.process._
import org.apache.spark.streaming.dstream.ConstantInputDStream
sc.setLogLevel("ERROR")
val df2 = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
var processed:Long = 0
var pdate=""
case class t_row (card_number: String, event_date: Int, event_time: Int, 
processed: Long, transport_type: String, card_credit: java.lang.Float, 
transport_location: String, journey_type: Int,  journey_value: java.lang.Float)
var type2tot = 0
var type5tot = 0
var numb=0
var total_secs:Double = 0
val red    = "\033[0;31m"
val green  = "\033[0;32m"
val cyan   = "\033[0;36m"
val yellow = "\033[0;33m"
val nocolour = "\033[0;0m"
var color = ""
val t_int = 5
var init = 0
var tot_cnt:Long = 0
val ssc = new StreamingContext(sc, Seconds(t_int))
val lines01 = ssc.socketTextStream(s_server01, s_port01)
val lines02 = ssc.socketTextStream(s_server01, s_port02)
// val lines   = lines01.union(lines02)

val line01 = lines01.foreachRDD( rdd => {
println("\n------------line 01")
if (init == 0) {"clear".!;init = 1}
val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
val processed = System.currentTimeMillis
val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, 
System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
line(6).toInt, line(7).toFloat ))
val cnt:Long = bb.count
bb.saveToCassandra("transport", "card_data_input")
})

//val line02 = lines02.foreachRDD( rdd => {
//println("------------line 02")
//if (init == 0) {"clear".!;init = 1}
//val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
//xx.collect.foreach(println)
//val processed = System.currentTimeMillis
//val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, 
//System.currentTimeMillis, line(3), line(4).toFloat, line(5), 
//line(6).toInt, line(7).toFloat ))
//val cnt:Long = bb.count
//bb.saveToCassandra("transport", "card_data_input")
//})

ERROR:
software.kryo.KryoException: Encountered unregistered class ID: 13994
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)...etc




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to