chandu-1101 opened a new issue, #8333:
URL: https://github.com/apache/iceberg/issues/8333
### Apache Iceberg version
1.3.1 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
1. I created an Iceberg table from an existing Parquet file.
2. Then I try to insert CDC data into it (via MERGE INTO) and get the error below.
--> I am not sure if this is because of bad data. If yes, how do I debug it? (One check I could run is the schema diff sketched after the exception below.)
EMR: 6.3.9
Spark: 3.3.0
Scala: 2.12

Spark shell command:
```
spark-shell --driver-memory 1g --executor-memory 1g --executor-cores 1 \
  --driver-cores 1 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files /home/hadoop/jars/log4j2.properties \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
  --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" \
  --name ravic \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 \
  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar
```
Exception:
```
Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 2711) (ip-172-25-26-205.prod.phenom.local executor 23): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
```
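To test the bad-data theory, one thing I could try is a field-by-field diff of the Iceberg table schema against the CDC dataframe, to find a column that arrives as a string where a long is expected. A minimal sketch (assumes `sess`, `cdcDf`, and the `ge112` table from the code below; nested structs would need to be walked recursively):
```
// Sketch only: compare top-level field types between the Iceberg table and the
// CDC dataframe. `sess`, `cdcDf`, and the `ge112` table come from the code below.
val tableTypes = sess.table("ge112").schema.fields.map(f => f.name -> f.dataType).toMap
cdcDf.schema.fields.foreach { f =>
  tableTypes.get(f.name) match {
    case Some(t) if t != f.dataType =>
      println(s"type mismatch on ${f.name}: table=$t, cdc=${f.dataType}")
    case None =>
      println(s"${f.name} only exists in the CDC data")
    case _ => // types agree, nothing to report
  }
}
```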
Code:
```
val sess = Application.spark()

val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
// val _snapshotDf = snapshotDf.sortWithinPartitions("__created_date_")
snapshotDf.createOrReplaceTempView("snapshot")

sess.sql(""" CREATE TABLE ge112
             USING iceberg
             TBLPROPERTIES ('key'='_id.oid')
             LOCATION 's3://bucket/snapshots2/ge11-ice2/'
             AS SELECT * FROM snapshot """)
sess.sql(""" ALTER TABLE ge112 ADD PARTITION FIELD __created_date_ """)

// val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs2/ge11-partitioned/")
cdcDf.createOrReplaceTempView("cdc")

// keep only the latest CDC record per _id.oid within each __created_date_
val _cdcDf = sess.sql(""" SELECT *
                          FROM cdc c1
                          WHERE cdc_pk IN (
                            SELECT max(cdc_pk)
                            FROM cdc c2
                            WHERE _id.oid IS NOT NULL
                              AND _id.oid != ''
                              AND c2.__created_date_ = c1.__created_date_
                            GROUP BY _id.oid) """)
_cdcDf.createOrReplaceTempView("_cdc")

sess.sql(""" MERGE INTO ge112 t
             USING (SELECT * FROM _cdc) u
             ON t._id.oid = u._id.oid
             WHEN MATCHED THEN UPDATE SET *
             WHEN NOT MATCHED THEN INSERT * """)
```
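If such a mismatch does show up, casting the CDC columns to the table's types before the MERGE is something I could try. A rough sketch (top-level columns only; assumes every table column is present in `_cdcDf`):
```
// Sketch only: cast each top-level CDC column to the type the table expects,
// then re-register the view used by the MERGE. Assumes `sess` and `_cdcDf`
// from the code above and that `_cdcDf` contains all of the table's columns.
import org.apache.spark.sql.functions.col

val tableSchema = sess.table("ge112").schema
val alignedCdcDf = _cdcDf.select(
  tableSchema.fields.map(f => col(f.name).cast(f.dataType).as(f.name)): _*)
alignedCdcDf.createOrReplaceTempView("_cdc")
```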
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]