[jira] [Commented] (SPARK-20525) ClassCast exception when interpreting UDFs from a String in spark-shell

2020-09-22 Thread Igor Kamyshnikov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-20525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200284#comment-17200284
 ] 

Igor Kamyshnikov commented on SPARK-20525:
--

I bet the issue is in the JDK, but it could be solved in Scala if they got rid of 
writeReplace/List$SerializationProxy. I've left some details 
[here|https://issues.apache.org/jira/browse/SPARK-19938?focusedCommentId=17200272&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17200272]
 in SPARK-19938.

> ClassCast exception when interpreting UDFs from a String in spark-shell
> ---
>
> Key: SPARK-20525
> URL: https://issues.apache.org/jira/browse/SPARK-20525
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Spark Shell
>Affects Versions: 2.1.0
> Environment: OS X 10.11.6, spark-2.1.0-bin-hadoop2.7, Scala version 
> 2.11.8 (bundled w/ Spark), Java 1.8.0_121
>Reporter: Dave Knoester
>Priority: Major
>  Labels: bulk-closed
> Attachments: UdfTest.scala
>
>
> I'm trying to interpret a string containing Scala code from inside a Spark 
> session. Everything is working fine, except for User Defined Function-like 
> things (UDFs, map, flatMap, etc).  This is a blocker for production launch of 
> a large number of Spark jobs.
> I've been able to boil the problem down to a number of spark-shell examples, 
> shown below.  Because it's reproducible in the spark-shell, these related 
> issues **don't apply**:
> https://issues.apache.org/jira/browse/SPARK-9219
> https://issues.apache.org/jira/browse/SPARK-18075
> https://issues.apache.org/jira/browse/SPARK-19938
> http://apache-spark-developers-list.1001551.n3.nabble.com/This-Exception-has-been-really-hard-to-trace-td19362.html
> https://community.mapr.com/thread/21488-spark-error-scalacollectionseq-in-instance-of-orgapachesparkrddmappartitionsrdd
> https://github.com/scala/bug/issues/9237
> Any help is appreciated!
> 
> Repro: 
> Run each of the below from a spark-shell.  
> Preamble:
> import scala.tools.nsc.GenericRunnerSettings
> import scala.tools.nsc.interpreter.IMain
> val settings = new GenericRunnerSettings( println _ )
> settings.usejavacp.value = true
> val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
> interpreter.bind("spark", spark);
> These work:
> // works:
> interpreter.interpret("val x = 5")
> // works:
> interpreter.interpret("import spark.implicits._\nval df = spark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.show")
> These do not work:
> // doesn't work, fails with seq/RDD serialization error:
> interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nval upperUDF = udf(upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", upperUDF($\"value\")).show")
> // doesn't work, fails with seq/RDD serialization error:
> interpreter.interpret("import org.apache.spark.sql.functions._\nimport spark.implicits._\nval upper: String => String = _.toUpperCase\nspark.udf.register(\"myUpper\", upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", callUDF(\"myUpper\", ($\"value\"))).show")
> The not-working ones fail with this exception:
> Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
>   at org.apache.spark.scheduler.Task.run

[jira] [Comment Edited] (SPARK-19938) java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field

2020-09-22 Thread Igor Kamyshnikov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-19938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200272#comment-17200272
 ] 

Igor Kamyshnikov edited comment on SPARK-19938 at 9/22/20, 5:55 PM:


[~rdblue], my analysis shows a different root cause of the problem:
https://bugs.openjdk.java.net/browse/JDK-8024931 (never fixed)
https://github.com/scala/bug/issues/9777 (asking Scala to solve it on their side)

It's about circular references among the objects being serialized:

RDD1.dependencies_ = Seq1[RDD2]
RDD2.dependencies_ = Seq2[RDD3]
RDD3, with some Dataset/Catalyst magic, can refer back to Seq1[RDD2]

These Seqs are instances of scala.collection.immutable.List, which uses writeReplace to 
emit a 'SerializationProxy' instance. The serialization of RDD3 therefore writes a 
back-reference to Seq1's SerializationProxy. During deserialization, that back-reference 
is read before the proxy's 'readResolve' method has been called (see the JDK bug reported 
above), so the proxy itself gets assigned to a field of type Seq.
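
To make the cycle concrete, here is a minimal, self-contained sketch (not Spark code; the Node class and its field names are made up for illustration). With Scala 2.11/2.12 and plain Java serialization it builds the same kind of back-reference through a shared scala.collection.immutable.List and should fail with the same ClassCastException:

{code:scala}
import java.io._

// Node plays the role of an RDD: its fields are Seqs backed by immutable.List,
// which serializes through List$SerializationProxy (writeReplace/readResolve).
class Node extends Serializable {
  var children: Seq[Node] = Nil   // analogous to RDD.dependencies_
  var backRef: Seq[Node] = Nil    // back-reference to the parent's `children` list
}

object ProxyCycleDemo {
  def main(args: Array[String]): Unit = {
    val parent = new Node
    val child = new Node
    parent.children = List(child)
    child.backRef = parent.children   // cycle through the *same* List instance

    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(parent)
    oos.close()

    // Expected (per JDK-8024931): the back-reference is resolved to the
    // List$SerializationProxy before its readResolve runs, so assigning it to
    // a field of type Seq throws the same ClassCastException as in this ticket.
    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    ois.readObject()
  }
}
{code}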


was (Author: kamyshnikov):
[~rdblue], my analysis shows a different root cause of the problem:
https://bugs.openjdk.java.net/browse/JDK-8024931
https://github.com/scala/bug/issues/9777

It's about circular references among the objects being serialized:

RDD1.dependencies_ = Seq1[RDD2]
RDD2.dependencies_ = Seq2[RDD3]
RDD3, with some Dataset/Catalyst magic, can refer back to Seq1[RDD2]

These Seqs are instances of scala.collection.immutable.List, which uses writeReplace to 
emit a 'SerializationProxy' instance. The serialization of RDD3 therefore writes a 
back-reference to Seq1's SerializationProxy. During deserialization, that back-reference 
is read before the proxy's 'readResolve' method has been called (see the JDK bug reported 
above), so the proxy itself gets assigned to a field of type Seq.

> java.lang.ClassCastException: cannot assign instance of 
> scala.collection.immutable.List$SerializationProxy to field
> ---
>
> Key: SPARK-19938
> URL: https://issues.apache.org/jira/browse/SPARK-19938
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.0.2
>Reporter: srinivas thallam
>Priority: Major
>







[jira] [Commented] (SPARK-19938) java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field

2020-09-22 Thread Igor Kamyshnikov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-19938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17200272#comment-17200272
 ] 

Igor Kamyshnikov commented on SPARK-19938:
--

[~rdblue], my analysis shows a different root cause of the problem:
https://bugs.openjdk.java.net/browse/JDK-8024931
https://github.com/scala/bug/issues/9777

It's about circular references among the objects being serialized:

RDD1.dependencies_ = Seq1[RDD2]
RDD2.dependencies_ = Seq2[RDD3]
RDD3, with some Dataset/Catalyst magic, can refer back to Seq1[RDD2]

These Seqs are instances of scala.collection.immutable.List, which uses writeReplace to 
emit a 'SerializationProxy' instance. The serialization of RDD3 therefore writes a 
back-reference to Seq1's SerializationProxy. During deserialization, that back-reference 
is read before the proxy's 'readResolve' method has been called (see the JDK bug reported 
above), so the proxy itself gets assigned to a field of type Seq.

> java.lang.ClassCastException: cannot assign instance of 
> scala.collection.immutable.List$SerializationProxy to field
> ---
>
> Key: SPARK-19938
> URL: https://issues.apache.org/jira/browse/SPARK-19938
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.0.2
>Reporter: srinivas thallam
>Priority: Major
>







[jira] [Updated] (SPARK-27967) Fault tolerance broken: Race conditions: a supervised Driver is not relaunched and completely removed sometimes under Standalone cluster when Worker gracefully shuts down

2019-06-06 Thread Igor Kamyshnikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Kamyshnikov updated SPARK-27967:
-
Description: 
Synthetic test:
 1) run ZK
 2) run Master
 3) run Worker with a remote debugging agent (required to enable a breakpoint that 
demonstrates the race condition)
 4) submit a long-running Driver with the --supervise flag
 5) connect to the Worker via a remote debugger
 6) enable a breakpoint in the method:
 org.apache.spark.deploy.worker.DriverRunner#kill
{code:java}
  /** Terminate this driver (or prevent it from ever starting if not yet started) */
  private[worker] def kill(): Unit = {
    logInfo("Killing driver process!")
    killed = true
    synchronized {
      process.foreach { p =>
        val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
        if (exitCode.isEmpty) { //< BREAKPOINT <<
          logWarning("Failed to terminate driver process: " + p +
            ". This process will likely be orphaned.")
        }
      }
    }
  }
{code}
7) send SIGTERM to the Worker (or CTRL+C on Windows)
 8) check the Spark Master Web UI: the Driver appears in the *Completed Drivers* section 
with state *KILLED*

If there were no breakpoint, it is more likely that a new row with the *RELAUNCHING* 
state would appear in the *Completed Drivers* section and a row with the *SUBMITTED* 
state would remain in the *Running Drivers* section.

Explanation:
 1) Spark Master relaunches a driver in response to the "channelInactive" callback 
(org.apache.spark.rpc.netty.NettyRpcHandler#channelInactive), which is triggered when 
the Worker process terminates.
 2) DriverRunner registers a shutdown hook in org.apache.spark.deploy.worker.DriverRunner#start, 
which calls the aforementioned "kill" method. Killing a driver can lead to reaching the 
following lines in the DriverRunner.start method:
{noformat}
// notify worker of final driver state, possible exception
worker.send(DriverStateChanged(driverId, finalState.get, 
finalException))
{noformat}
If this notification reaches the Master, the Driver is removed from the cluster as 
KILLED.
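
The ordering problem can be sketched outside Spark with a tiny, self-contained example of the shutdown-hook side (this is not Spark code; notifyMaster and the driver id are made-up stand-ins). Whatever the hook manages to send before the JVM exits races against the Master's channelInactive handling:

{code:scala}
object ShutdownHookRace {
  // Stand-in for worker.send(DriverStateChanged(driverId, KILLED, None)).
  def notifyMaster(state: String): Unit =
    println(s"sent DriverStateChanged(driver-001, $state)")

  def main(args: Array[String]): Unit = {
    sys.addShutdownHook {
      // Runs on SIGTERM/CTRL+C, like DriverRunner's hook calling kill():
      // if this message reaches the Master before the Worker's connection
      // drops, the driver is removed as KILLED instead of being relaunched.
      notifyMaster("KILLED")
    }
    println("worker running; send SIGTERM (or CTRL+C) to trigger the hook")
    Thread.sleep(60000) // keep the JVM alive long enough to signal it
  }
}
{code}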

Real-world scenario (ver. 2.1.2):
 ZK, two Masters; the active one loses its leadership and the other becomes the new 
leader.
 Workers attempt to re-register with the new Master but report that they failed to do 
so. They execute *System.exit(1)* from 
org.apache.spark.deploy.worker.Worker#registerWithMaster.
 This System.exit runs the shutdown hooks, and somehow the DriverStateChanged message 
reaches the new Master.

 

*Worker* logs:
{noformat}
19/06/03 14:05:30 INFO Worker: Retrying connection to master (attempt # 5)
19/06/03 14:05:30 INFO Worker: Connecting to master 10.0.0.16:7077...
19/06/03 14:05:33 INFO Worker: Master has changed, new master is at spark://10.0.0.17:7077
19/06/03 14:05:33 ERROR TransportResponseHandler: Still have 4 requests outstanding when connection from /10.0.0.16:7077 is closed
19/06/03 14:05:33 ERROR Worker: Cannot register with master: 10.0.0.16:7077
java.io.IOException: Connection from /10.0.0.16:7077 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
        at org.apache.spark.network.util.TransportFrameDe

[jira] [Created] (SPARK-27967) Fault tolerance broken: Race conditions: a supervised Driver is not relaunched and completely removed sometimes under Standalone cluster when Worker gracefully shuts down

2019-06-06 Thread Igor Kamyshnikov (JIRA)
Igor Kamyshnikov created SPARK-27967:


 Summary: Fault tolerance broken: Race conditions: a supervised 
Driver is not relaunched and completely removed sometimes under Standalone 
cluster when Worker gracefully shuts down
 Key: SPARK-27967
 URL: https://issues.apache.org/jira/browse/SPARK-27967
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.2, 2.1.2
Reporter: Igor Kamyshnikov


Synthetic test:
 1) run ZK
 2) run Master
 3) run Worker with a remote debugging agent (required to enable a breakpoint that 
demonstrates the race condition)
 4) submit a long-running Driver with the --supervise flag
 5) connect to the Worker via a remote debugger
 6) enable a breakpoint in the method:
 org.apache.spark.deploy.worker.DriverRunner#kill
{code:java}
  /** Terminate this driver (or prevent it from ever starting if not yet started) */
  private[worker] def kill(): Unit = {
    logInfo("Killing driver process!")
    killed = true
    synchronized {
      process.foreach { p =>
        val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
        if (exitCode.isEmpty) { //< BREAKPOINT <<
          logWarning("Failed to terminate driver process: " + p +
            ". This process will likely be orphaned.")
        }
      }
    }
  }
{code}
7) send SIGTERM to the Worker (or CTRL+C on Windows)
 8) check the Spark Master Web UI: the Driver appears in the *Completed Drivers* section 
with state *KILLED*

If there were no breakpoint, it is more likely that a new row with the *RELAUNCHING* 
state would appear in the *Completed Drivers* section and a row with the *SUBMITTED* 
state would remain in the *Running Drivers* section.

Explanation:
 1) Spark Master relaunches a driver in response to the "channelInactive" callback 
(org.apache.spark.rpc.netty.NettyRpcHandler#channelInactive), which is triggered when 
the Worker process terminates.
 2) DriverRunner registers a shutdown hook in org.apache.spark.deploy.worker.DriverRunner#start, 
which calls the aforementioned "kill" method. Killing a driver can lead to reaching the 
following lines in the DriverRunner.start method:
{noformat}
// notify worker of final driver state, possible exception
worker.send(DriverStateChanged(driverId, finalState.get, 
finalException))
{noformat}
If this notification reaches the Master, the Driver is removed from the cluster as 
KILLED.

Real-world scenario (ver. 2.1.2):
 ZK, two Masters; the active one loses its leadership and the other becomes the new 
leader.
 Workers attempt to re-register with the new Master but report that they failed to do 
so. They execute *System.exit(1)* from 
org.apache.spark.deploy.worker.Worker#registerWithMaster.
 This System.exit runs the shutdown hooks, and somehow the DriverStateChanged message 
reaches the new Master.

 

*Worker* logs:
{noformat}
19/06/03 14:05:30 INFO Worker: Retrying connection to master (attempt # 5)
19/06/03 14:05:30 INFO Worker: Connecting to master 10.0.0.16:7077...
19/06/03 14:05:33 INFO Worker: Master has changed, new master is at spark://10.0.0.17:7077
19/06/03 14:05:33 ERROR TransportResponseHandler: Still have 4 requests outstanding when connection from /10.0.0.16:7077 is closed
19/06/03 14:05:33 ERROR Worker: Cannot register with master: 10.0.0.16:7077
java.io.IOException: Connection from /10.0.0.16:7077 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
        at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
        at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannel