[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue
[ https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954527#comment-15954527 ] Dinesh Man Amatya commented on SPARK-20176: --- Thanks, Kazuaki, for the effort. I was able to resolve the issue by upgrading the Spark and Scala versions as follows: scala.version : 2.11.5 scala.compat.version : 2.11 spark.version : 2.1.0 > Spark Dataframe UDAF issue > -- > > Key: SPARK-20176 > URL: https://issues.apache.org/jira/browse/SPARK-20176 > Project: Spark > Issue Type: IT Help > Components: Spark Core >Affects Versions: 2.0.2 >Reporter: Dinesh Man Amatya > > Getting following error in custom UDAF > Error while decoding: java.util.concurrent.ExecutionException: > java.lang.Exception: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean" > /* 001 */ public java.lang.Object generate(Object[] references) { > /* 002 */ return new SpecificSafeProjection(references); > /* 003 */ } > /* 004 */ > /* 005 */ class SpecificSafeProjection extends > org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection { > /* 006 */ > /* 007 */ private Object[] references; > /* 008 */ private MutableRow mutableRow; > /* 009 */ private Object[] values; > /* 010 */ private Object[] values1; > /* 011 */ private org.apache.spark.sql.types.StructType schema; > /* 012 */ private org.apache.spark.sql.types.StructType schema1; > /* 013 */ > /* 014 */ > /* 015 */ public SpecificSafeProjection(Object[] references) { > /* 016 */ this.references = references; > /* 017 */ mutableRow = (MutableRow) references[references.length - 1]; > /* 018 */ > /* 019 */ > /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) > references[0]; > /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) > references[1]; > /* 022 */ } > /* 023 */ > /* 024 */ public java.lang.Object apply(java.lang.Object _i) { > /* 025 */ 
InternalRow i = (InternalRow) _i; > /* 026 */ > /* 027 */ values = new Object[2]; > /* 028 */ > /* 029 */ boolean isNull2 = i.isNullAt(0); > /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0)); > /* 031 */ > /* 032 */ boolean isNull1 = isNull2; > /* 033 */ final java.lang.String value1 = isNull1 ? null : > (java.lang.String) value2.toString(); > /* 034 */ isNull1 = value1 == null; > /* 035 */ if (isNull1) { > /* 036 */ values[0] = null; > /* 037 */ } else { > /* 038 */ values[0] = value1; > /* 039 */ } > /* 040 */ > /* 041 */ boolean isNull5 = i.isNullAt(1); > /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2)); > /* 043 */ boolean isNull3 = false; > /* 044 */ org.apache.spark.sql.Row value3 = null; > /* 045 */ if (!false && isNull5) { > /* 046 */ > /* 047 */ final org.apache.spark.sql.Row value6 = null; > /* 048 */ isNull3 = true; > /* 049 */ value3 = value6; > /* 050 */ } else { > /* 051 */ > /* 052 */ values1 = new Object[2]; > /* 053 */ > /* 054 */ boolean isNull10 = i.isNullAt(1); > /* 055 */ InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2)); > /* 056 */ > /* 057 */ boolean isNull9 = isNull10 || false; > /* 058 */ final boolean value9 = isNull9 ? false : (Boolean) > value10.isNullAt(0); > /* 059 */ boolean isNull8 = false; > /* 060 */ double value8 = -1.0; > /* 061 */ if (!isNull9 && value9) { > /* 062 */ > /* 063 */ final double value12 = -1.0; > /* 064 */ isNull8 = true; > /* 065 */ value8 = value12; > /* 066 */ } else { > /* 067 */ > /* 068 */ boolean isNull14 = i.isNullAt(1); > /* 069 */ InternalRow value14 = isNull14 ? 
null : (i.getStruct(1, 2)); > /* 070 */ boolean isNull13 = isNull14; > /* 071 */ double value13 = -1.0; > /* 072 */ > /* 073 */ if (!isNull14) { > /* 074 */ > /* 075 */ if (value14.isNullAt(0)) { > /* 076 */ isNull13 = true; > /* 077 */ } else { > /* 078 */ value13 = value14.getDouble(0); > /* 079 */ } > /* 080 */ > /* 081 */ } > /* 082 */ isNull8 = isNull13; > /* 083 */ value8 = value13; > /* 084 */ } > /* 085 */ if (isNull8) { > /* 086 */ values1[0] = null; > /* 087 */ } else { > /* 088 */ values1[0] = value8; > /* 089 */ } > /* 090 */ > /* 091 */ boolean isNull17 = i.isNullAt(1); > /* 092 */ InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2)); > /* 093 */ > /* 094 */ boolean isNull16 = isNull17 || false; > /* 095 */ final boolean value16 = isNull16 ? false : (Boolean) > value17.isNullAt(1); > /* 096 */ boolean
[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue
[ https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954093#comment-15954093 ] Kazuaki Ishizaki commented on SPARK-20176: -- Thanks. The code seems to work on master. I am investigating which change fixes the issue. > Spark Dataframe UDAF issue > -- > > Key: SPARK-20176 > URL: https://issues.apache.org/jira/browse/SPARK-20176 > Project: Spark > Issue Type: IT Help > Components: Spark Core >Affects Versions: 2.0.2 >Reporter: Dinesh Man Amatya > > Getting following error in custom UDAF > Error while decoding: java.util.concurrent.ExecutionException: > java.lang.Exception: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean" > /* 001 */ public java.lang.Object generate(Object[] references) { > /* 002 */ return new SpecificSafeProjection(references); > /* 003 */ } > /* 004 */ > /* 005 */ class SpecificSafeProjection extends > org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection { > /* 006 */ > /* 007 */ private Object[] references; > /* 008 */ private MutableRow mutableRow; > /* 009 */ private Object[] values; > /* 010 */ private Object[] values1; > /* 011 */ private org.apache.spark.sql.types.StructType schema; > /* 012 */ private org.apache.spark.sql.types.StructType schema1; > /* 013 */ > /* 014 */ > /* 015 */ public SpecificSafeProjection(Object[] references) { > /* 016 */ this.references = references; > /* 017 */ mutableRow = (MutableRow) references[references.length - 1]; > /* 018 */ > /* 019 */ > /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) > references[0]; > /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) > references[1]; > /* 022 */ } > /* 023 */ > /* 024 */ public java.lang.Object apply(java.lang.Object _i) { > /* 025 */ InternalRow i = (InternalRow) _i; > /* 026 */ > /* 027 */ values = new Object[2]; > /* 028 */ > 
/* 029 */ boolean isNull2 = i.isNullAt(0); > /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0)); > /* 031 */ > /* 032 */ boolean isNull1 = isNull2; > /* 033 */ final java.lang.String value1 = isNull1 ? null : > (java.lang.String) value2.toString(); > /* 034 */ isNull1 = value1 == null; > /* 035 */ if (isNull1) { > /* 036 */ values[0] = null; > /* 037 */ } else { > /* 038 */ values[0] = value1; > /* 039 */ } > /* 040 */ > /* 041 */ boolean isNull5 = i.isNullAt(1); > /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2)); > /* 043 */ boolean isNull3 = false; > /* 044 */ org.apache.spark.sql.Row value3 = null; > /* 045 */ if (!false && isNull5) { > /* 046 */ > /* 047 */ final org.apache.spark.sql.Row value6 = null; > /* 048 */ isNull3 = true; > /* 049 */ value3 = value6; > /* 050 */ } else { > /* 051 */ > /* 052 */ values1 = new Object[2]; > /* 053 */ > /* 054 */ boolean isNull10 = i.isNullAt(1); > /* 055 */ InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2)); > /* 056 */ > /* 057 */ boolean isNull9 = isNull10 || false; > /* 058 */ final boolean value9 = isNull9 ? false : (Boolean) > value10.isNullAt(0); > /* 059 */ boolean isNull8 = false; > /* 060 */ double value8 = -1.0; > /* 061 */ if (!isNull9 && value9) { > /* 062 */ > /* 063 */ final double value12 = -1.0; > /* 064 */ isNull8 = true; > /* 065 */ value8 = value12; > /* 066 */ } else { > /* 067 */ > /* 068 */ boolean isNull14 = i.isNullAt(1); > /* 069 */ InternalRow value14 = isNull14 ? 
null : (i.getStruct(1, 2)); > /* 070 */ boolean isNull13 = isNull14; > /* 071 */ double value13 = -1.0; > /* 072 */ > /* 073 */ if (!isNull14) { > /* 074 */ > /* 075 */ if (value14.isNullAt(0)) { > /* 076 */ isNull13 = true; > /* 077 */ } else { > /* 078 */ value13 = value14.getDouble(0); > /* 079 */ } > /* 080 */ > /* 081 */ } > /* 082 */ isNull8 = isNull13; > /* 083 */ value8 = value13; > /* 084 */ } > /* 085 */ if (isNull8) { > /* 086 */ values1[0] = null; > /* 087 */ } else { > /* 088 */ values1[0] = value8; > /* 089 */ } > /* 090 */ > /* 091 */ boolean isNull17 = i.isNullAt(1); > /* 092 */ InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2)); > /* 093 */ > /* 094 */ boolean isNull16 = isNull17 || false; > /* 095 */ final boolean value16 = isNull16 ? false : (Boolean) > value17.isNullAt(1); > /* 096 */ boolean isNull15 = false; > /* 097 */ double value15 = -1.0; > /* 098 */ if (!isNull16 && value16) { >
[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue
[ https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953025#comment-15953025 ] Dinesh Man Amatya commented on SPARK-20176: --- The following is the shortened code that reproduces the above error. There are three files: Test.scala, TestUdaf.scala and testData.csv. #Test.scala import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ import org.scalatest.{BeforeAndAfterEach, FunSuite} /** * Created by damatya on 4/3/17. */ class Test extends FunSuite with BeforeAndAfterEach { var sparkSession : SparkSession = _ var sc :SparkContext= _ override def beforeEach() { sparkSession = SparkSession.builder().appName("udf testings") .master("local") .config("", "") .getOrCreate() sc = sparkSession.sparkContext } override def afterEach() { sparkSession.stop() } test("test total") { val sqlContext = sparkSession.sqlContext val dataRdd = sc.textFile("/opt/projects/pa/DasBackend/SparkEngine/src/main/resources/testData.csv") val schemaString = "memberId;paidAmt;allowedAmt" val schema = StructType(schemaString.split(";").map(fieldName ⇒ StructField(fieldName, StringType, true))) val rowRdd = dataRdd.map{line => line.split(";", -1)}.map{ array => Row.fromSeq(array.toSeq)} val dataFrame = sqlContext.createDataFrame(rowRdd,schema) val testUdaf:TestUdaf = new TestUdaf(schema) val resultDataFrame = dataFrame.groupBy("memberId").agg(testUdaf(dataFrame.columns.map(dataFrame(_)):_*).as("totalAmountPair")) resultDataFrame.show(false) dataFrame.show() } } ##TestUdaf.scala import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, StructType} /** * Created by damatya on 4/3/17. 
*/ class TestUdaf (inputSch:StructType) extends UserDefinedAggregateFunction{ override def inputSchema: StructType = inputSch //inputSchema = inputSch override def bufferSchema: StructType = new StructType() .add("totalRxPaid",DoubleType) .add("totalRxAllowedAmt",DoubleType) override def dataType: DataType = new StructType() .add("totalRxPaid",DoubleType) .add("totalRxAllowedAmt",DoubleType) override def deterministic: Boolean = false override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer.update(0,0D) buffer.update(1,0D) } override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { var paidAmount : Float = 0f var allowedAmount : Float = 0f try { paidAmount=input.getFloat(1) allowedAmount=input.getFloat(2) } catch { case e:Exception => println ("invalid amount") } val totalPaidAmount = buffer.getDouble(0)+paidAmount val totalAllowedAmount = buffer.getDouble(1)+allowedAmount buffer.update(0,totalPaidAmount) buffer.update(1,totalAllowedAmount) } override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1.update(0,buffer1.getDouble(0)+buffer2.getDouble(0)) buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1)) } override def evaluate(buffer: Row): Any = { (buffer.getDouble(0) , buffer.getDouble(1)) } } #testData.csv m123;10.5;1 m123;20;10 m11;10;1 m11;30;1 > Spark Dataframe UDAF issue > -- > > Key: SPARK-20176 > URL: https://issues.apache.org/jira/browse/SPARK-20176 > Project: Spark > Issue Type: IT Help > Components: Spark Core >Affects Versions: 2.0.2 >Reporter: Dinesh Man Amatya > > Getting following error in custom UDAF > Error while decoding: java.util.concurrent.ExecutionException: > java.lang.Exception: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean" > /* 001 */ public java.lang.Object generate(Object[] references) { > /* 002 */ return new 
SpecificSafeProjection(references); > /* 003 */ } > /* 004 */ > /* 005 */ class SpecificSafeProjection extends > org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection { > /* 006 */ > /* 007 */ private Object[] references; > /* 008 */ private MutableRow mutableRow; > /* 009 */ private Object[] values; > /* 010 */ private Object[] values1; > /* 011 */ private org.apache.spark.sql.types.StructType schema; > /* 012 */ private org.apache.spark.sql.types.StructType schema1; > /* 013 */ > /* 014 */ > /* 015 */ public SpecificSafeProjection(Object[] references) { > /* 016 */ this.references = references; > /* 017 */ mutableRow = (MutableRow) references[references.length - 1]; > /* 018 */ >
[jira] [Commented] (SPARK-20176) Spark Dataframe UDAF issue
[ https://issues.apache.org/jira/browse/SPARK-20176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951473#comment-15951473 ] Kazuaki Ishizaki commented on SPARK-20176: -- Could you please post the program that can reproduce this issue? > Spark Dataframe UDAF issue > -- > > Key: SPARK-20176 > URL: https://issues.apache.org/jira/browse/SPARK-20176 > Project: Spark > Issue Type: IT Help > Components: Spark Core >Affects Versions: 2.0.2 >Reporter: Dinesh Man Amatya > > Getting following error in custom UDAF > Error while decoding: java.util.concurrent.ExecutionException: > java.lang.Exception: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 58, Column 33: Incompatible expression types "boolean" and "java.lang.Boolean" > /* 001 */ public java.lang.Object generate(Object[] references) { > /* 002 */ return new SpecificSafeProjection(references); > /* 003 */ } > /* 004 */ > /* 005 */ class SpecificSafeProjection extends > org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection { > /* 006 */ > /* 007 */ private Object[] references; > /* 008 */ private MutableRow mutableRow; > /* 009 */ private Object[] values; > /* 010 */ private Object[] values1; > /* 011 */ private org.apache.spark.sql.types.StructType schema; > /* 012 */ private org.apache.spark.sql.types.StructType schema1; > /* 013 */ > /* 014 */ > /* 015 */ public SpecificSafeProjection(Object[] references) { > /* 016 */ this.references = references; > /* 017 */ mutableRow = (MutableRow) references[references.length - 1]; > /* 018 */ > /* 019 */ > /* 020 */ this.schema = (org.apache.spark.sql.types.StructType) > references[0]; > /* 021 */ this.schema1 = (org.apache.spark.sql.types.StructType) > references[1]; > /* 022 */ } > /* 023 */ > /* 024 */ public java.lang.Object apply(java.lang.Object _i) { > /* 025 */ InternalRow i = (InternalRow) _i; > /* 026 */ > /* 027 */ values = new Object[2]; > /* 028 */ > /* 029 */ boolean isNull2 = 
i.isNullAt(0); > /* 030 */ UTF8String value2 = isNull2 ? null : (i.getUTF8String(0)); > /* 031 */ > /* 032 */ boolean isNull1 = isNull2; > /* 033 */ final java.lang.String value1 = isNull1 ? null : > (java.lang.String) value2.toString(); > /* 034 */ isNull1 = value1 == null; > /* 035 */ if (isNull1) { > /* 036 */ values[0] = null; > /* 037 */ } else { > /* 038 */ values[0] = value1; > /* 039 */ } > /* 040 */ > /* 041 */ boolean isNull5 = i.isNullAt(1); > /* 042 */ InternalRow value5 = isNull5 ? null : (i.getStruct(1, 2)); > /* 043 */ boolean isNull3 = false; > /* 044 */ org.apache.spark.sql.Row value3 = null; > /* 045 */ if (!false && isNull5) { > /* 046 */ > /* 047 */ final org.apache.spark.sql.Row value6 = null; > /* 048 */ isNull3 = true; > /* 049 */ value3 = value6; > /* 050 */ } else { > /* 051 */ > /* 052 */ values1 = new Object[2]; > /* 053 */ > /* 054 */ boolean isNull10 = i.isNullAt(1); > /* 055 */ InternalRow value10 = isNull10 ? null : (i.getStruct(1, 2)); > /* 056 */ > /* 057 */ boolean isNull9 = isNull10 || false; > /* 058 */ final boolean value9 = isNull9 ? false : (Boolean) > value10.isNullAt(0); > /* 059 */ boolean isNull8 = false; > /* 060 */ double value8 = -1.0; > /* 061 */ if (!isNull9 && value9) { > /* 062 */ > /* 063 */ final double value12 = -1.0; > /* 064 */ isNull8 = true; > /* 065 */ value8 = value12; > /* 066 */ } else { > /* 067 */ > /* 068 */ boolean isNull14 = i.isNullAt(1); > /* 069 */ InternalRow value14 = isNull14 ? 
null : (i.getStruct(1, 2)); > /* 070 */ boolean isNull13 = isNull14; > /* 071 */ double value13 = -1.0; > /* 072 */ > /* 073 */ if (!isNull14) { > /* 074 */ > /* 075 */ if (value14.isNullAt(0)) { > /* 076 */ isNull13 = true; > /* 077 */ } else { > /* 078 */ value13 = value14.getDouble(0); > /* 079 */ } > /* 080 */ > /* 081 */ } > /* 082 */ isNull8 = isNull13; > /* 083 */ value8 = value13; > /* 084 */ } > /* 085 */ if (isNull8) { > /* 086 */ values1[0] = null; > /* 087 */ } else { > /* 088 */ values1[0] = value8; > /* 089 */ } > /* 090 */ > /* 091 */ boolean isNull17 = i.isNullAt(1); > /* 092 */ InternalRow value17 = isNull17 ? null : (i.getStruct(1, 2)); > /* 093 */ > /* 094 */ boolean isNull16 = isNull17 || false; > /* 095 */ final boolean value16 = isNull16 ? false : (Boolean) > value17.isNullAt(1); > /* 096 */ boolean isNull15 = false; > /* 097 */ double value15 = -1.0; > /* 098 */ if (!isNull16 && value16) { > /* 099 */ > /* 100 */