[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue

2017-04-03 Thread Dinesh Man Amatya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954527#comment-15954527
 ] 

Dinesh Man Amatya commented on SPARK-20176:
---

Thanks Kazuaki for the effort. I was able to resolve the issue by upgrading the 
spark and scala version as follows,

scala.version : 2.11.5

scala.compat.version : 2.11

spark.version : 2.1.0

> Spark Dataframe UDAF issue
> --
>
> Key: SPARK-20176
> URL: https://issues.apache.org/jira/browse/SPARK-20176
> Project: Spark
>  Issue Type: IT Help
>  Components: Spark Core
>Affects Versions: 2.0.2
>Reporter: Dinesh Man Amatya
>
> Getting following error in custom UDAF
> Error while decoding: java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean"
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificSafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificSafeProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> /* 010 */   private Object[] values1;
> /* 011 */   private org.apache.spark.sql.types.StructType schema;
> /* 012 */   private org.apache.spark.sql.types.StructType schema1;
> /* 013 */
> /* 014 */
> /* 015 */   public SpecificSafeProjection(Object[] references) {
> /* 016 */ this.references = references;
> /* 017 */ mutableRow = (MutableRow) references[references.length - 1];
> /* 018 */
> /* 019 */
> /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) 
> references[0];
> /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) 
> references[1];
> /* 022 */   }
> /* 023 */
> /* 024 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 025 */ InternalRow i = (InternalRow) _i;
> /* 026 */
> /* 027 */ values = new Object[2];
> /* 028 */
> /* 029 */ boolean isNull2 = i.isNullAt(0);
> /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0));
> /* 031 */
> /* 032 */ boolean isNull1 = isNull2;
> /* 033 */ final java.lang.String value1 = isNull1 ? null : 
> (java.lang.String) value2.toString();
> /* 034 */ isNull1 = value1 == null;
> /* 035 */ if (isNull1) {
> /* 036 */   values[0] = null;
> /* 037 */ } else {
> /* 038 */   values[0] = value1;
> /* 039 */ }
> /* 040 */
> /* 041 */ boolean isNull5 = i.isNullAt(1);
> /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2));
> /* 043 */ boolean isNull3 = false;
> /* 044 */ org.apache.spark.sql.Row value3 = null;
> /* 045 */ if (!false && isNull5) {
> /* 046 */
> /* 047 */   final org.apache.spark.sql.Row value6 = null;
> /* 048 */   isNull3 = true;
> /* 049 */   value3 = value6;
> /* 050 */ } else {
> /* 051 */
> /* 052 */   values1 = new Object[2];
> /* 053 */
> /* 054 */   boolean isNull10 = i.isNullAt(1);
> /* 055 */   InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2));
> /* 056 */
> /* 057 */   boolean isNull9 = isNull10 || false;
> /* 058 */   final boolean value9 = isNull9 ? false : (Boolean) 
> value10.isNullAt(0);
> /* 059 */   boolean isNull8 = false;
> /* 060 */   double value8 = -1.0;
> /* 061 */   if (!isNull9 && value9) {
> /* 062 */
> /* 063 */ final double value12 = -1.0;
> /* 064 */ isNull8 = true;
> /* 065 */ value8 = value12;
> /* 066 */   } else {
> /* 067 */
> /* 068 */ boolean isNull14 = i.isNullAt(1);
> /* 069 */ InternalRow value14 = isNull14 ? null : (i.getStruct(1, 2));
> /* 070 */ boolean isNull13 = isNull14;
> /* 071 */ double value13 = -1.0;
> /* 072 */
> /* 073 */ if (!isNull14) {
> /* 074 */
> /* 075 */   if (value14.isNullAt(0)) {
> /* 076 */ isNull13 = true;
> /* 077 */   } else {
> /* 078 */ value13 = value14.getDouble(0);
> /* 079 */   }
> /* 080 */
> /* 081 */ }
> /* 082 */ isNull8 = isNull13;
> /* 083 */ value8 = value13;
> /* 084 */   }
> /* 085 */   if (isNull8) {
> /* 086 */ values1[0] = null;
> /* 087 */   } else {
> /* 088 */ values1[0] = value8;
> /* 089 */   }
> /* 090 */
> /* 091 */   boolean isNull17 = i.isNullAt(1);
> /* 092 */   InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2));
> /* 093 */
> /* 094 */   boolean isNull16 = isNull17 || false;
> /* 095 */   final boolean value16 = isNull16 ? false : (Boolean) 
> value17.isNullAt(1);
> /* 096 */   boolean 

