I've been working on packaging some UDTs as well. I have them working in
Scala and PySpark, although I haven't been able to get them to serialize to
Parquet, which puzzles me.

It works, but because the UDT API was made private I have to define the UDTs
under the org.apache.spark package, which is a bit awkward.

On Wed, Aug 16, 2017 at 8:55 AM, Katherine Prevost <k...@hypatian.org> wrote:

> I'd say the quick summary of the problem is this:
>
> The encoder mechanism does not deal well with fields of case classes (you
> must use built-in types (including other case classes) for case class
> fields), and UDTs are not currently available (and never integrated well
> with built-in operations).
>
> Encoders work great for individual fields if you're using tuples, but once
> you get up over four or five fields this becomes incomprehensible. And, of
> course, encoders do nothing for you once you are in the realm of dataframes
> (including operations on fields, results of dataframe-based methods, and
> working in languages other than Scala.)
>
> The sort of machinations I describe below are unpleasant but not a huge
> deal for people who are trained as developers... but they're a much bigger
> mess when we have to provide these interfaces to our data scientists. Yes,
> they can do it, but the "every address is a string and you have to use
> these functions that parse the strings over and over again" approach is
> easier to use (if massively inefficient).
>
> I would like to improve Spark so that we can provide these types that our
> data scientists need to use *all the time* in a way that's both efficient
> and easy to use.
>
> Hence, my interest in doing work on the UDT and/or Encoder mechanisms of
> Spark (or equivalent, if something new is in the works), and my interest in
> hearing from anybody who is already working in this area, or hearing about
> any future plans that have already been made in this area.
>
>
> In more detail:
>
> On Wed, Aug 16, 2017 at 2:49 AM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Not sure I fully understood the issue (source code is always
>> helpful ;-) but why don't you override the toString method of IPAddress?
>> The IP address could still be stored as bytes, but when it is displayed,
>> toString converts the byte address into something human-readable.
>>
>
> There are a couple of reasons it's not that simple. (If you look at the
> sample snippets of code I did include, you'll see that I did define
> toString methods.)
>
> The first problem is that toString is never called when working with
> DataFrames, which are often the result of common Spark operations in Scala
> (though staying in the realm of Datasets is getting easier, and apparently
> also becoming more efficient). Outside of Scala, it's DataFrames all the
> way down.
>
> (If you look at my example code, you'll also see what happens when you
> have a DataFrame with a field that is a struct with a byte array in it, and
> nobody ever wants to see "[B@617f4814".)
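
For anyone who hasn't run into it, the "[B@..." text is just the JVM's
default toString for a byte array. A minimal plain-Scala sketch (no Spark
involved; ByteArrayDisplay and hex are hypothetical names of mine) that puts
the default form next to a hex rendering:

```scala
object ByteArrayDisplay {
  // Render each byte as two unsigned hex digits, e.g. [01 02 03 04].
  def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString("[", " ", "]")

  val raw: Array[Byte] = Array[Byte](1, 2, 3, 4)

  // Default JVM form: type tag "[B" plus an identity hash, not the contents.
  val default: String = raw.toString

  // Readable form built from the contents.
  val readable: String = hex(raw)
}
```

The identity hash differs from run to run, so the default form carries no
information about the address itself.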
>
> You can get around that (as long as you're still in a Dataset) with
> something like this (this is using the IPAddress.toString method to produce
> "IPAddress(Array(1,2,3,4))"):
>
> scala> ys.take(20)
> res10: Array[Rec] = Array(Rec(IPAddress(Array(1, 2, 3, 4)),
> IPAddress(Array(5, 6, 7, 8))), Rec(IPAddress(Array(1, 2, 3, 4, 5, 6, 7, 8,
> 9, 10, 11, 12, 13, 14, 15, 16)), IPAddress(Array(17, 18, 19, 20, 21, 22,
> 23, 24, 25, 26, 27, 28, 29, 30, 31, 32))))
>
> But then of course you lose any easy ability to view Rec fields in
> columns. (And while you could make something that prints Rec as columns,
> what happens once you transform your record and turn it into a tuple?)
>
> The second one is that operating on the fields cleanly is still rather
> painful, even if the values were to be displayed cleanly. This is what you
> have to do to search for rows that have a specific IPAddress value (ys("a")
> is a column of IPAddress, a is an IPAddress):
>
> scala> ys.select(ys("a.bytes") === a.bytes)
> res9: org.apache.spark.sql.DataFrame = [(a.bytes AS `bytes` =
> X'01020304'): boolean]
>
> It's worth noting that an implicit conversion from IPAddress to
> Array[Byte] or to Column wouldn't work here, because === accepts Any.
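
To make the point about === concrete, here is a minimal sketch in plain
Scala with no Spark dependency. Col, Lit, and eqBytes are hypothetical
stand-ins of my own (Col only mimics the shape of a method that accepts
Any), and I use Vector[Byte] rather than Array[Byte] so that results compare
by value:

```scala
import scala.language.implicitConversions

object ImplicitAnyDemo {
  // Hypothetical stand-ins; these are not Spark classes.
  final case class IPAddress(bytes: Vector[Byte])
  final case class Lit(value: Any)

  final class Col(val name: String) {
    // Same shape as a comparison operator that accepts Any.
    def ===(other: Any): Lit = Lit(other)
    // A narrowly typed alternative, for contrast.
    def eqBytes(other: Vector[Byte]): Lit = Lit(other)
  }

  implicit def ipToBytes(ip: IPAddress): Vector[Byte] = ip.bytes

  val a: IPAddress = IPAddress(Vector[Byte](1, 2, 3, 4))
  val col: Col = new Col("a.bytes")

  // IPAddress already conforms to Any, so the compiler inserts no
  // conversion and the wrapper object itself is passed through.
  val viaAny: Lit = col === a

  // The parameter type is Vector[Byte], so ipToBytes is applied here.
  val viaTyped: Lit = col.eqBytes(a)
}
```

The compiler only looks for an implicit conversion when the argument does
not already fit the parameter type, and everything fits Any.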
>
>
> katherine.
>
>> > On 15. Aug 2017, at 18:49, Katherine Prevost <prevo...@cert.org> wrote:
>> >
>> > Hi, all!
>> >
>> >
>> > I'm a developer who works to support data scientists at CERT. We've
>> > been having some great success working with Spark for data analysis,
>> > and I have some questions about how we could contribute to work on
>> > Spark in support of our goals.
>> >
>> > Specifically, we have some interest in user-defined types, or their
>> > equivalents.
>> >
>> >
>> > When Spark 2 arrived, user-defined types (UDTs) were made private and
>> > seem to have fallen by the wayside in favor of using encoders for
>> > Datasets. I was wondering whether a plan for the future of these
>> > mechanisms has been published anywhere, or whether there is anyone I
>> > could talk to about where things are going with them.
>> >
>> > I've roughly outlined our experience with these two mechanisms below,
>> > and our hopes for what might be accomplished in the future.
>> >
>> > We'd love to spend some effort on development here, but haven't been
>> > able to figure out if anyone is already working on improvements in
>> > this area, or if there's some plan in place for where things are going
>> > to go.
>> >
>> > So, I'd love to get in touch with anyone who might know more.
>> >
>> >
>> > Background:
>> >
>> > Much of the work in my group is analysis of Internet protocol data,
>> > and I think that IP addresses are a great example of how a custom atomic
>> > type can be helpful.
>> >
>> > IP addresses (including both 32-bit IPv4 addresses and 128-bit IPv6
>> > addresses) have a natural binary form (a sequence of bytes). Using
>> > this format makes the default implementation of certain basic
>> > operations sensible (equality and comparison, for example). Defining
>> > UDFs for more complicated operations is not terribly difficult. But
>> > this format is not human-friendly to view.
>> >
>> > The human-readable presentations of IP addresses, on the other hand,
>> > are large and unwieldy to work with computationally. There is a
>> > canonical textual form for both IPv4 and IPv6 addresses, but
>> > converting back and forth between that form and the binary form is
>> > expensive, and the text form is generally at least twice as large as
>> > the binary form. The text form is suitable for presenting to human
>> > beings, but that's about it.
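
The size difference between the two forms is easy to see with the JDK's own
java.net.InetAddress. This is only a sketch: AddressForms, toText, and
toBytes are hypothetical helper names, and toBytes is meant for address
literals (passing a host name to getByName would trigger a DNS lookup):

```scala
import java.net.InetAddress

object AddressForms {
  // Binary form (4 or 16 bytes) to the JDK's textual form.
  def toText(bytes: Array[Byte]): String =
    InetAddress.getByAddress(bytes).getHostAddress

  // Textual address literal to the binary form.
  def toBytes(text: String): Array[Byte] =
    InetAddress.getByName(text).getAddress

  // 172.217.5.238 as four bytes; values above 127 wrap to negative bytes.
  val v4: Array[Byte] = Array(172, 217, 5, 238).map(_.toByte)

  val v4Text: String = toText(v4) // "172.217.5.238"
}
```

Here the IPv4 address is 4 bytes in binary and 13 characters as text, and
the IPv6 example "2607:f8b0:4004:800::200e" parses to 16 bytes.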
>> >
>> > There are also a variety of other types of Internet data that are best
>> > represented by byte arrays and the like, meaning that simply saying
>> > "just use a byte array for this column!" can be unfortunate for both
>> > type-safety and comprehensibility of a collection of data.
>> >
>> >
>> > When we were working on top of Spark 1, we had begun to look at using
>> > UDTs to represent IP addresses. There were some issues with working
>> > with UDTs and working with the built-in operations like comparisons,
>> > but we had some hope for improvements with future Spark releases.
>> >
>> > With Spark 2.0, the UDT API was made private, and the encoder
>> > mechanism was suggested for use instead. For a bit, we experimented
>> > with using the API anyway by putting stubs into Spark's namespace, but
>> > there weren't really a lot of good places to hook various operations
>> > like equality that one would expect to work on an atomic type.
>> >
>> >
>> > We also tried using the encoder APIs, and ran into a few problems
>> > there as well. Encoders are well suited to handling "top-level"
>> > values, but the most convenient way to work with encoded data is by
>> > having your top level be a case class defining types and names for a
>> > record type. And here, there's a problem, because encoders from the
>> > implicit environment are not available when encoding the fields of a
>> > case class. So, if we defined a custom encoder for our IPAddress type,
>> > and then included an IPAddress as a field of a record, this would
>> > result in an error.
>> >
>> > One approach we tried to get around that was to make IP addresses
>> > themselves into case classes as well, so that only the default
>> > encoders would be required. This eliminated the error, but made
>> > working with the values a nightmare. If we made a Dataset[IPAddress],
>> > the byte array would be presented in a reasonable manner, but a
>> > Dataset[Rec] where Rec had IPAddress fields was another story,
>> > resulting in the default toString of Java arrays being used:
>> >
>> > +-------------+-------------+
>> > |            a|            b|
>> > +-------------+-------------+
>> > |[[B@47260109]|[[B@3538740a]|
>> > |[[B@617f4814]|[[B@77e69bee]|
>> > +-------------+-------------+
>> >
>> > (See code snippet at the end of this message for details.)
>> >
>> > Now basically all interactions would have to go through UDFs,
>> > including remembering to format the IPAddress field if you wanted any
>> > useful information out of it at all.
>> >
>> >
>> > As a result, since our initial experiments with 2.0 we dropped back
>> > and punted to using text for all IP addresses. But, we'd still like to
>> > do better. What we optimally want is some mechanism for a user-defined
>> > atomic type (whether via encoders or via registering a new type) which
>> > allows for:
>> >
>> > * An appropriately efficient underlying form to be used. (A struct
>> >   with a byte array field would be fine. A byte array field would be
>> >   fine.)
>> >
>> > * A display form that is meaningful to the user (the expected form
>> >   like "172.217.5.238" and "2607:f8b0:4004:800::200e".)
>> >
>> > * At least some support for standard SQL operators like equality and
>> >   comparison, and the ability to define UDFs that work with the type.
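
The three wishes above (compact storage, readable display, equality and
comparison) can at least be sketched in plain Scala, outside of Spark. This
is only an illustration under my own assumptions: IPAddr, parse, and toBytes
are hypothetical names, nothing here touches the UDT or Encoder APIs, and
the display form is whatever the JDK's InetAddress produces (which for IPv6
may not be the compressed "::" notation):

```scala
import java.net.InetAddress
import java.util.Arrays

// Hypothetical value type wrapping the binary form of an address.
final class IPAddr private (private val bytes: Array[Byte]) {
  // Defensive copy so callers cannot mutate the stored bytes.
  def toBytes: Array[Byte] = bytes.clone()

  // Arrays compare by reference by default, so compare contents explicitly.
  override def equals(other: Any): Boolean = other match {
    case that: IPAddr => Arrays.equals(bytes, that.bytes)
    case _            => false
  }
  override def hashCode: Int = Arrays.hashCode(bytes)

  // Human-readable display form, delegated to the JDK.
  override def toString: String = InetAddress.getByAddress(bytes).getHostAddress
}

object IPAddr {
  def apply(bytes: Array[Byte]): IPAddr = {
    require(bytes.length == 4 || bytes.length == 16, "expected 4 or 16 bytes")
    new IPAddr(bytes.clone())
  }

  // Intended for address literals; a host name would trigger a DNS lookup.
  def parse(text: String): IPAddr = apply(InetAddress.getByName(text).getAddress)

  // IPv4 sorts before IPv6; equal lengths compare byte by byte as unsigned.
  implicit val ordering: Ordering[IPAddr] = new Ordering[IPAddr] {
    def compare(x: IPAddr, y: IPAddr): Int = {
      val byLength = Integer.compare(x.bytes.length, y.bytes.length)
      if (byLength != 0) byLength
      else {
        val firstDiff = x.bytes.indices.find(i => x.bytes(i) != y.bytes(i))
        firstDiff
          .map(i => Integer.compare(x.bytes(i) & 0xff, y.bytes(i) & 0xff))
          .getOrElse(0)
      }
    }
  }
}
```

The unsigned comparison matters: with signed bytes, 172.217.5.238 would sort
before 10.0.0.1 because 172 wraps to a negative byte value.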
>> >
>> > Longer term, it would be lovely to:
>> >
>> > * Be able to work with values of the type in an appropriate way in
>> >   different source languages (i.e. make it not hard to work with the
>> >   values in Python or R, although the restrictions of those languages
>> >   will require additional implementation work.)
>> >
>> > * Be able to provide new Catalyst optimizations specific to the type
>> >   and functions defined on the type.
>> >
>> > We'd love to provide some effort at achieving these goals, but aren't
>> > sure where to start. We'd like to avoid stepping in the way of any
>> > efforts that might already be underway to improve these mechanisms.
>> >
>> >
>> > Thanks very much!
>> >
>> > Katherine Prevost
>> > Carnegie Mellon / Software Engineering Institute / CERT
>> >
>> >
>> > -------------------------------------------------------------------->8--
>> >
>> > // Simple example demonstrating the treatment of a case class with a
>> > // byte array within another case class.
>> >
>> > case class IPAddress(bytes: Array[Byte]) {
>> >  override def toString: String = s"IPAddress(Array(${bytes.mkString(", ")}))"
>> > }
>> >
>> > val a = IPAddress(Array(1,2,3,4))
>> > val b = IPAddress(Array(5,6,7,8))
>> > val c = IPAddress(Array(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16))
>> > val d = IPAddress(Array(17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32))
>> >
>> > val x = Array(a, b, c, d)
>> > val xs = sc.parallelize(x).toDS
>> >
>> > /*
>> > scala> xs.show
>> > +--------------------+
>> > |               bytes|
>> > +--------------------+
>> > |       [01 02 03 04]|
>> > |       [05 06 07 08]|
>> > |[01 02 03 04 05 0...|
>> > |[11 12 13 14 15 1...|
>> > +--------------------+
>> > */
>> >
>> > case class Rec(a: IPAddress, b: IPAddress) {
>> >  override def toString: String = s"Rec($a, $b)"
>> > }
>> >
>> > val e = Rec(a, b)
>> > val f = Rec(c, d)
>> > val y = Array(e, f)
>> > val ys = sc.parallelize(y).toDS
>> >
>> > /*
>> > scala> ys.show
>> > +-------------+-------------+
>> > |            a|            b|
>> > +-------------+-------------+
>> > |[[B@47260109]|[[B@3538740a]|
>> > |[[B@617f4814]|[[B@77e69bee]|
>> > +-------------+-------------+
>> > */
>> >
>> > -------------------------------------------------------------------->8--
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >
>>
