[jira] [Updated] (SPARK-20695) Running multiple TCP socket streams in Spark Shell causes driver error
[ https://issues.apache.org/jira/browse/SPARK-20695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Mead updated SPARK-20695:
-------------------------------

Do you guys ever read the issues? This is a simple Spark shell script (dse -u -p yyy spark) which works perfectly fine if I have only a single socket text stream, BUT fails immediately as soon as I introduce a second socket text stream, even if I never reference it. As for registering classes, I have no idea what class 13994 is!! Not very helpful!

-------- Original message --------
From: "Sean Owen (JIRA)"
Date: 10/05/2017 15:40 (GMT+01:00)
To: pjm...@blueyonder.co.uk
Subject: [jira] [Resolved] (SPARK-20695) Running multiple TCP socket streams in Spark Shell causes driver error

[ https://issues.apache.org/jira/browse/SPARK-20695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-20695.
-------------------------------
    Resolution: Invalid

I don't believe that's anything to do with TCP; you are enabling Kryo registration but didn't register some class you are serializing. This is a question about debugging your app and shouldn't be a Spark JIRA. You need to read http://spark.apache.org/contributing.html too; you would never set Blocker, for example.

> Running multiple TCP socket streams in Spark Shell causes driver error
> ----------------------------------------------------------------------
>
>                 Key: SPARK-20695
>                 URL: https://issues.apache.org/jira/browse/SPARK-20695
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Spark Core, Spark Shell, Structured Streaming
>    Affects Versions: 2.0.2
>        Environment: DataStax DSE three-node Cassandra cluster running with analytics on RHEL 7.3, on a Hyper-V Windows 10 laptop.
>           Reporter: Peter Mead
>           Priority: Blocker
>
> Whenever I include a second socket stream (lines02) the script errors even when I am not trying to process its data. If I remove lines02 the script runs fine!!
>
> script:
>
> val s_server01 = "192.168.1.10"
> val s_port01 = 8801
> val s_port02 = 8802
> import org.apache.spark.streaming._, org.apache.spark.streaming.StreamingContext._
> import scala.util.Random
> import org.apache.spark._
> import org.apache.spark.storage._
> import org.apache.spark.streaming.receiver._
> import java.util.Date;
> import java.text.SimpleDateFormat;
> import java.util.Calendar;
> import sys.process._
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> sc.setLogLevel("ERROR")
> val df2 = new SimpleDateFormat("dd-MM- HH:mm:ss")
> var processed: Long = 0
> var pdate = ""
> case class t_row(card_number: String, event_date: Int, event_time: Int, processed: Long,
>   transport_type: String, card_credit: java.lang.Float, transport_location: String,
>   journey_type: Int, journey_value: java.lang.Float)
> var type2tot = 0
> var type5tot = 0
> var numb = 0
> var total_secs: Double = 0
> val red = "\033[0;31m"
> val green = "\033[0;32m"
> val cyan = "\033[0;36m"
> val yellow = "\033[0;33m"
> val nocolour = "\033[0;0m"
> var color = ""
> val t_int = 5
> var init = 0
> var tot_cnt: Long = 0
> val ssc = new StreamingContext(sc, Seconds(t_int))
> val lines01 = ssc.socketTextStream(s_server01, s_port01)
> val lines02 = ssc.socketTextStream(s_server01, s_port02)
> // val lines = lines01.union(lines02)
> val line01 = lines01.foreachRDD( rdd => {
>   println("\nline 01")
>   if (init == 0) { "clear".!; init = 1 }
>   val xx = rdd.map(bb => bb.substring(0, bb.length).split(","))
>   val processed = System.currentTimeMillis
>   val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis,
>     line(3), line(4).toFloat, line(5), line(6).toInt, line(7).toFloat))
>   val cnt: Long = bb.count
>   bb.saveToCassandra("transport", "card_data_input")
> })
> // val line02 = lines02.foreachRDD( rdd => {
> //   println("line 02")
> //   if (init == 0) { "clear".!; init = 1 }
> //   val xx = rdd.map(bb => bb.substring(0, bb.length).split(","))
> //   xx.collect.foreach(println)
> //   val processed = System.currentTimeMillis
> //   val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis,
> //     line(3), line(4).toFloat, line(5), line(6).toInt, line(7).toFloat))
> //   val cnt: Long = bb.count
> //   bb.saveToCassandra("transport", "card_data_input")
> // })
>
> ERROR:
>
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
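For context on the resolution above: with spark.kryo.registrationRequired enabled, Kryo only accepts classes that were registered up front, and registering them also turns the opaque numeric class ID into a readable class name when something is missed. A minimal sketch of such registration, in shell-script form for a fresh session, using an abbreviated stand-in for the t_row class from the script (the app name is made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    // Abbreviated stand-in for the t_row case class defined in the script above.
    case class t_row(card_number: String, event_date: Int)

    val conf = new SparkConf()
      .setAppName("kryo-registration-sketch")  // illustrative name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fail fast, with a class NAME rather than a numeric ID, on anything
      // serialized without being registered:
      .set("spark.kryo.registrationRequired", "true")
      // Register the classes the job actually ships between driver and executors:
      .registerKryoClasses(Array(classOf[t_row], classOf[Array[String]]))

    val sc = new SparkContext(conf)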
[jira] [Created] (SPARK-20695) Running multiple TCP socket streams in Spark Shell causes driver error
Peter Mead created SPARK-20695:
-------------------------------

             Summary: Running multiple TCP socket streams in Spark Shell causes driver error
                 Key: SPARK-20695
                 URL: https://issues.apache.org/jira/browse/SPARK-20695
             Project: Spark
          Issue Type: Bug
          Components: DStreams, Spark Core, Spark Shell, Structured Streaming
    Affects Versions: 2.0.2
         Environment: DataStax DSE three-node Cassandra cluster running with analytics on RHEL 7.3, on a Hyper-V Windows 10 laptop.
            Reporter: Peter Mead
            Priority: Blocker

Whenever I include a second socket stream (lines02) the script errors even when I am not trying to process its data. If I remove lines02 the script runs fine!!

script:

val s_server01 = "192.168.1.10"
val s_port01 = 8801
val s_port02 = 8802
import org.apache.spark.streaming._, org.apache.spark.streaming.StreamingContext._
import scala.util.Random
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import sys.process._
import org.apache.spark.streaming.dstream.ConstantInputDStream
sc.setLogLevel("ERROR")
val df2 = new SimpleDateFormat("dd-MM- HH:mm:ss")
var processed: Long = 0
var pdate = ""
case class t_row(card_number: String, event_date: Int, event_time: Int, processed: Long,
  transport_type: String, card_credit: java.lang.Float, transport_location: String,
  journey_type: Int, journey_value: java.lang.Float)
var type2tot = 0
var type5tot = 0
var numb = 0
var total_secs: Double = 0
val red = "\033[0;31m"
val green = "\033[0;32m"
val cyan = "\033[0;36m"
val yellow = "\033[0;33m"
val nocolour = "\033[0;0m"
var color = ""
val t_int = 5
var init = 0
var tot_cnt: Long = 0
val ssc = new StreamingContext(sc, Seconds(t_int))
val lines01 = ssc.socketTextStream(s_server01, s_port01)
val lines02 = ssc.socketTextStream(s_server01, s_port02)
// val lines = lines01.union(lines02)
val line01 = lines01.foreachRDD( rdd => {
  println("\nline 01")
  if (init == 0) { "clear".!; init = 1 }
  val xx = rdd.map(bb => bb.substring(0, bb.length).split(","))
  val processed = System.currentTimeMillis
  val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis,
    line(3), line(4).toFloat, line(5), line(6).toInt, line(7).toFloat))
  val cnt: Long = bb.count
  bb.saveToCassandra("transport", "card_data_input")
})
// val line02 = lines02.foreachRDD( rdd => {
//   println("line 02")
//   if (init == 0) { "clear".!; init = 1 }
//   val xx = rdd.map(bb => bb.substring(0, bb.length).split(","))
//   xx.collect.foreach(println)
//   val processed = System.currentTimeMillis
//   val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt, System.currentTimeMillis,
//     line(3), line(4).toFloat, line(5), line(6).toInt, line(7).toFloat))
//   val cnt: Long = bb.count
//   bb.saveToCassandra("transport", "card_data_input")
// })

ERROR:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        ...etc
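If the class behind ID 13994 cannot be identified, a blunter debugging step is to relax the registration requirement, or temporarily fall back to Java serialization, so the job either runs or fails with a class name. The settings below are standard Spark configuration keys; whether `dse spark` forwards `--conf` flags exactly like `spark-shell` is an assumption:

    // Hypothetical launch variants (assumes `dse spark` forwards --conf flags
    // the same way `spark-shell` does):
    //   dse spark --conf spark.kryo.registrationRequired=false
    //   dse spark --conf spark.serializer=org.apache.spark.serializer.JavaSerializer

    // Inside the shell, the effective serializer settings can be inspected:
    sc.getConf.getOption("spark.serializer").foreach(println)
    sc.getConf.getOption("spark.kryo.registrationRequired").foreach(println)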
[jira] [Reopened] (SPARK-20252) java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
[ https://issues.apache.org/jira/browse/SPARK-20252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Mead reopened SPARK-20252:
--------------------------------

I'm not sure how this explains why it works the first (and every) time if the Spark context is not changed. There must be a discrepancy between the way that DSE creates the Spark context the first time through and the way I create it after sc.stop().

> java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
> -------------------------------------------------------------------
>
>                 Key: SPARK-20252
>                 URL: https://issues.apache.org/jira/browse/SPARK-20252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 1.6.3
>        Environment: Datastax DSE dual-node Spark cluster
>                      [cqlsh 5.0.1 | Cassandra 3.0.12.1586 | DSE 5.0.7 | CQL spec 3.4.0 | Native protocol v4]
>           Reporter: Peter Mead
>
> After starting a Spark shell using dse -u -p x spark:
>
> scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
> defined class movie_row
> scala> val vids=sc.cassandraTable("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
> scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15
> scala> vids.count
> res0: Long = 114961
> Works OK!!
>
> BUT if the Spark context is stopped and recreated THEN:
> scala> sc.stop()
> scala> import org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> val conf = new SparkConf(true)
>   .set("spark.cassandra.connection.host", "redacted")
>   .set("spark.cassandra.auth.username", "redacted")
>   .set("spark.cassandra.auth.password", "redacted")
> // Exiting paste mode, now interpreting.
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@e207342
> scala> val sc = new SparkContext("spark://192.168.1.84:7077", "pjm", conf)
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@12b8b7c8
> scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
> defined class movie_row
> scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
> scala> vids.count
> [Stage 0:> (0 + 2) / 2]WARN 2017-04-07 12:52:03,277 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cassandra183): java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
> FAILS!!
>
> I have been unable to get this to work from a remote Spark shell!
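A possible explanation for the first-time/second-time difference, offered as an assumption rather than a confirmed diagnosis: the context DSE builds for the shell carries REPL-specific settings (in Spark 1.x, notably spark.repl.class.uri, the address executors use to fetch classes compiled in the shell), and a SparkConf built from scratch drops them. A sketch of a workaround that clones the live conf before stopping the context, reusing the reporter's redacted Cassandra settings:

    // Capture the shell's conf -- including any REPL class-server settings --
    // before stopping the context, then layer the Cassandra options on top.
    val oldConf = sc.getConf
    sc.stop()

    val conf = oldConf
      .set("spark.cassandra.connection.host", "redacted")
      .set("spark.cassandra.auth.username", "redacted")
      .set("spark.cassandra.auth.password", "redacted")

    val sc2 = new org.apache.spark.SparkContext("spark://192.168.1.84:7077", "pjm", conf)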
[jira] [Commented] (SPARK-20252) java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
[ https://issues.apache.org/jira/browse/SPARK-20252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960762#comment-15960762 ]

Peter Mead commented on SPARK-20252:
------------------------------------

I'm not sure how this explains why it works the first (and every) time if the Spark context is not changed. There must be a discrepancy between the way that DSE creates the Spark context the first time through and the way I create it after sc.stop().

> java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
> -------------------------------------------------------------------
>
>                 Key: SPARK-20252
>                 URL: https://issues.apache.org/jira/browse/SPARK-20252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 1.6.3
>        Environment: Datastax DSE dual-node Spark cluster
>                      [cqlsh 5.0.1 | Cassandra 3.0.12.1586 | DSE 5.0.7 | CQL spec 3.4.0 | Native protocol v4]
>           Reporter: Peter Mead
>
> After starting a Spark shell using dse -u -p x spark:
>
> scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
> defined class movie_row
> scala> val vids=sc.cassandraTable("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
> scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15
> scala> vids.count
> res0: Long = 114961
> Works OK!!
>
> BUT if the Spark context is stopped and recreated THEN:
> scala> sc.stop()
> scala> import org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> val conf = new SparkConf(true)
>   .set("spark.cassandra.connection.host", "redacted")
>   .set("spark.cassandra.auth.username", "redacted")
>   .set("spark.cassandra.auth.password", "redacted")
> // Exiting paste mode, now interpreting.
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@e207342
> scala> val sc = new SparkContext("spark://192.168.1.84:7077", "pjm", conf)
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@12b8b7c8
> scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
> defined class movie_row
> scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
> vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
> scala> vids.count
> [Stage 0:> (0 + 2) / 2]WARN 2017-04-07 12:52:03,277 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cassandra183): java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
> FAILS!!
>
> I have been unable to get this to work from a remote Spark shell!
[jira] [Created] (SPARK-20252) java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
Peter Mead created SPARK-20252:
-------------------------------

             Summary: java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row
                 Key: SPARK-20252
                 URL: https://issues.apache.org/jira/browse/SPARK-20252
             Project: Spark
          Issue Type: Bug
          Components: Spark Shell
    Affects Versions: 1.6.3
         Environment: Datastax DSE dual-node Spark cluster
                      [cqlsh 5.0.1 | Cassandra 3.0.12.1586 | DSE 5.0.7 | CQL spec 3.4.0 | Native protocol v4]
            Reporter: Peter Mead

After starting a Spark shell using dse -u -p x spark:

scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
defined class movie_row

scala> val vids=sc.cassandraTable("hcl","videos_by_actor")
vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15

scala> vids.count
res0: Long = 114961

Works OK!!

BUT if the Spark context is stopped and recreated THEN:

scala> sc.stop()

scala> import org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

scala> :paste
// Entering paste mode (ctrl-D to finish)

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "redacted")
  .set("spark.cassandra.auth.username", "redacted")
  .set("spark.cassandra.auth.password", "redacted")

// Exiting paste mode, now interpreting.
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@e207342

scala> val sc = new SparkContext("spark://192.168.1.84:7077", "pjm", conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@12b8b7c8

scala> case class movie_row (actor: String, character_name: String, video_id: java.util.UUID, video_year: Int, title: String)
defined class movie_row

scala> val vids=sc.cassandraTable[movie_row]("hcl","videos_by_actor")
vids: com.datastax.spark.connector.rdd.CassandraTableScanRDD[movie_row] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> vids.count
[Stage 0:> (0 + 2) / 2]WARN 2017-04-07 12:52:03,277 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, cassandra183): java.lang.ClassNotFoundException: $line22.$read$$iwC$$iwC$movie_row

FAILS!!

I have been unable to get this to work from a remote Spark shell!
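For anyone hitting the same error, one quick check (a sketch based on Spark 1.x REPL behaviour; the setting does not exist in later versions): names like $line22.$read$$iwC$$iwC$movie_row are classes the shell compiles on the fly and serves to executors from a class server, whose URI lives in the context's configuration. If that setting is missing from a hand-built context, executors have no way to load REPL-defined classes:

    // In the shell, before and after recreating the context:
    println(sc.getConf.getOption("spark.repl.class.uri"))
    // Some(http://...) => executors can fetch classes defined in the REPL.
    // None             => ClassNotFoundException for $line...$ classes is expected.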