Toby Miller created FLINK-23388:
-----------------------------------

             Summary: Non-static Scala case classes cannot be serialised
                 Key: FLINK-23388
                 URL: https://issues.apache.org/jira/browse/FLINK-23388
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System, Scala Shell
            Reporter: Toby Miller


h3. Explanation of the issue

{{ScalaCaseClassSerializer}} is not powerful enough to serialise all Scala case 
classes that can be serialised by normal JVM serialisation (and by the standard 
{{KryoSerializer}}). This is because it treats all case classes as made up only 
of their listed members. In fact, it is valid Scala to have other data in a 
case class, and in particular, it is possible for a nested case class to depend 
on data in its parent. This might be ill-advised, but it is valid Scala:
{code:scala}
class Outer {
    var x = 0
    case class Inner(y: Int) {
        def z = x
    }
}

val outer = new Outer()
val inner = outer.Inner(1)
outer.x = 2

scala> inner.z
res0: Int = 2
{code}
Since Scala 2.11, the compiler flag {{-Yrepl-class-based}} has been available 
(and it defaults to on in Scala 2.13). It changes the way the Scala REPL and 
similar tools encapsulate the user code written in the REPL, wrapping it in a 
serialisable class rather than an object (static class). The purpose of this is 
to aid serialisation of the whole thing for distributed REPL systems that need 
to perform computation on remote nodes. One such application is 
{{flink-scala-shell}}, and another is Apache Zeppelin. See below for an 
illustration of the flag's importance in applications like these.

In the JVM, a class can be serialised if it implements {{Serializable}}, and 
either it is static or its parent can be serialised. In the latter case the 
parent is brought with it to allow it to be constructed in its context at the 
other end.
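
The rule above can be checked with a small round trip through plain JVM serialisation. This is a minimal sketch, not part of Flink: {{RoundTripDemo}} and {{roundTrip}} are names made up for illustration, {{Outer}} is the class from the earlier example with {{Serializable}} added, and the code is assumed to be compiled as an ordinary program rather than typed into a REPL.
{code:scala}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// The outer class must itself be serialisable for a nested instance to travel.
class Outer extends Serializable {
    var x = 0
    case class Inner(y: Int) {
        def z: Int = x
    }
}

object RoundTripDemo {
    // Serialise a value to a byte array and read it back (plain JVM serialisation).
    def roundTrip[T <: AnyRef](value: T): T = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        try out.writeObject(value) finally out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        try in.readObject().asInstanceOf[T] finally in.close()
    }

    def main(args: Array[String]): Unit = {
        val outer = new Outer()
        outer.x = 2
        val inner = new outer.Inner(1)

        // The copy holds a reference to its own deserialised copy of outer.
        val copy = roundTrip(inner)
        println(copy.y) // 1
        println(copy.z) // 2, read through the deserialised outer instance
    }
}
{code}
If {{Outer}} did not extend {{Serializable}}, {{writeObject}} would be expected to fail with a {{NotSerializableException}}, because the nested instance cannot be written without its parent.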

The Flink {{ScalaCaseClassSerializer}} does not understand that case classes 
(which always implement {{Serializable}}) might be nested inside a serialisable 
outer class. This is exactly the scenario that occurs when defining a 
supposedly top-level case class in {{flink-scala-shell}} or Zeppelin, because 
{{-Yrepl-class-based}} causes it to be nested inside a serialisable outer 
class. The consequence is that Scala case classes defined in one of these 
applications cannot be serialised in Flink, making them practically unusable. 
As a core Scala language feature, this is a serious omission from these 
projects.

h3. Fixing {{ScalaCaseClassSerializer}} - no free lunch

I attempted to fix the issue by redirecting case classes in Flink to the 
standard {{KryoSerializer}} rather than the {{ScalaCaseClassSerializer}}, and 
at first glance this appeared to work. I was even able to run code in 
Zeppelin that sent a user-defined case class to be processed in a Flink job 
using the batch environment, and it ran without problems.

Unfortunately, it didn't work when I tried to do the same thing using the 
streaming table environment. The error as presented was a failure to cast 
{{KryoSerializer}} to {{TupleSerializerBase}}, the superclass of the serialisers 
for tuples and products, including {{ScalaCaseClassSerializer}}. The Flink Table 
API assumes that case classes will be assigned a serialiser capable of 
converting to and from a table representation. This amounts to a strong 
assumption that no case class instance will ever carry data besides its primary 
contents.

In the case we're most interested in (the REPL class wrapper one), most case 
classes will not actually depend on additional data, but it's difficult for 
Flink to know that. In general, I imagine we would like to support cases where 
the case class does depend on additional data, although clearly Flink would be 
unable to turn it into a record and then back again. Perhaps we could do all 
the other operations without issue, but then raise an error if any operation 
attempted the latter transformation if there were missing additional data?
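
As a rough illustration of how such a case class might be recognised, the sketch below uses reflection to look for an outer reference. It rests on an assumption about a compiler implementation detail: Scala 2 stores the enclosing instance of an inner class in a synthetic field named {{$outer}}. The names {{OuterReferenceCheck}}, {{Wrapper}} and {{TopLevel}} are hypothetical, and none of this is existing Flink code.
{code:scala}
object OuterReferenceCheck {
    // Assumption: scalac 2.x keeps the enclosing instance of an inner class
    // in a synthetic field called "$outer" (an implementation detail).
    def carriesOuterReference(cls: Class[_]): Boolean =
        cls.getDeclaredFields.exists(_.getName == "$outer")
}

// Stand-in for the serialisable wrapper a class-based REPL generates.
class Wrapper extends Serializable {
    case class Nested(y: Int)
}

case class TopLevel(y: Int)

object OuterReferenceCheckDemo {
    def main(args: Array[String]): Unit = {
        // Nested case class: expected to report an outer reference.
        println(OuterReferenceCheck.carriesOuterReference(classOf[Wrapper#Nested]))
        // Top-level case class: no enclosing instance to carry.
        println(OuterReferenceCheck.carriesOuterReference(classOf[TopLevel]))
    }
}
{code}
Note that this only reports that an outer reference exists, not whether any member actually uses it, so it cannot by itself separate the harmless REPL-wrapper case from a case class that really depends on outer data.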

h3. Illustration of the importance of {{-Yrepl-class-based}}

As an illustration of why the flag is important, consider the following REPL 
interaction:
{code:scala}
val x = 1
case class A(y: Int) {
    def z = x
}

val a = A(2)
compute_remotely(() => a.z)
{code}
The instance {{a}} requires the context {{x}} to be brought with it in order to 
perform the computation {{a.z}}, but if the REPL is placing the user code 
statically, then a simple serialisation of {{a}} will not take {{x}} with it.

However, with the {{-Yrepl-class-based}} flag active, {{A}} is now a nested 
class which depends on the outer serialisable class of global state. {{x}} is 
now automatically transferred as part of JVM serialisation and deserialisation, 
and the REPL doesn't need to jump through any additional hoops to make this 
work.
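
The two wrapping strategies can be pictured roughly as follows. This is a simplified sketch: the real REPL generates its own wrapper names and extra nesting, and {{StaticWrapper}} and {{ClassBasedWrapper}} are placeholders invented here.
{code:scala}
// Object-based wrapping (flag off): the user code lives in a static object.
// An instance of A holds no reference to x; x is static state that is not
// part of the serialised form of A.
object StaticWrapper {
    val x = 1
    case class A(y: Int) {
        def z: Int = x
    }
}

// Class-based wrapping (flag on): the same code inside a serialisable class.
// Every instance of A keeps a reference to its enclosing wrapper instance,
// so x is serialised together with it.
class ClassBasedWrapper extends Serializable {
    val x = 1
    case class A(y: Int) {
        def z: Int = x
    }
}
{code}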



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