[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue

2017-04-03 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954093#comment-15954093
 ] 

Kazuaki Ishizaki commented on SPARK-20176:
--

Thanks. The code seem to work for the master.
I am investigating which change fixes the issue.

> Spark Dataframe UDAF issue
> --
>
> Key: SPARK-20176
> URL: https://issues.apache.org/jira/browse/SPARK-20176
> Project: Spark
>  Issue Type: IT Help
>  Components: Spark Core
>Affects Versions: 2.0.2
>Reporter: Dinesh Man Amatya
>
> Getting following error in custom UDAF
> Error while decoding: java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean"
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificSafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificSafeProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> /* 010 */   private Object[] values1;
> /* 011 */   private org.apache.spark.sql.types.StructType schema;
> /* 012 */   private org.apache.spark.sql.types.StructType schema1;
> /* 013 */
> /* 014 */
> /* 015 */   public SpecificSafeProjection(Object[] references) {
> /* 016 */ this.references = references;
> /* 017 */ mutableRow = (MutableRow) references[references.length - 1];
> /* 018 */
> /* 019 */
> /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) 
> references[0];
> /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) 
> references[1];
> /* 022 */   }
> /* 023 */
> /* 024 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 025 */ InternalRow i = (InternalRow) _i;
> /* 026 */
> /* 027 */ values = new Object[2];
> /* 028 */
> /* 029 */ boolean isNull2 = i.isNullAt(0);
> /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0));
> /* 031 */
> /* 032 */ boolean isNull1 = isNull2;
> /* 033 */ final java.lang.String value1 = isNull1 ? null : 
> (java.lang.String) value2.toString();
> /* 034 */ isNull1 = value1 == null;
> /* 035 */ if (isNull1) {
> /* 036 */   values[0] = null;
> /* 037 */ } else {
> /* 038 */   values[0] = value1;
> /* 039 */ }
> /* 040 */
> /* 041 */ boolean isNull5 = i.isNullAt(1);
> /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2));
> /* 043 */ boolean isNull3 = false;
> /* 044 */ org.apache.spark.sql.Row value3 = null;
> /* 045 */ if (!false && isNull5) {
> /* 046 */
> /* 047 */   final org.apache.spark.sql.Row value6 = null;
> /* 048 */   isNull3 = true;
> /* 049 */   value3 = value6;
> /* 050 */ } else {
> /* 051 */
> /* 052 */   values1 = new Object[2];
> /* 053 */
> /* 054 */   boolean isNull10 = i.isNullAt(1);
> /* 055 */   InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2));
> /* 056 */
> /* 057 */   boolean isNull9 = isNull10 || false;
> /* 058 */   final boolean value9 = isNull9 ? false : (Boolean) 
> value10.isNullAt(0);
> /* 059 */   boolean isNull8 = false;
> /* 060 */   double value8 = -1.0;
> /* 061 */   if (!isNull9 && value9) {
> /* 062 */
> /* 063 */ final double value12 = -1.0;
> /* 064 */ isNull8 = true;
> /* 065 */ value8 = value12;
> /* 066 */   } else {
> /* 067 */
> /* 068 */ boolean isNull14 = i.isNullAt(1);
> /* 069 */ InternalRow value14 = isNull14 ? null : (i.getStruct(1, 2));
> /* 070 */ boolean isNull13 = isNull14;
> /* 071 */ double value13 = -1.0;
> /* 072 */
> /* 073 */ if (!isNull14) {
> /* 074 */
> /* 075 */   if (value14.isNullAt(0)) {
> /* 076 */ isNull13 = true;
> /* 077 */   } else {
> /* 078 */ value13 = value14.getDouble(0);
> /* 079 */   }
> /* 080 */
> /* 081 */ }
> /* 082 */ isNull8 = isNull13;
> /* 083 */ value8 = value13;
> /* 084 */   }
> /* 085 */   if (isNull8) {
> /* 086 */ values1[0] = null;
> /* 087 */   } else {
> /* 088 */ values1[0] = value8;
> /* 089 */   }
> /* 090 */
> /* 091 */   boolean isNull17 = i.isNullAt(1);
> /* 092 */   InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2));
> /* 093 */
> /* 094 */   boolean isNull16 = isNull17 || false;
> /* 095 */   final boolean value16 = isNull16 ? false : (Boolean) 
> value17.isNullAt(1);
> /* 096 */   boolean isNull15 = false;
> /* 097 */   double value15 = -1.0;
> /* 098 */   if (!isNull16 && value16) {
> 

[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue

2017-04-02 Thread Dinesh Man Amatya (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953025#comment-15953025
 ] 

Dinesh Man Amatya commented on SPARK-20176:
---

Following is the shortened code for generating above error. There are three 
files 
Test.scala , TestUdaf.scala and testData.csv




#Test.scala


import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfterEach, FunSuite}

/**
  * Created by damatya on 4/3/17.
  */
class Test  extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  var sc :SparkContext= _


  override def beforeEach() {
sparkSession = SparkSession.builder().appName("udf testings")
  .master("local")
  .config("", "")
  .getOrCreate()
sc = sparkSession.sparkContext
  }

  override def afterEach() {
sparkSession.stop()
  }

  test("test total")
  {
val sqlContext = sparkSession.sqlContext

val dataRdd = 
sc.textFile("/opt/projects/pa/DasBackend/SparkEngine/src/main/resources/testData.csv")

val schemaString = "memberId;paidAmt;allowedAmt"

val schema = StructType(schemaString.split(";").map(fieldName ⇒ 
StructField(fieldName, StringType, true)))

val rowRdd = dataRdd.map{line => line.split(";", -1)}.map{ array => 
Row.fromSeq(array.toSeq)}

val dataFrame = sqlContext.createDataFrame(rowRdd,schema)

val testUdaf:TestUdaf = new TestUdaf(schema)

val resultDataFrame = 
dataFrame.groupBy("memberId").agg(testUdaf(dataFrame.columns.map(dataFrame(_)):_*).as("totalAmountPair"))

resultDataFrame.show(false)

dataFrame.show()



  }


}







