Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Michael Armbrust
Awesome, thanks for opening the JIRA!  We'll take a look.

On Tue, Jan 12, 2016 at 1:53 PM, Muthu Jayakumar  wrote:

> I tried to rerun the same code with the current snapshot versions of 1.6
> and 2.0 from
> https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-core_2.11/
>
> But I still see an exception around the same line. Here is the exception
> below. I have filed SPARK-12783 for this.

Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Muthu Jayakumar
I tried to rerun the same code with the current snapshot versions of 1.6 and
2.0 from
https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-core_2.11/

But I still see an exception around the same line. Here is the exception
below. I have filed SPARK-12783 for this.


.13:49:07.388 [main] ERROR o.a.s.s.c.e.c.GenerateSafeProjection - failed to
compile: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 140, Column 47: No applicable constructor/method
found for actual parameters "scala.collection.Map"; candidates are:
"collector.MyMap(scala.collection.immutable.Map)"
/* 001 */
/* 002 */ public java.lang.Object
generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
/* 003 */   return new SpecificSafeProjection(expr);
/* 004 */ }
/* 005 */
/* 006 */ class SpecificSafeProjection extends
org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 007 */
/* 008 */   private org.apache.spark.sql.catalyst.expressions.Expression[]
expressions;
/* 009 */   private org.apache.spark.sql.catalyst.expressions.MutableRow
mutableRow;
/* 010 */
/* 011 */
/* 012 */
/* 013 */   public
SpecificSafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[]
expr) {
/* 014 */ expressions = expr;
/* 015 */ mutableRow = new
org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
/* 016 */
/* 017 */   }
/* 018 */
/* 019 */   public java.lang.Object apply(java.lang.Object _i) {
/* 020 */ InternalRow i = (InternalRow) _i;
/* 021 */ /* newinstance(class collector.MyMap,staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
[Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class
[Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None) */
/* 022 */ /* staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
[Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class
[Ljava.lang.Object;)),true) */
/* 023 */ /*
invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
[Ljava.lang.Object;)) */
/* 024 */ /*
mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
java.lang.String)),invoke(input[0,
MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)))
*/
/* 025 */ /* invoke(input[0,
MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)) */
/* 026 */ /* input[0, MapType(StringType,StringType,true)] */
/* 027 */ boolean isNull10 = i.isNullAt(0);
/* 028 */ MapData primitive11 = isNull10 ? null : (i.getMap(0));
/* 029 */
/* 030 */
/* 031 */ boolean isNull8 = isNull10;
/* 032 */ ArrayData primitive9 =
/* 033 */ isNull8 ?
/* 034 */ null : (ArrayData) primitive11.keyArray();
/* 035 */ isNull8 = primitive9 == null;
/* 036 */
/* 037 */ boolean isNull6 = primitive9 == null;
/* 038 */ ArrayData primitive7 = null;
/* 039 */
/* 040 */ if (!isNull6) {
/* 041 */   java.lang.String[] convertedArray15 = null;
/* 042 */   int dataLength14 = primitive9.numElements();
/* 043 */   convertedArray15 = new java.lang.String[dataLength14];
/* 044 */
/* 045 */   int loopIndex16 = 0;
/* 046 */   while (loopIndex16 < da

Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Muthu Jayakumar
Thanks Michael. Let me test it with a recent master code branch.

Also, for every mapping step, do I have to create a new case class? I cannot
use a Tuple since I have ~130 columns to process. Earlier I used a Seq[Any]
(actually an Array[Any], to optimize serialization) and processed it using
the RDD API by building the schema at runtime. Now I am attempting to
replace this with Dataset.
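For reference, here is a rough sketch of that earlier runtime-schema
approach (the column names and sample data are illustrative only, and sc /
sqlContext are assumed to be in scope as elsewhere in this thread):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Column names are only known at runtime (~130 of them in practice).
val columnNames = Seq("c1", "c2", "c3")
val schema = StructType(columnNames.map(name => StructField(name, StringType, nullable = true)))

// Each record is carried as an Array[Any] and converted to a Row.
val rowRdd = sc.parallelize(Seq(Array[Any]("a", "b", "c"))).map(values => Row.fromSeq(values))
val df = sqlContext.createDataFrame(rowRdd, schema)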

> the problem is that at compile time we don't know if it's an inner or
> outer join.
May I suggest having different methods for the different kinds of joins
(similar to the RDD API)? That way type safety is enforced.
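Purely as an illustration of this suggestion (this is not an existing Spark
API; Dataset in 1.6 exposes joinWith(other, condition, joinType) instead),
such methods might look like:

import org.apache.spark.sql.{Column, Dataset}

// Hypothetical signatures only -- not an existing API -- showing how the
// result type could encode the join semantics.
trait TypedJoins[T] {
  def innerJoin[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
  def leftOuterJoin[U](other: Dataset[U], condition: Column): Dataset[(T, Option[U])]
  def fullOuterJoin[U](other: Dataset[U], condition: Column): Dataset[(Option[T], Option[U])]
}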

Here is the error message.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task not serializable: java.io.NotSerializableException:
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack: - object not serializable (class:
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,
value: package lang) - field (class: scala.reflect.internal.Types$ThisType,
name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object
(class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - field
(class: scala.reflect.internal.Types$TypeRef, name: pre, type: class
scala.reflect.internal.Types$Type) - object (class
scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - field (class:
scala.reflect.internal.Types$TypeRef, name: normalized, type: class
scala.reflect.internal.Types$Type) - object (class
scala.reflect.internal.Types$AliasNoArgsTypeRef, String) - field (class:
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1,
type: class scala.reflect.api.Types$TypeApi) - object (class
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - field (class:
org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type:
interface scala.Function1) - object (class
org.apache.spark.sql.catalyst.expressions.MapObjects,
mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field
(class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) -
field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
targetObject, type: class
org.apache.spark.sql.catalyst.expressions.Expression) - object (class
org.apache.spark.sql.catalyst.expressions.Invoke,
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;))) - writeObject data (class:
scala.collection.immutable.List$SerializationProxy) - object (class
scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@7e78c3cf) - writeReplace
data (class: scala.collection.immutable.List$SerializationProxy) - object
(class scala.collection.immutable.$colon$colon,
List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object; - field (class:
org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments,
type: interface scala.collection.Seq) - object (class
org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),true)) - writeObject data (class:
scala.collection.immutable.List$SerializationProxy) - object (class
scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@377795c5) - writeReplace
data (class: scala.collection.immutable.List$SerializationProxy) - object
(class scala.collection.immutable.$colon$colon, List(staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable

Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Michael Armbrust
>
> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
>
This looks like a bug. What is the error? It might be fixed in
branch-1.6/master if you can test there.

> Please advise on what I may be missing here.
>
>
> Also, for joins, may I suggest having a custom encoder / transformation to
> say how two datasets can be merged?
> Also, when a join is made using something like 'left outer join', the
> right-side object should ideally be an Option (similar to what's seen in
> the RDD API). I think this would make it strongly typed?
>

I think you can actually use as to convert this to an Option if you'd like
type safety. The problem is that at compile time we don't know if it's an
inner or outer join.
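An untested sketch of that idea (the case classes, sample data, and join key
below are illustrative, and whether the Option encoder resolves may depend
on the exact 1.6/2.0 build):

import sqlContext.implicits._

case class LeftRow(lkey: String, v1: String)
case class RightRow(rkey: String, v2: String)

val dsL = sqlContext.createDataset(Seq(LeftRow("k1", "a"), LeftRow("k2", "b")))
val dsR = sqlContext.createDataset(Seq(RightRow("k1", "c")))

// Left outer join, then view the right side as an Option for type safety.
val joined = dsL
  .joinWith(dsR, $"lkey" === $"rkey", "left_outer")
  .as[(LeftRow, Option[RightRow])]
joined.show()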


Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-11 Thread Muthu Jayakumar
Hello Michael,

Thank you for the suggestion. This should do the trick for the column names.
But how could I transform a column's value type? Do I have to use a UDF? If
I use a UDF, the other question I have pertains to the map step on a
Dataset, where I run into an error when I try to transform the object into
another type.

For example:

case class MyMap(map: Map[String, String])

case class TestCaseClass(a: String, b: String) {
  def toMyMap: MyMap = {
    MyMap(Map(a -> b))
  }

  def toStr: String = {
    a
  }
}

//Main method section below

import sqlContext.implicits._

val df1 = sqlContext.createDataset(Seq(
  TestCaseClass("2015-05-01", "data1"),
  TestCaseClass("2015-05-01", "data2"))).toDF()

df1.as[TestCaseClass].map(_.toStr).show() //works fine
df1.as[TestCaseClass].map(_.toMyMap).show() //fails

Please advise on what I may be missing here.


Also, for joins, may I suggest having a custom encoder / transformation to
say how two datasets can be merged?
Also, when a join is made using something like 'left outer join', the
right-side object should ideally be an Option (similar to what's seen in the
RDD API). I think this would make it strongly typed?

Thank you for looking into my email.

Thanks,
Muthu


On Mon, Jan 11, 2016 at 3:08 PM, Michael Armbrust 
wrote:

>> Also, while extracting a value into a Dataset using the as[U] method, how
>> could I specify a custom encoder/translation to a case class (where I
>> don't have the same column-name mapping or the same data-type mapping)?
>>
>
> There is no public API yet for defining your own encoders.  You can change
> the column names using select and as to make sure they line up correctly.
>
> df.select($"oldName".as("newName"))
>


Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-11 Thread Michael Armbrust
>
> Also, while extracting a value into a Dataset using the as[U] method, how
> could I specify a custom encoder/translation to a case class (where I
> don't have the same column-name mapping or the same data-type mapping)?
>

There is no public API yet for defining your own encoders.  You can change
the column names using select and as to make sure they line up correctly.

df.select($"oldName".as("newName"))


Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-11 Thread Arkadiusz Bicz
Hi,

There is some documentation at

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html

and you can also check out the DatasetSuite tests in the Spark sources.
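For completeness, a rough sketch along the lines of that Dataset Aggregator
example (Spark 1.6 API; the Aggregator contract changed in later releases,
and Sale plus the sample data are illustrative only):

import sqlContext.implicits._
import org.apache.spark.sql.expressions.Aggregator

case class Sale(item: String, amount: Double)

// A typed stand-in for a sum(amount) UDAF: zero/reduce/merge/finish instead
// of a UserDefinedAggregateFunction.
val sumAmount = new Aggregator[Sale, Double, Double] {
  def zero: Double = 0.0
  def reduce(acc: Double, s: Sale): Double = acc + s.amount
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(acc: Double): Double = acc
}.toColumn

val sales = sqlContext.createDataset(Seq(Sale("a", 1.0), Sale("a", 2.5), Sale("b", 3.0)))
sales.groupBy(_.item).agg(sumAmount).show()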


BR,

Arkadiusz Bicz


On Mon, Jan 11, 2016 at 5:37 AM, Muthu Jayakumar  wrote:
> Hello there,
>
> While looking at the features of Dataset, it seems to provide an
> alternative to udf and udaf. Any documentation or sample code snippet
> would be helpful for rewriting existing UDFs as Dataset mapping steps.
> Also, while extracting a value into a Dataset using the as[U] method, how
> could I specify a custom encoder/translation to a case class (where I
> don't have the same column-name mapping or the same data-type mapping)?
>
> Please advise,
> Muthu




Spark 1.6 udf/udaf alternatives in dataset?

2016-01-10 Thread Muthu Jayakumar
Hello there,

While looking at the features of Dataset, it seems to provide an alternative
to udf and udaf. Any documentation or sample code snippet would be helpful
for rewriting existing UDFs as Dataset mapping steps.
Also, while extracting a value into a Dataset using the as[U] method, how
could I specify a custom encoder/translation to a case class (where I don't
have the same column-name mapping or the same data-type mapping)?

Please advise,
Muthu