[ https://issues.apache.org/jira/browse/HUDI-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nishith Agarwal reassigned HUDI-1711: ------------------------------------- Assignee: sivabalan narayanan > Avro Schema Exception with Spark 3.0 in 0.7 > ------------------------------------------- > > Key: HUDI-1711 > URL: https://issues.apache.org/jira/browse/HUDI-1711 > Project: Apache Hudi > Issue Type: Bug > Components: DeltaStreamer > Reporter: Balaji Varadarajan > Assignee: sivabalan narayanan > Priority: Major > Labels: sev:triage, user-support-issues > > GH: [https://github.com/apache/hudi/issues/2705] > > > {{21/03/22 10:10:35 WARN util.package: Truncated the string representation of > a plan since it was too large. This behavior can be adjusted by setting > 'spark.sql.debug.maxToStringFields'. > 21/03/22 10:10:35 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 > (TID 1) > java.lang.RuntimeException: Error while decoding: > java.lang.NegativeArraySizeException: -1255727808 > createexternalrow(if (isnull(input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true])) null else createexternalrow(if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].id, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].name.toString, if (input[0, > 
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].type.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].url.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].user.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].password.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > 
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].create_time.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].create_user.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].update_time.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].update_user.toString, if (input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[0, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].del_flag, StructField(id,IntegerType,false), > StructField(name,StringType,true), StructField(type,StringType,true), > 
StructField(url,StringType,true), StructField(user,StringType,true), > StructField(password,StringType,true), > StructField(create_time,StringType,true), > StructField(create_user,StringType,true), > StructField(update_time,StringType,true), > StructField(update_user,StringType,true), > StructField(del_flag,IntegerType,true)), if (isnull(input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true])) null else createexternalrow(if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].id, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].name.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].type.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else 
input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].url.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].user.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].password.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].create_time.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].create_user.toString, if (input[1, > 
struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].update_time.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].update_user.toString, if (input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].isNullAt) null else input[1, > struct<id:int,name:string,type:string,url:string,user:string,password:string,create_time:string,create_user:string,update_time:string,update_user:string,del_flag:int>, > true].del_flag, StructField(id,IntegerType,false), > StructField(name,StringType,true), StructField(type,StringType,true), > StructField(url,StringType,true), StructField(user,StringType,true), > StructField(password,StringType,true), > StructField(create_time,StringType,true), > StructField(create_user,StringType,true), > StructField(update_time,StringType,true), > StructField(update_user,StringType,true), > StructField(del_flag,IntegerType,true)), if (isnull(input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false])) null else createexternalrow(if (input[2, > 
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].version.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].connector.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].name.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].ts_ms, if (input[2, > 
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].snapshot.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].db.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].table.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].server_id, if (input[2, > 
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].gtid.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].file.toString, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].pos, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].row, if (input[2, > 
struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].thread, if (input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].isNullAt) null else input[2, > struct<version:string,connector:string,name:string,ts_ms:bigint,snapshot:string,db:string,table:string,server_id:bigint,gtid:string,file:string,pos:bigint,row:int,thread:bigint,query:string>, > false].query.toString, StructField(version,StringType,false), > StructField(connector,StringType,false), StructField(name,StringType,false), > StructField(ts_ms,LongType,false), StructField(snapshot,StringType,true), > StructField(db,StringType,false), StructField(table,StringType,true), > StructField(server_id,LongType,false), StructField(gtid,StringType,true), > StructField(file,StringType,false), ... 
4 more fields), input[3, string, > false].toString, input[4, bigint, true], if (isnull(input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, true])) > null else createexternalrow(if (input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].isNullAt) null else input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].id.toString, if (input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].isNullAt) null else input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].total_order, if (input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].isNullAt) null else input[5, > struct<id:string,total_order:bigint,data_collection_order:bigint>, > true].data_collection_order, StructField(id,StringType,false), > StructField(total_order,LongType,false), > StructField(data_collection_order,LongType,false)), > StructField(before,StructType(StructField(id,IntegerType,false), > StructField(name,StringType,true), StructField(type,StringType,true), > StructField(url,StringType,true), StructField(user,StringType,true), > StructField(password,StringType,true), > StructField(create_time,StringType,true), > StructField(create_user,StringType,true), > StructField(update_time,StringType,true), > StructField(update_user,StringType,true), > StructField(del_flag,IntegerType,true)),true), > StructField(after,StructType(StructField(id,IntegerType,false), > StructField(name,StringType,true), StructField(type,StringType,true), > StructField(url,StringType,true), StructField(user,StringType,true), > StructField(password,StringType,true), > StructField(create_time,StringType,true), > StructField(create_user,StringType,true), > StructField(update_time,StringType,true), > StructField(update_user,StringType,true), > StructField(del_flag,IntegerType,true)),true), > StructField(source,StructType(StructField(version,StringType,false), > 
StructField(connector,StringType,false), StructField(name,StringType,false), StructField(ts_ms,LongType,false), StructField(snapshot,StringType,true), StructField(db,StringType,false), StructField(table,StringType,true), StructField(server_id,LongType,false), StructField(gtid,StringType,true), StructField(file,StringType,false), StructField(pos,LongType,false), StructField(row,IntegerType,false), StructField(thread,LongType,true), StructField(query,StringType,true)),false), StructField(op,StringType,false), StructField(ts_ms,LongType,true), StructField(transaction,StructType(StructField(id,StringType,false), StructField(total_order,LongType,false), StructField(data_collection_order,LongType,false)),true))
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
	at org.apache.hudi.Spark3RowDeserializer.deserializeRow(Spark3RowDeserializer.scala:31)
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$1(HoodieSparkUtils.scala:103)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at scala.collection.AbstractIterator.to(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1423)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NegativeArraySizeException: -1255727808
	at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:298)
	at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1358)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.If_6$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_3_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
	... 31 more
21/03/22 10:10:35 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 2}}

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)