##TestUdaf.scala


import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


/**
  * Created by damatya on 4/3/17.
  */
class TestUdaf (inputSch:StructType) extends UserDefinedAggregateFunction{




  override def inputSchema: StructType = inputSch

  //inputSchema = inputSch

  override def bufferSchema: StructType = new StructType()
.add("totalRxPaid",DoubleType)
.add("totalRxAllowedAmt",DoubleType)




  override def dataType: DataType = new StructType()
.add("totalRxPaid",DoubleType)
.add("totalRxAllowedAmt",DoubleType)


  override def deterministic: Boolean = false

  override def initialize(buffer: MutableAggregationBuffer): Unit = {

buffer.update(0,0D)
buffer.update(1,0D)

  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {


var paidAmount : Float = 0f
var allowedAmount : Float = 0f

try
{
  paidAmount=input.getFloat(1)
  allowedAmount=input.getFloat(2)
}
catch
  {
case e:Exception =>
println ("invalid amount")
  }

val totalPaidAmount = buffer.getDouble(0)+paidAmount
val totalAllowedAmount = buffer.getDouble(1)+allowedAmount

buffer.update(0,totalPaidAmount)
buffer.update(1,totalAllowedAmount)

  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0,buffer1.getDouble(0)+buffer2.getDouble(0))
buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1))
  }

  override def evaluate(buffer: Row): Any = {

(buffer.getDouble(0) , buffer.getDouble(1))
  }
}





#testData.csv

m123;10.5;1
m123;20;10
m11;10;1
m11;30;1

