chandu-1101 opened a new issue, #8333:
URL: https://github.com/apache/iceberg/issues/8333

   ### Apache Iceberg version
   
   1.3.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   1. I created an Iceberg table from an existing Parquet file.
   2. When I then try to insert CDC data into it, I get the error below.
   
   --> I am not sure whether this issue is caused by bad data. If it is, how do I debug it?
   
   emr: 6.3.9
   spark: 3.3.0
   scala: 2.12
   
   spark shell command
   
   ```
   spark-shell --driver-memory 1g --executor-memory 1g --executor-cores 1 --driver-cores 1 \
     --conf spark.sql.adaptive.enabled=true \
     --conf spark.sql.adaptive.coalescePartitions.enabled=true \
     --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --conf spark.yarn.maxAppAttempts=1 \
     --conf spark.yarn.maxAppAttempts=1 \
     --conf spark.yarn.submit.waitAppCompletion=false \
     --files /home/hadoop/jars/log4j2.properties \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
     --conf spark.sql.catalog.spark_catalog.type=hive \
     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
     --conf spark.sql.catalog.local.type=hadoop \
     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
     --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" \
     --name ravic \
     --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar
   
   ```
   
   Exception:
   ```
   Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 2711) (ip-172-25-26-205.prod.phenom.local executor 23): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
        at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
   ```
   
   Code:
   ```
   val sess = Application.spark()

   // Load the existing Parquet snapshot and expose it to SQL.
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
   // val _snapshotDf = snapshotDf.sortWithinPartitions("__created_date_")
   snapshotDf.createOrReplaceTempView("snapshot")

   // Create the Iceberg table from the snapshot, then add a partition field.
   sess.sql(""" CREATE TABLE ge112
                USING iceberg
                TBLPROPERTIES ('key'='_id.oid')
                LOCATION 's3://bucket/snapshots2/ge11-ice2/'
                AS
                  SELECT * FROM snapshot """)
   sess.sql(""" ALTER TABLE ge112 ADD PARTITION FIELD __created_date_ """)

   // Read the CDC JSON, forcing the snapshot's schema onto it.
   // val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
   val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs2/ge11-partitioned/")
   cdcDf.createOrReplaceTempView("cdc")

   // Keep only the latest CDC record (max cdc_pk) per _id.oid within each __created_date_.
   val _cdcDf = sess.sql(""" SELECT *
                             FROM cdc c1
                             WHERE cdc_pk IN (
                                     SELECT max(cdc_pk)
                                     FROM cdc c2
                                     WHERE _id.oid IS NOT NULL
                                       AND _id.oid != ''
                                       AND c2.__created_date_ = c1.__created_date_
                                     GROUP BY _id.oid) """)
   // createOrReplaceTempView replaces the deprecated registerTempTable.
   _cdcDf.createOrReplaceTempView("_cdc")

   // Upsert the deduplicated CDC rows into the Iceberg table.
   sess.sql(""" MERGE INTO ge112 t
                USING (
                        SELECT *
                        FROM _cdc ) u
                ON t._id.oid = u._id.oid
                  WHEN MATCHED THEN UPDATE SET *
                  WHEN NOT MATCHED THEN INSERT * """)
   
   ```
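   
   One possible way to test the bad-data hypothesis, offered as a sketch and not a confirmed diagnosis: the code above forces `snapshotDf.schema` onto the JSON files, so a field that the Parquet schema declares as a long but that arrives as a string in the JSON would be one candidate cause. The helper below is hypothetical (`SchemaDiff` is not part of Spark or Iceberg). It compares the enforced schema with the schema Spark infers from the JSON on its own and lists the fields whose types differ. It assumes only the `org.apache.spark.sql.types` classes.
   
   ```scala
   import org.apache.spark.sql.types._

   // Hypothetical helper, not a Spark or Iceberg API.
   object SchemaDiff {
     // Returns (fieldPath, expectedType, actualType) for every field whose type differs.
     def diff(expected: DataType, actual: DataType, path: String = ""): Seq[(String, String, String)] =
       (expected, actual) match {
         case (e: StructType, a: StructType) =>
           val actualByName = a.fields.map(f => f.name -> f.dataType).toMap
           e.fields.toSeq.flatMap { f =>
             val child = if (path.isEmpty) f.name else s"$path.${f.name}"
             actualByName.get(f.name) match {
               case Some(t) => diff(f.dataType, t, child)
               case None    => Seq.empty // field not present in the JSON; nothing to compare
             }
           }
         case (e: ArrayType, a: ArrayType) =>
           diff(e.elementType, a.elementType, s"$path[]")
         case (e, a) if e == a => Seq.empty
         case (e, a)           => Seq((path, e.simpleString, a.simpleString))
       }
   }

   // Usage in the same spark-shell session (assumes `sess` and `snapshotDf` from the code above):
   //   val inferred = sess.read.json("s3://bucket/inputs2/ge11-partitioned/").schema
   //   SchemaDiff.diff(snapshotDf.schema, inferred).foreach(println)
   ```
   
   Any row it prints with `bigint` expected and `string` inferred would be worth inspecting in the raw JSON. An empty result would suggest the type mismatch comes from somewhere other than the field types of the input data.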
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

