ottensjors opened a new issue, #6388:
URL: https://github.com/apache/iceberg/issues/6388
### Apache Iceberg version
1.0.0
### Query engine
Spark
### Please describe the bug 🐞
Hi team,
I'm currently using Apache Spark 3.3 with Apache Iceberg 1.0.0, AWS S3, and the GlueCatalog integration. Please find my Spark config below:
```
# add Iceberg dependency
ICEBERG_VERSION=1.0.0
DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:$ICEBERG_VERSION"
DEPENDENCIES+=",org.apache.iceberg:iceberg-hive-runtime:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2"

# add AWS dependency
AWS_SDK_VERSION=2.17.257
AWS_MAVEN_GROUP=software.amazon.awssdk
AWS_PACKAGES=(
    "bundle"
    "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
    DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done

# start Spark SQL client shell --> works with AWS Glue catalog
spark-shell --packages $DEPENDENCIES \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://<s3-bucket>/adhoc-otten/iceberg \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.hive.metastore.version=2.3.9 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
    --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
```
The Spark session has successfully integrated with the Glue catalog and AWS S3, which is great. I'm able to use all functionality when it comes to regular DataFrames.
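For context, an ordinary batch read against the same catalog works fine. A minimal sketch, assuming a hypothetical table `my_catalog.default.dimitem` (placeholder name, not from my actual setup):
```
// Hypothetical batch read that works with the configuration above;
// the table identifier is a placeholder.
val batchDf = spark.read.format("iceberg").load("my_catalog.default.dimitem")
batchDf.show(5, truncate = false)
```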
However, I'm currently evaluating Apache Iceberg with Spark Structured Streaming, and unfortunately I'm running into an issue. Below you can find the code I use to evaluate it:
```
val df = spark.readStream
  .format("iceberg")
  .option("stream-from-timestamp", 1670487707799L)
  .option("streaming-skip-overwrite-snapshots", "true")
  .load("<table_name>")
// df: org.apache.spark.sql.DataFrame = [ITEM_ID: int, ITEM_KEY: bigint ... 5 more fields]

df.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "default.dimitem_stream")
  .option("checkpointLocation", "s3a://<bucket-name>/adhoc-otten/iceberg/default/dimitem_stream/checkpoint")
  .start()
// res4: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@22dac77c
```
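For what it's worth, the same pipeline can also be expressed with catalog-qualified identifiers and `toTable` (available on `DataStreamWriter` since Spark 3.1). This is a sketch only; the table names are placeholders and the one-minute trigger is an assumption on my part:
```
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

// Hypothetical variant with fully qualified identifiers; placeholder names throughout.
val streamDf = spark.readStream
  .format("iceberg")
  .option("stream-from-timestamp", 1670487707799L)
  .load("my_catalog.default.dimitem")

streamDf.writeStream
  .format("iceberg")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("checkpointLocation", "s3a://<bucket-name>/adhoc-otten/iceberg/default/dimitem_stream/checkpoint")
  .toTable("my_catalog.default.dimitem_stream")
```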
It all seems to go well, but after a few seconds the following error manifests itself:
```
22/12/08 14:00:37 ERROR MicroBatchExecution: Query [id = 4db07e32-5e9e-44d9-8a89-7d1c0f64e716, runId = ce0a9e92-efff-4be8-b9f7-ab8ae9ac179c] terminated with error
org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:657)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:647)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null
    at org.apache.iceberg.spark.source.SparkMicroBatchStream.shouldProcess(SparkMicroBatchStream.java:230)
    at org.apache.iceberg.spark.source.SparkMicroBatchStream.planFiles(SparkMicroBatchStream.java:211)
    at org.apache.iceberg.spark.source.SparkMicroBatchStream.planInputPartitions(SparkMicroBatchStream.java:148)
    at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions$lzycompute(MicroBatchScanExec.scala:45)
    at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions(MicroBatchScanExec.scala:45)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:142)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:141)
    at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:29)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:153)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
```
The main takeaway here is the following line:
`Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null`
It's probably me doing something stupid and being PICNIC (Problem In Chair, Not In Computer).
However, I went through the source code of Apache Iceberg and was unable to find any assertion or message that reflects the above statement, nor any additional information on the internet.
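From reading `SparkMicroBatchStream.shouldProcess`, my best guess is that the snapshot lookup behind it can return null, e.g. when the snapshot id resolved from `stream-from-timestamp` or from the checkpointed offset no longer exists in the table metadata (expired snapshots?). A minimal sketch of that suspected failure mode against the core Table API; the method and variable names are mine, not Iceberg's:
```
import org.apache.iceberg.Table

// Suspected failure mode (illustration only, not the actual Iceberg code):
// Table.snapshot(id) returns null when the id is unknown to the metadata,
// and calling operation() on that null reproduces the NPE above.
def operationOf(table: Table, snapshotId: Long): String = {
  val snapshot = table.snapshot(snapshotId) // null if snapshotId was expired/removed
  snapshot.operation()                      // throws NullPointerException when null
}
```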
I was wondering if someone could provide me with some much-needed insights to get this resolved.
Thanks in advance!
Regards,
Sjors Otten
Principal Architect