[ https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953025#comment-15953025 ]

Dinesh Man Amatya commented on SPARK-20176:
-------------------------------------------

The following is a shortened version of the code that reproduces the error above. There are three files: Test.scala, TestUdaf.scala, and testData.csv.

#Test.scala


import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfterEach, FunSuite}

/**
  * Created by damatya on 4/3/17.
  */
class Test extends FunSuite with BeforeAndAfterEach {

  var sparkSession: SparkSession = _
  var sc: SparkContext = _

  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .getOrCreate()
    sc = sparkSession.sparkContext
  }

  override def afterEach() {
    sparkSession.stop()
  }

  test("test total") {
    val sqlContext = sparkSession.sqlContext

    val dataRdd = sc.textFile("/opt/projects/pa/DasBackend/SparkEngine/src/main/resources/testData.csv")

    // every field is declared as a string; this same schema is handed to the UDAF below
    val schemaString = "memberId;paidAmt;allowedAmt"
    val schema = StructType(schemaString.split(";").map(fieldName => StructField(fieldName, StringType, true)))

    val rowRdd = dataRdd.map { line => line.split(";", -1) }.map { array => Row.fromSeq(array.toSeq) }

    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    val testUdaf: TestUdaf = new TestUdaf(schema)

    // pass every column of the DataFrame into the UDAF and aggregate per member
    val resultDataFrame = dataFrame.groupBy("memberId").agg(testUdaf(dataFrame.columns.map(dataFrame(_)): _*).as("totalAmountPair"))

    resultDataFrame.show(false)

    dataFrame.show()
  }
}
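
As an aside, the same file can probably be loaded without the RDD detour, using the DataFrameReader csv support in Spark 2.x. This is only a sketch for comparison, not part of the repro; note that with inferSchema the amount columns would come back as numeric types rather than strings, which changes what the UDAF sees:

    // Sketch (assumption): built-in csv reader instead of textFile + split.
    val df = sparkSession.read
      .option("sep", ";")            // the test file is semicolon-delimited
      .option("inferSchema", "true") // amounts would be read as numeric types
      .csv("/opt/projects/pa/DasBackend/SparkEngine/src/main/resources/testData.csv")
      .toDF("memberId", "paidAmt", "allowedAmt")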

#TestUdaf.scala


import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


/**
  * Created by damatya on 4/3/17.
  */
class TestUdaf(inputSch: StructType) extends UserDefinedAggregateFunction {

  // the input schema is supplied by the caller (the full DataFrame schema in the test above)
  override def inputSchema: StructType = inputSch

  override def bufferSchema: StructType = new StructType()
    .add("totalRxPaid", DoubleType)
    .add("totalRxAllowedAmt", DoubleType)

  // the result is a struct holding the two running totals
  override def dataType: DataType = new StructType()
    .add("totalRxPaid", DoubleType)
    .add("totalRxAllowedAmt", DoubleType)


  override def deterministic: Boolean = false

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0d)
    buffer.update(1, 0d)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    var paidAmount: Float = 0f
    var allowedAmount: Float = 0f

    try {
      // note: the input columns are StringType (see the schema in the test),
      // so getFloat throws a ClassCastException here and the catch below
      // leaves both amounts at 0
      paidAmount = input.getFloat(1)
      allowedAmount = input.getFloat(2)
    } catch {
      case e: Exception =>
        println("invalid amount")
    }

    val totalPaidAmount = buffer.getDouble(0) + paidAmount
    val totalAllowedAmount = buffer.getDouble(1) + allowedAmount

    buffer.update(0, totalPaidAmount)
    buffer.update(1, totalAllowedAmount)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getDouble(1) + buffer2.getDouble(1))
  }

  override def evaluate(buffer: Row): Any = {
    // note: this returns a Scala tuple, while dataType above declares a struct
    (buffer.getDouble(0), buffer.getDouble(1))
  }
}
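
One thing worth double-checking (an assumption on my part, not a confirmed diagnosis of the error above): dataType declares a struct, but evaluate returns a Scala tuple. A variant of evaluate that hands back a Row matching the declared struct would look like this:

  // Sketch only: return a Row instead of a tuple so the returned value
  // matches the struct declared in dataType. Untested against this repro;
  // shown for comparison, not as a known fix.
  override def evaluate(buffer: Row): Any = {
    Row(buffer.getDouble(0), buffer.getDouble(1))
  }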

#testData.csv

m123;10.5;1
m123;20;10
m11;10;1
m11;30;1

> Spark Dataframe UDAF issue
> --------------------------
>
>                 Key: SPARK-20176
>                 URL: https://issues.apache.org/jira/browse/SPARK-20176
>             Project: Spark
>          Issue Type: IT Help
>          Components: Spark Core
>    Affects Versions: 2.0.2
>            Reporter: Dinesh Man Amatya
>
> Getting following error in custom UDAF
> Error while decoding: java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean"
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificSafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificSafeProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> /* 010 */   private Object[] values1;
> /* 011 */   private org.apache.spark.sql.types.StructType schema;
> /* 012 */   private org.apache.spark.sql.types.StructType schema1;
> /* 013 */
> /* 014 */
> /* 015 */   public SpecificSafeProjection(Object[] references) {
> /* 016 */     this.references = references;
> /* 017 */     mutableRow = (MutableRow) references[references.length - 1];
> /* 018 */
> /* 019 */
> /* 020 */     this.schema = (org.apache.spark.sql.types.StructType) 
> references[0];
> /* 021 */     this.schema1 = (org.apache.spark.sql.types.StructType) 
> references[1];
> /* 022 */   }
> /* 023 */
> /* 024 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 025 */     InternalRow i = (InternalRow) _i;
> /* 026 */
> /* 027 */     values = new Object[2];
> /* 028 */
> /* 029 */     boolean isNull2 = i.isNullAt(0);
> /* 030 */     UTF8String value2 = isNull2 ? null : (i.getUTF8String(0));
> /* 031 */
> /* 032 */     boolean isNull1 = isNull2;
> /* 033 */     final java.lang.String value1 = isNull1 ? null : 
> (java.lang.String) value2.toString();
> /* 034 */     isNull1 = value1 == null;
> /* 035 */     if (isNull1) {
> /* 036 */       values[0] = null;
> /* 037 */     } else {
> /* 038 */       values[0] = value1;
> /* 039 */     }
> /* 040 */
> /* 041 */     boolean isNull5 = i.isNullAt(1);
> /* 042 */     InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2));
> /* 043 */     boolean isNull3 = false;
> /* 044 */     org.apache.spark.sql.Row value3 = null;
> /* 045 */     if (!false && isNull5) {
> /* 046 */
> /* 047 */       final org.apache.spark.sql.Row value6 = null;
> /* 048 */       isNull3 = true;
> /* 049 */       value3 = value6;
> /* 050 */     } else {
> /* 051 */
> /* 052 */       values1 = new Object[2];
> /* 053 */
> /* 054 */       boolean isNull10 = i.isNullAt(1);
> /* 055 */       InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2));
> /* 056 */
> /* 057 */       boolean isNull9 = isNull10 || false;
> /* 058 */       final boolean value9 = isNull9 ? false : (Boolean) 
> value10.isNullAt(0);
> /* 059 */       boolean isNull8 = false;
> /* 060 */       double value8 = -1.0;
> /* 061 */       if (!isNull9 && value9) {
> /* 062 */
> /* 063 */         final double value12 = -1.0;
> /* 064 */         isNull8 = true;
> /* 065 */         value8 = value12;
> /* 066 */       } else {
> /* 067 */
> /* 068 */         boolean isNull14 = i.isNullAt(1);
> /* 069 */         InternalRow value14 = isNull14 ? null : (i.getStruct(1, 2));
> /* 070 */         boolean isNull13 = isNull14;
> /* 071 */         double value13 = -1.0;
> /* 072 */
> /* 073 */         if (!isNull14) {
> /* 074 */
> /* 075 */           if (value14.isNullAt(0)) {
> /* 076 */             isNull13 = true;
> /* 077 */           } else {
> /* 078 */             value13 = value14.getDouble(0);
> /* 079 */           }
> /* 080 */
> /* 081 */         }
> /* 082 */         isNull8 = isNull13;
> /* 083 */         value8 = value13;
> /* 084 */       }
> /* 085 */       if (isNull8) {
> /* 086 */         values1[0] = null;
> /* 087 */       } else {
> /* 088 */         values1[0] = value8;
> /* 089 */       }
> /* 090 */
> /* 091 */       boolean isNull17 = i.isNullAt(1);
> /* 092 */       InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2));
> /* 093 */
> /* 094 */       boolean isNull16 = isNull17 || false;
> /* 095 */       final boolean value16 = isNull16 ? false : (Boolean) 
> value17.isNullAt(1);
> /* 096 */       boolean isNull15 = false;
> /* 097 */       double value15 = -1.0;
> /* 098 */       if (!isNull16 && value16) {
> /* 099 */
> /* 100 */         final double value19 = -1.0;
> /* 101 */         isNull15 = true;
> /* 102 */         value15 = value19;
> /* 103 */       } else {
> /* 104 */
> /* 105 */         boolean isNull21 = i.isNullAt(1);
> /* 106 */         InternalRow value21 = isNull21 ? null : (i.getStruct(1, 2));
> /* 107 */         boolean isNull20 = isNull21;
> /* 108 */         double value20 = -1.0;
> /* 109 */
> /* 110 */         if (!isNull21) {
> /* 111 */
> /* 112 */           if (value21.isNullAt(1)) {
> /* 113 */             isNull20 = true;
> /* 114 */           } else {
> /* 115 */             value20 = value21.getDouble(1);
> /* 116 */           }
> /* 117 */
> /* 118 */         }
> /* 119 */         isNull15 = isNull20;
> /* 120 */         value15 = value20;
> /* 121 */       }
> /* 122 */       if (isNull15) {
> /* 123 */         values1[1] = null;
> /* 124 */       } else {
> /* 125 */         values1[1] = value15;
> /* 126 */       }
> /* 127 */
> /* 128 */       final org.apache.spark.sql.Row value7 = new 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values1, 
> schema);
> /* 129 */       isNull3 = false;
> /* 130 */       value3 = value7;
> /* 131 */     }
> /* 132 */     if (isNull3) {
> /* 133 */       values[1] = null;
> /* 134 */     } else {
> /* 135 */       values[1] = value3;
> /* 136 */     }
> /* 137 */
> /* 138 */     final org.apache.spark.sql.Row value = new 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, 
> schema1);
> /* 139 */     if (false) {
> /* 140 */       mutableRow.setNullAt(0);
> /* 141 */     } else {
> /* 142 */
> /* 143 */       mutableRow.update(0, value);
> /* 144 */     }
> /* 145 */
> /* 146 */     return mutableRow;
> /* 147 */   }
> /* 148 */ }
> createexternalrow(input[0, string, true].toString, if (isnull(input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true])) null else 
> createexternalrow(if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxPaid, if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxAllowedAmt, StructField(totalRxPaid,DoubleType,true), 
> StructField(totalRxAllowedAmt,DoubleType,true)), 
> StructField(dw_member_id,StringType,true), 
> StructField(test,StructType(StructField(totalRxPaid,DoubleType,true), 
> StructField(totalRxAllowedAmt,DoubleType,true)),true))
> :- input[0, string, true].toString
> :  +- input[0, string, true]
> +- if (isnull(input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true])) null else createexternalrow(if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxPaid, if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxAllowedAmt, StructField(totalRxPaid,DoubleType,true), 
> StructField(totalRxAllowedAmt,DoubleType,true))
>    :- isnull(input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true])
>    :  +- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, true]
>    :- null
>    +- createexternalrow(if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxPaid, if (input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].isNullAt) null 
> else input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxAllowedAmt, StructField(totalRxPaid,DoubleType,true), 
> StructField(totalRxAllowedAmt,DoubleType,true))
>       :- if (input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].isNullAt) null else input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].totalRxPaid
>       :  :- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].isNullAt
>       :  :  :- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true]
>       :  :  +- 0
>       :  :- null
>       :  +- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxPaid
>       :     +- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true]
>       +- if (input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].isNullAt) null else input[1, 
> struct<totalRxPaid:double,totalRxAllowedAmt:double>, true].totalRxAllowedAmt
>          :- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].isNullAt
>          :  :- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true]
>          :  +- 1
>          :- null
>          +- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true].totalRxAllowedAmt
>             +- input[1, struct<totalRxPaid:double,totalRxAllowedAmt:double>, 
> true]