> Spark Dataframe UDAF issue
> --
>
> Key: SPARK-20176
> URL: https://issues.apache.org/jira/browse/SPARK-20176
> Project: Spark
>  Issue Type: IT Help
>  Components: Spark Core
>Affects Versions: 2.0.2
>Reporter: Dinesh Man Amatya
>
> Getting following error in custom UDAF
> Error while decoding: java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean"
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificSafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificSafeProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> /* 010 */   private Object[] values1;
> /* 011 */   private org.apache.spark.sql.types.StructType schema;
> /* 012 */   private org.apache.spark.sql.types.StructType schema1;
> /* 013 */
> /* 014 */
> /* 015 */   public SpecificSafeProjection(Object[] references) {
> /* 016 */ this.references = references;
> /* 017 */ mutableRow = (MutableRow) references[references.length - 1];
> /* 018 */
> 

[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue

2017-03-31 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951473#comment-15951473
 ] 

Kazuaki Ishizaki commented on SPARK-20176:
--

Could you please post the program that can reproduce this issue?

> Spark Dataframe UDAF issue
> --
>
> Key: SPARK-20176
> URL: https://issues.apache.org/jira/browse/SPARK-20176
> Project: Spark
>  Issue Type: IT Help
>  Components: Spark Core
>Affects Versions: 2.0.2
>Reporter: Dinesh Man Amatya
>
> Getting following error in custom UDAF
> Error while decoding: java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean"
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificSafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificSafeProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> /* 010 */   private Object[] values1;
> /* 011 */   private org.apache.spark.sql.types.StructType schema;
> /* 012 */   private org.apache.spark.sql.types.StructType schema1;
> /* 013 */
> /* 014 */
> /* 015 */   public SpecificSafeProjection(Object[] references) {
> /* 016 */ this.references = references;
> /* 017 */ mutableRow = (MutableRow) references[references.length - 1];
> /* 018 */
> /* 019 */
> /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) 
> references[0];
> /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) 
> references[1];
> /* 022 */   }
> /* 023 */
> /* 024 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 025 */ InternalRow i = (InternalRow) _i;
> /* 026 */
> /* 027 */ values = new Object[2];
> /* 028 */
> /* 029 */ boolean isNull2 = i.isNullAt(0);
> /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0));
> /* 031 */
> /* 032 */ boolean isNull1 = isNull2;
> /* 033 */ final java.lang.String value1 = isNull1 ? null : 
> (java.lang.String) value2.toString();
> /* 034 */ isNull1 = value1 == null;
> /* 035 */ if (isNull1) {
> /* 036 */   values[0] = null;
> /* 037 */ } else {
> /* 038 */   values[0] = value1;
> /* 039 */ }
> /* 040 */
> /* 041 */ boolean isNull5 = i.isNullAt(1);
> /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2));
> /* 043 */ boolean isNull3 = false;
> /* 044 */ org.apache.spark.sql.Row value3 = null;
> /* 045 */ if (!false && isNull5) {
> /* 046 */
> /* 047 */   final org.apache.spark.sql.Row value6 = null;
> /* 048 */   isNull3 = true;
> /* 049 */   value3 = value6;
> /* 050 */ } else {
> /* 051 */
> /* 052 */   values1 = new Object[2];
> /* 053 */
> /* 054 */   boolean isNull10 = i.isNullAt(1);
> /* 055 */   InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2));
> /* 056 */
> /* 057 */   boolean isNull9 = isNull10 || false;
> /* 058 */   final boolean value9 = isNull9 ? false : (Boolean) 
> value10.isNullAt(0);
> /* 059 */   boolean isNull8 = false;
> /* 060 */   double value8 = -1.0;
> /* 061 */   if (!isNull9 && value9) {
> /* 062 */
> /* 063 */ final double value12 = -1.0;
> /* 064 */ isNull8 = true;
> /* 065 */ value8 = value12;
> /* 066 */   } else {
> /* 067 */
> /* 068 */ boolean isNull14 = i.isNullAt(1);
> /* 069 */ InternalRow value14 = isNull14 ? null : (i.getStruct(1, 2));
> /* 070 */ boolean isNull13 = isNull14;
> /* 071 */ double value13 = -1.0;
> /* 072 */
> /* 073 */ if (!isNull14) {
> /* 074 */
> /* 075 */   if (value14.isNullAt(0)) {
> /* 076 */ isNull13 = true;
> /* 077 */   } else {
> /* 078 */ value13 = value14.getDouble(0);
> /* 079 */   }
> /* 080 */
> /* 081 */ }
> /* 082 */ isNull8 = isNull13;
> /* 083 */ value8 = value13;
> /* 084 */   }
> /* 085 */   if (isNull8) {
> /* 086 */ values1[0] = null;
> /* 087 */   } else {
> /* 088 */ values1[0] = value8;
> /* 089 */   }
> /* 090 */
> /* 091 */   boolean isNull17 = i.isNullAt(1);
> /* 092 */   InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2));
> /* 093 */
> /* 094 */   boolean isNull16 = isNull17 || false;
> /* 095 */   final boolean value16 = isNull16 ? false : (Boolean) 
> value17.isNullAt(1);
> /* 096 */   boolean isNull15 = false;
> /* 097 */   double value15 = -1.0;
> /* 098 */   if (!isNull16 && value16) {
> /* 099 */
> /* 100 */