[jira] [Commented] (SPARK-20525) ClassCast exception when interpreting UDFs from a String in spark-shell
[ https://issues.apache.org/jira/browse/SPARK-20525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200284#comment-17200284 ]

Igor Kamyshnikov commented on SPARK-20525:
------------------------------------------

I bet the issue is in the JDK, but it could be solved in Scala if they got rid of writeReplace/List$SerializationProxy. I've left some details [here|https://issues.apache.org/jira/browse/SPARK-19938?focusedCommentId=17200272&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17200272] in SPARK-19938.

> ClassCast exception when interpreting UDFs from a String in spark-shell
> -----------------------------------------------------------------------
>
>                 Key: SPARK-20525
>                 URL: https://issues.apache.org/jira/browse/SPARK-20525
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell
>    Affects Versions: 2.1.0
>         Environment: OS X 10.11.6, spark-2.1.0-bin-hadoop2.7, Scala version 2.11.8 (bundled w/ Spark), Java 1.8.0_121
>            Reporter: Dave Knoester
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: UdfTest.scala
>
> I'm trying to interpret a string containing Scala code from inside a Spark session. Everything is working fine, except for User Defined Function-like things (UDFs, map, flatMap, etc.). This is a blocker for the production launch of a large number of Spark jobs.
> I've been able to boil the problem down to a number of spark-shell examples, shown below. Because it's reproducible in the spark-shell, these related issues *don't apply*:
> https://issues.apache.org/jira/browse/SPARK-9219
> https://issues.apache.org/jira/browse/SPARK-18075
> https://issues.apache.org/jira/browse/SPARK-19938
> http://apache-spark-developers-list.1001551.n3.nabble.com/This-Exception-has-been-really-hard-to-trace-td19362.html
> https://community.mapr.com/thread/21488-spark-error-scalacollectionseq-in-instance-of-orgapachesparkrddmappartitionsrdd
> https://github.com/scala/bug/issues/9237
> Any help is appreciated!
>
> Repro: run each of the snippets below from a spark-shell.
>
> Preamble:
> import scala.tools.nsc.GenericRunnerSettings
> import scala.tools.nsc.interpreter.IMain
> val settings = new GenericRunnerSettings(println _)
> settings.usejavacp.value = true
> val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
> interpreter.bind("spark", spark)
>
> These work:
> // works:
> interpreter.interpret("val x = 5")
> // works:
> interpreter.interpret("import spark.implicits._\nval df = spark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.show")
>
> These do not work:
> // doesn't work, fails with seq/RDD serialization error:
> interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nval upperUDF = udf(upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", upperUDF($\"value\")).show")
> // doesn't work, fails with seq/RDD serialization error:
> interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nspark.udf.register(\"myUpper\", upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", callUDF(\"myUpper\", ($\"value\"))).show")
>
> The not-working ones fail with this exception:
> Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
> 	at org.apache.spark.scheduler.Task.run
[jira] [Comment Edited] (SPARK-19938) java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field
[ https://issues.apache.org/jira/browse/SPARK-19938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200272#comment-17200272 ]

Igor Kamyshnikov edited comment on SPARK-19938 at 9/22/20, 5:55 PM:
--------------------------------------------------------------------

[~rdblue], my analysis shows a different root cause of the problem:
https://bugs.openjdk.java.net/browse/JDK-8024931 (never fixed)
https://github.com/scala/bug/issues/9777 (asking Scala to solve it on their side)

It's about circular references among the objects being serialized:
RDD1.dependencies_ = Seq1[RDD2]
RDD2.dependencies_ = Seq2[RDD3]
RDD3, through some Dataset/Catalyst magic, can refer back to Seq1[RDD2].

The Seqs are instances of scala.collection.immutable.List, which uses writeReplace to emit a 'SerializationProxy'. The serialization of RDD3 therefore writes a reference to Seq1's SerializationProxy. During deserialization, that reference to the SerializationProxy is read back before the proxy's 'readResolve' method has been called (see the JDK bug reported above), so the raw proxy ends up assigned to the Seq-typed field.
> java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19938
>                 URL: https://issues.apache.org/jira/browse/SPARK-19938
>             Project: Spark
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 2.0.2
>            Reporter: srinivas thallam
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
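The cycle described in the comment above can be reproduced outside Spark entirely. The following is a minimal sketch in plain Java; `Node`, `Holder`, and `SerProxy` are hypothetical stand-ins for `RDD`, `Seq`, and `List$SerializationProxy` (only the writeReplace/readResolve shape and the back-reference matter):

```java
import java.io.*;

// SerProxy stands in for List$SerializationProxy: it is what actually
// appears in the stream, and readResolve() restores the real container.
class SerProxy implements Serializable {
    Object[] elems;
    SerProxy(Object[] elems) { this.elems = elems; }
    // Runs only AFTER all of the proxy's fields have been read back.
    Object readResolve() { return new Holder(elems); }
}

// Holder stands in for scala.collection.immutable.List.
class Holder implements Serializable {
    transient Object[] elems; // travels via the proxy, not directly
    Holder(Object[] elems) { this.elems = elems; }
    Object writeReplace() { return new SerProxy(elems); }
}

// Node stands in for an RDD; deps plays the role of RDD.dependencies_.
class Node implements Serializable {
    Holder deps;
}

public class ProxyCycleDemo {
    static String roundTrip() throws Exception {
        Node rdd1 = new Node();
        Node rdd2 = new Node();
        Holder seq1 = new Holder(new Object[] { rdd2 });
        rdd1.deps = seq1;
        rdd2.deps = seq1; // back-reference into Seq1, like RDD3 -> Seq1[RDD2]

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(rdd1);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            return "ok";
        } catch (ClassCastException e) {
            // While SerProxy's fields are still being read, the stream handle
            // for seq1 resolves to the raw proxy, which then gets assigned to
            // the Holder-typed field of rdd2 -- the JDK-8024931 pattern.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

The failure message mirrors the Spark one: "cannot assign instance of SerProxy to field Node.deps of type Holder in instance of Node".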
[jira] [Updated] (SPARK-27967) Fault tolerance broken: Race conditions: a supervised Driver is not relaunched and completely removed sometimes under Standalone cluster when Worker gracefully shuts down
[ https://issues.apache.org/jira/browse/SPARK-27967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Kamyshnikov updated SPARK-27967:
-------------------------------------
    Description: 

Synthetic test:
1) run ZK
2) run Master
3) run Worker with a remote debugging agent (required for enabling a breakpoint to demonstrate the race condition)
4) submit a long-running Driver with the --supervise flag
5) connect to the Worker via a remote debugger
6) enable a breakpoint in the method org.apache.spark.deploy.worker.DriverRunner#kill:
{code:java}
  /** Terminate this driver (or prevent it from ever starting if not yet started) */
  private[worker] def kill(): Unit = {
    logInfo("Killing driver process!")
    killed = true
    synchronized {
      process.foreach { p =>
        val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
        if (exitCode.isEmpty) { //< BREAKPOINT <<
          logWarning("Failed to terminate driver process: " + p +
            ". This process will likely be orphaned.")
        }
      }
    }
  }
{code}
7) send SIGTERM to the Worker (or CTRL+C on Windows)
8) check the Spark Master Web UI: the Driver will appear in the *Completed Drivers* section with the state equal to *KILLED*

If there were no breakpoint, it is more likely that a new row with the *RELAUNCHING* state would appear in the *Completed Drivers* section and a row with the *SUBMITTED* state would remain in the *Running Drivers* section.

Explanation:
1) The Spark Master relaunches a driver in response to the "channelInactive" callback (org.apache.spark.rpc.netty.NettyRpcHandler#channelInactive), which is triggered when the Worker process finishes.
2) DriverRunner registers a shutdown hook in org.apache.spark.deploy.worker.DriverRunner#start, which calls the aforementioned "kill" method.
Killing a driver can lead to reaching the following lines in the DriverRunner.start method:
{noformat}
// notify worker of final driver state, possible exception
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
{noformat}
If this notification reaches the Master, the Driver is removed from the cluster as KILLED.

Real-world scenario (ver. 2.1.2): ZK, two Masters; the active one loses its leadership and another becomes the new leader. The Workers attempt to re-register with the new Master, but they report that they failed to do so. They execute *System.exit(1)* from org.apache.spark.deploy.worker.Worker#registerWithMaster. This System.exit results in executing the shutdown hooks, and somehow the DriverStateChanged message reaches the new Master.

*Worker* logs:
{noformat}
19/06/03 14:05:30 INFO Worker: Retrying connection to master (attempt # 5)
19/06/03 14:05:30 INFO Worker: Connecting to master 10.0.0.16:7077...
19/06/03 14:05:33 INFO Worker: Master has changed, new master is at spark://10.0.0.17:7077
19/06/03 14:05:33 ERROR TransportResponseHandler: Still have 4 requests outstanding when connection from /10.0.0.16:7077 is closed
19/06/03 14:05:33 ERROR Worker: Cannot register with master: 10.0.0.16:7077
java.io.IOException: Connection from /10.0.0.16:7077 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at org.apache.spark.network.util.TransportFrameDe
{noformat}
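The outcome of the race described above depends solely on which signal reaches the Master first. The following is an illustrative sketch (not Spark's actual classes; all names here are stand-ins) of why a first-writer-wins final state drops the driver instead of relaunching it:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the race: the Master keeps whichever final-state signal arrives
// first -- the Worker's DriverStateChanged(KILLED) sent from its shutdown
// hook, or the RELAUNCHING decision made in the Master's channelInactive.
public class DriverStateRace {
    enum State { RUNNING, KILLED, RELAUNCHING }

    static final AtomicReference<State> finalState =
        new AtomicReference<>(State.RUNNING);

    // Worker side: shutdown hook path (DriverRunner.kill -> DriverStateChanged).
    static void workerShutdownHook() {
        finalState.compareAndSet(State.RUNNING, State.KILLED);
    }

    // Master side: worker connection lost -> relaunch the supervised driver.
    static void masterChannelInactive() {
        finalState.compareAndSet(State.RUNNING, State.RELAUNCHING);
    }

    public static void main(String[] args) throws InterruptedException {
        // Here the KILLED message outruns the connection teardown (as when
        // the breakpoint in step 6 delays the Worker's exit), so the driver
        // is recorded as KILLED and never relaunched.
        Thread worker = new Thread(DriverStateRace::workerShutdownHook);
        worker.start();
        worker.join();
        masterChannelInactive(); // arrives second and loses the race
        System.out.println("final state: " + finalState.get()); // prints KILLED
    }
}
```

Reversing the two calls yields RELAUNCHING, which is exactly why the bug only appears "sometimes": nothing orders the shutdown-hook message against the channelInactive callback.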
[jira] [Created] (SPARK-27967) Fault tolerance broken: Race conditions: a supervised Driver is not relaunched and completely removed sometimes under Standalone cluster when Worker gracefully shuts down
Igor Kamyshnikov created SPARK-27967:
----------------------------------------

             Summary: Fault tolerance broken: Race conditions: a supervised Driver is not relaunched and completely removed sometimes under Standalone cluster when Worker gracefully shuts down
                 Key: SPARK-27967
                 URL: https://issues.apache.org/jira/browse/SPARK-27967
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.2, 2.1.2
            Reporter: Igor Kamyshnikov