Peter Mead created SPARK-20695: ---------------------------------- Summary: Running multiple TCP socket streams in Spark Shell causes driver error Key: SPARK-20695 URL: https://issues.apache.org/jira/browse/SPARK-20695 Project: Spark Issue Type: Bug Components: DStreams, Spark Core, Spark Shell, Structured Streaming Affects Versions: 2.0.2 Environment: DataStax DSE with a 3-node Apache Cassandra cluster running with Analytics enabled, on RHEL 7.3 under Hyper-V on a Windows 10 laptop. Reporter: Peter Mead Priority: Blocker
Whenever I include a second socket stream (lines02), the script fails with the error below, even though I am not processing any data from that stream (its foreachRDD handler is commented out). If I remove lines02, the script runs fine. Script: val s_server01="192.168.1.10" val s_port01 = 8801 val s_port02 = 8802 import org.apache.spark.streaming._, org.apache.spark.streaming.StreamingContext._ import scala.util.Random import org.apache.spark._ import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import java.util.Date; import java.text.SimpleDateFormat; import java.util.Calendar; import sys.process._ import org.apache.spark.streaming.dstream.ConstantInputDStream sc.setLogLevel("ERROR") val df2 = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss") var processed:Long = 0 var pdate="" case class t_row (card_number: String, event_date: Int, event_time: Int, processed: Long, transport_type: String, card_credit: java.lang.Float, transport_location: String, journey_type: Int, journey_value: java.lang.Float) var type2tot = 0 var type5tot = 0 var numb=0 var total_secs:Double = 0 val red = "\033[0;31m" val green = "\033[0;32m" val cyan = "\033[0;36m" val yellow = "\033[0;33m" val nocolour = "\033[0;0m" var color = "" val t_int = 5 var init = 0 var tot_cnt:Long = 0 val ssc = new StreamingContext(sc, Seconds(t_int)) val lines01 = ssc.socketTextStream(s_server01, s_port01) val lines02 = ssc.socketTextStream(s_server01, s_port02) // val lines = lines01.union(lines02) val line01 = lines01.foreachRDD( rdd => { println("\n------------line 01") if (init == 0) {"clear".!;init = 1} val xx=rdd.map(bb => (bb.substring(0,bb.length).split(","))) val processed = System.currentTimeMillis val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis, line(3), line(4).toFloat, line(5), line(6).toInt, line(7).toFloat )) val cnt:Long = bb.count bb.saveToCassandra("transport", "card_data_input") }) //val line02 = lines02.foreachRDD( rdd => { //println("------------line 02") //if (init == 0) {"clear".!;init = 1} //val 
xx=rdd.map(bb => (bb.substring(0,bb.length).split(","))) //xx.collect.foreach(println) //val processed = System.currentTimeMillis //val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis, line(3), line(4).toFloat, line(5), //line(6).toInt, line(7).toFloat )) //val cnt:Long = bb.count //bb.saveToCassandra("transport", "card_data_input") //}) ERROR: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)...etc -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org