I've been working on packaging some UDTs as well. I have them working in
Scala and PySpark, although I haven't been able to get them to serialize
to Parquet, which puzzles me.

Although they work, I have to define the UDTs under the org.apache.spark
scope because of the privatization, which is a bit awkward.
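
For reference, the shape of that workaround is roughly this (a minimal
sketch with illustrative names, written against the private[spark]
UserDefinedType API as of 2.x; not our actual code):

  // Living under org.apache.spark.* grants access to the private[spark]
  // UserDefinedType and SQLUserDefinedType APIs.
  package org.apache.spark.sql.types

  // The user-facing class; the annotation ties it to its UDT.
  @SQLUserDefinedType(udt = classOf[IPAddressUDT])
  case class IPAddress(bytes: Array[Byte])

  class IPAddressUDT extends UserDefinedType[IPAddress] {
    // Internally, an address is just its raw bytes.
    override def sqlType: DataType = BinaryType
    override def serialize(obj: IPAddress): Any = obj.bytes
    override def deserialize(datum: Any): IPAddress =
      IPAddress(datum.asInstanceOf[Array[Byte]])
    override def userClass: Class[IPAddress] = classOf[IPAddress]
  }
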
On Wed, Aug 16, 2017 at 8:55 AM, Katherine Prevost <k...@hypatian.org> wrote:

> I'd say the quick summary of the problem is this:
>
> The encoder mechanism does not deal well with fields of case classes
> (you must use built-in types, including other case classes, for case
> class fields), and UDTs are not currently available (and never
> integrated well with built-in operations).
>
> Encoders work great for individual fields if you're using tuples, but
> once you get up over four or five fields this becomes incomprehensible.
> And, of course, encoders do nothing for you once you are in the realm
> of DataFrames (including operations on fields, results of
> DataFrame-based methods, and working in languages other than Scala).
>
> The sort of machinations I describe below are unpleasant but not a huge
> deal for people who are trained as developers... but they're a much
> bigger mess when we have to provide these interfaces to our data
> scientists. Yes, they can do it, but the "every address is a string and
> you have to use these functions that parse the strings over and over
> again" approach is easier to use (if massively inefficient).
>
> I would like to improve Spark so that we can provide these types that
> our data scientists need to use *all the time* in a way that's both
> efficient and easy to use.
>
> Hence my interest in doing work on the UDT and/or Encoder mechanisms of
> Spark (or their equivalent, if something new is in the works), my
> interest in hearing from anybody who is already working in this area,
> and in hearing about any future plans that have already been made here.
>
>
> In more detail:
>
> On Wed, Aug 16, 2017 at 2:49 AM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Not sure I fully understand the issue (source code is always helpful
>> ;-) but why don't you override the toString method of IPAddress? The
>> IP address could still be bytes, but when it is displayed, toString
>> would convert the byte address into something human-readable.
>
> There are a couple of reasons it's not that simple. (If you look at the
> sample snippets of code I did include, you'll see that I did define
> toString methods.)
>
> The first problem is basically that toString doesn't happen when
> working with DataFrames, which are often the result of common Spark
> operations in Scala (though staying in the realm of Datasets is getting
> easier, and apparently also becoming more efficient). Outside of Scala,
> it's DataFrames all the way down.
>
> (If you look at my example code, you'll also see what happens when you
> have a DataFrame with a field that is a struct with a byte array in it,
> and nobody ever wants to see "[B@617f4814".)
>
> You can get around that (as long as you're still in a Dataset) with
> something like this (this is using the IPAddress.toString method to
> produce "IPAddress(Array(1,2,3,4))"):
>
>   scala> ys.take(20)
>   res10: Array[Rec] = Array(Rec(IPAddress(Array(1, 2, 3, 4)),
>   IPAddress(Array(5, 6, 7, 8))), Rec(IPAddress(Array(1, 2, 3, 4, 5, 6,
>   7, 8, 9, 10, 11, 12, 13, 14, 15, 16)), IPAddress(Array(17, 18, 19,
>   20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32))))
>
> But then of course you lose any easy ability to view Rec fields in
> columns. (And while you could make something that prints Rec as
> columns, what happens once you transform your record and turn it into
> a tuple?)
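>
> For instance, here's a rough, untested sketch of the kind of formatting
> UDF this pushes you toward (formatIP is an illustrative name; it leans
> on java.net.InetAddress, which renders both 4- and 16-byte addresses):
>
>   import java.net.InetAddress
>   import org.apache.spark.sql.functions.udf
>
>   // Render an address's raw bytes in canonical text form.
>   val formatIP = udf((bytes: Array[Byte]) =>
>     InetAddress.getByAddress(bytes).getHostAddress)
>
>   // Every human-readable view of Rec now has to spell this out:
>   ys.select(formatIP(ys("a.bytes")).as("a"),
>             formatIP(ys("b.bytes")).as("b")).show()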
>
> The second problem is that operating on the fields cleanly is still
> rather painful, even if the values were displayed cleanly. This is what
> you have to do to search for rows that have a specific IPAddress value
> (ys("a") is a column of IPAddress, a is an IPAddress):
>
>   scala> ys.select(ys("a.bytes") === a.bytes)
>   res9: org.apache.spark.sql.DataFrame = [(a.bytes AS `bytes` =
>   X'01020304'): boolean]
>
> It's worth noting that an implicit conversion from IPAddress to
> Array[Byte] or to Column wouldn't work here, because === accepts Any.
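>
> (An explicit helper can paper over that for one operation at a time; a
> sketch, where ipEquals is an illustrative name rather than anything
> that exists in Spark:
>
>   import org.apache.spark.sql.Column
>   import org.apache.spark.sql.functions.lit
>
>   // Compare the bytes field of an IPAddress struct column against a
>   // literal address.
>   def ipEquals(col: Column, addr: IPAddress): Column =
>     col.getField("bytes") === lit(addr.bytes)
>
>   // e.g.: rows of ys whose column "a" equals the address a
>   ys.filter(ipEquals(ys("a"), a)).show()
>
> But you'd need one such helper for every operation you care about.)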
>
> katherine.
>
>> On 15. Aug 2017, at 18:49, Katherine Prevost <prevo...@cert.org> wrote:
>> >
>> > Hi, all!
>> >
>> > I'm a developer who works to support data scientists at CERT. We've
>> > been having some great success working with Spark for data analysis,
>> > and I have some questions about how we could contribute to work on
>> > Spark in support of our goals.
>> >
>> > Specifically, we have some interest in user-defined types, or their
>> > equivalents.
>> >
>> > When Spark 2 arrived, user-defined types (UDTs) were made private
>> > and seem to have fallen by the wayside in favor of using encoders
>> > for Datasets. I was wondering whether a plan has been published
>> > anywhere for the future of these mechanisms, or whether there's
>> > anyone I could talk to about where things are going with them.
>> >
>> > I've roughly outlined our experience with these two mechanisms
>> > below, and our hopes for what might be accomplished in the future.
>> >
>> > We'd love to spend some effort on development here, but haven't
>> > been able to figure out if anyone is already working on improvements
>> > in this area, or if there's some plan in place for where things are
>> > going to go.
>> >
>> > So, I'd love to get in touch with anyone who might know more.
>> >
>> > Background:
>> >
>> > Much of the work in my group is analysis of Internet protocol data,
>> > and I think that IP addresses are a great example of how a custom
>> > atomic type can be helpful.
>> >
>> > IP addresses (including both 32-bit IPv4 addresses and 128-bit IPv6
>> > addresses) have a natural binary form (a sequence of bytes). Using
>> > this format makes the default implementation of certain basic
>> > operations sensible (equality and comparison, for example). Defining
>> > UDFs for more complicated operations is not terribly difficult. But
>> > this format is not human-friendly to view.
>> >
>> > The human-readable presentations of IP addresses, on the other
>> > hand, are large and unwieldy to work with computationally. There is
>> > a canonical textual form for both IPv4 and IPv6 addresses, but
>> > converting back and forth between that form and the binary form is
>> > expensive, and the text form is generally at least twice as large
>> > as the binary form. The text form is suitable for presenting to
>> > human beings, but that's about it.
>> >
>> > There are also a variety of other types of Internet data that are
>> > best represented by byte arrays and the like, meaning that simply
>> > saying "just use a byte array for this column!" can be unfortunate
>> > for both type-safety and comprehensibility of a collection of data.
>> >
>> > When we were working on top of Spark 1, we had begun to look at
>> > using UDTs to represent IP addresses. There were some issues with
>> > working with UDTs and with the built-in operations like comparisons,
>> > but we had some hope for improvements with future Spark releases.
>> >
>> > With Spark 2.0, the UDT API was made private, and the encoder
>> > mechanism was suggested for use instead. For a bit, we experimented
>> > with using the API anyway by putting stubs into Spark's namespace,
>> > but there weren't really a lot of good places to hook various
>> > operations like equality that one would expect to work on an atomic
>> > type.
>> >
>> > We also tried using the encoder APIs, and ran into a few problems
>> > there as well. Encoders are well suited to handling "top-level"
>> > values, but the most convenient way to work with encoded data is by
>> > having your top level be a case class defining types and names for
>> > a record type. And here, there's a problem, because encoders from
>> > the implicit environment are not available when encoding the fields
>> > of a case class. So, if we defined a custom encoder for our
>> > IPAddress type, and then included an IPAddress as a field of a
>> > record, this would result in an error.
>> >
>> > One approach we tried to get around that was to make IP addresses
>> > themselves into case classes as well, so that only the default
>> > encoders would be required. This eliminated the error, but made
>> > working with the values a nightmare. If we made a
>> > Dataset[IPAddress], the byte array would be presented in a
>> > reasonable manner, but a Dataset[Rec] where Rec had IPAddress
>> > fields was another story, resulting in the default toString of Java
>> > arrays being used:
>> >
>> > +-------------+-------------+
>> > |            a|            b|
>> > +-------------+-------------+
>> > |[[B@47260109]|[[B@3538740a]|
>> > |[[B@617f4814]|[[B@77e69bee]|
>> > +-------------+-------------+
>> >
>> > (See the code snippet at the end of this message for details.)
>> >
>> > Now basically all interactions would have to go through UDFs,
>> > including remembering to format the IPAddress field if you wanted
>> > any useful information out of it at all.
>> >
>> > As a result, since our initial experiments with 2.0 we dropped back
>> > and punted to using text for all IP addresses.
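>> >
>> > (As a rough illustration of that string-based approach, with made-up
>> > names, using java.net.InetAddress for the parsing:
>> >
>> >   import java.net.InetAddress
>> >   import org.apache.spark.sql.functions.{lit, udf}
>> >
>> >   // Every operation re-parses the text form, row by row.
>> >   val ipToBytes = udf((s: String) =>
>> >     InetAddress.getByName(s).getAddress)
>> >
>> >   // Find rows matching one address: one parse per row, per query.
>> >   textRecs.filter(ipToBytes(textRecs("a")) ===
>> >     lit(InetAddress.getByName("1.2.3.4").getAddress)).show()
>> >
>> > textRecs here stands in for a DataFrame of records whose address
>> > columns are strings.)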
>> >
>> > But, we'd still like to do better. What we optimally want is some
>> > mechanism for a user-defined atomic type (whether via encoders or
>> > via registering a new type) which allows for:
>> >
>> > * An appropriately efficient underlying form to be used. (A struct
>> >   with a byte array field would be fine. A bare byte array field
>> >   would be fine.)
>> >
>> > * A display form that is meaningful to the user (the expected
>> >   forms, like "172.217.5.238" and "2607:f8b0:4004:800::200e").
>> >
>> > * At least some support for standard SQL operators like equality
>> >   and comparison, and the ability to define UDFs that work with the
>> >   type.
>> >
>> > Longer term, it would be lovely to:
>> >
>> > * Be able to work with values of the type in an appropriate way in
>> >   different source languages (i.e. make it not hard to work with
>> >   the values in Python or R, although the restrictions of those
>> >   languages will require additional implementation work).
>> >
>> > * Be able to provide new Catalyst optimizations specific to the
>> >   type and the functions defined on it.
>> >
>> > We'd love to put some effort toward achieving these goals, but
>> > aren't sure where to start. We'd like to avoid stepping in the way
>> > of any efforts that might already be underway to improve these
>> > mechanisms.
>> >
>> > Thanks very much!
>> >
>> > Katherine Prevost
>> > Carnegie Mellon / Software Engineering Institute / CERT
>> >
>> > ---------------------------------------------------------->8--
>> >
>> > // Simple example demonstrating the treatment of a case class with
>> > // a byte array within another case class.
>> >
>> > case class IPAddress(bytes: Array[Byte]) {
>> >   override def toString: String =
>> >     s"IPAddress(Array(${bytes.mkString(", ")}))"
>> > }
>> >
>> > val a = IPAddress(Array(1,2,3,4))
>> > val b = IPAddress(Array(5,6,7,8))
>> > val c = IPAddress(Array(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16))
>> > val d = IPAddress(Array(17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32))
>> >
>> > val x = Array(a, b, c, d)
>> > val xs = sc.parallelize(x).toDS
>> >
>> > /*
>> > scala> xs.show
>> > +--------------------+
>> > |               bytes|
>> > +--------------------+
>> > |       [01 02 03 04]|
>> > |       [05 06 07 08]|
>> > |[01 02 03 04 05 0...|
>> > |[11 12 13 14 15 1...|
>> > +--------------------+
>> > */
>> >
>> > case class Rec(a: IPAddress, b: IPAddress) {
>> >   override def toString: String = s"Rec($a, $b)"
>> > }
>> >
>> > val e = Rec(a, b)
>> > val f = Rec(c, d)
>> > val y = Array(e, f)
>> > val ys = sc.parallelize(y).toDS
>> >
>> > /*
>> > scala> ys.show
>> > +-------------+-------------+
>> > |            a|            b|
>> > +-------------+-------------+
>> > |[[B@47260109]|[[B@3538740a]|
>> > |[[B@617f4814]|[[B@77e69bee]|
>> > +-------------+-------------+
>> > */
>> >
>> > ---------------------------------------------------------->8--