HannesDS opened a new issue, #6266:
URL: https://github.com/apache/iceberg/issues/6266
### Apache Iceberg version
1.0.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
**Application**:
We are trying to use Spark Structured Streaming in combination with Iceberg.
The application is containerised and runs on EKS behind the scenes. Spark
Structured Streaming worked well until we tried to add the Iceberg extension.
We have previously used Iceberg successfully in a local Spark session (without
Structured Streaming).
**Environment**
We have tried with the following versions:
Iceberg version: 1.0.0 ; 0.14.1 ; 0.13.1
Spark version: 3.2.0 ; 3.0.3
AWS SDK version: 2.17.257 ; 2.18.22
Scala version: 2.12
Extra configuration added to the Spark session:
```python
REMOTE_CATALOG_NAME: str = "remote_glue_catalog"
config.setAll(
    [
        (
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
        ),
        ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
        (
            "spark.jars.packages",
            "software.amazon.awssdk:url-connection-client:2.17.257",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}",
            "org.apache.iceberg.spark.SparkCatalog",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.warehouse",
            f"{aws.datalake_root_path()}/{table_name}/",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO",
        ),
        (
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ),
        (f"spark.sql.defaultCatalog", REMOTE_CATALOG_NAME),
    ]
)
```
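
One thing that may be worth checking in the configuration above, offered as a possible explanation rather than a confirmed diagnosis: `spark.jars.packages` is set three times. As far as we understand, `SparkConf.setAll` applies the pairs one after another, so a repeated key would keep only its last value and the Iceberg runtime would never be requested. The sketch below mimics that behaviour with a plain dict instead of a real `SparkConf`; `apply_pairs` and `join_packages` are hypothetical helper names introduced only for this illustration.

```python
from typing import Dict, Iterable, List, Tuple


def apply_pairs(pairs: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Mimic setting config pairs one by one: a repeated key keeps its last value."""
    conf: Dict[str, str] = {}
    for key, value in pairs:
        conf[key] = value
    return conf


def join_packages(coordinates: List[str]) -> str:
    """Build a single comma-separated value for spark.jars.packages."""
    return ",".join(coordinates)


packages = [
    "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
    "software.amazon.awssdk:bundle:2.17.257",
    "software.amazon.awssdk:url-connection-client:2.17.257",
]

# Three separate entries for the same key: only the last one survives.
repeated = apply_pairs([("spark.jars.packages", p) for p in packages])

# One entry holding all coordinates: nothing is lost.
combined = apply_pairs([("spark.jars.packages", join_packages(packages))])

print(repeated["spark.jars.packages"])
print(combined["spark.jars.packages"])
```

If this is indeed what happens, passing the joined string as the single value of `spark.jars.packages` (which takes a comma-separated list of Maven coordinates) would be the thing to try. The runtime artifact would also need to match the Spark and Scala versions actually in use.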
**Stacktrace**
```
22/11/24 09:18:36 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1160)
    at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1158)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1158)
    at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:101)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Unknown Source)
....
22/11/24 09:18:45
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
    at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
22/11/24 09:18:45 ERROR MicroBatchExecution: Query [id = , runId = ] terminated with error
org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$currentNamespace$1(CatalogManager.scala:100)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:100)
    at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:98)
    at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:95)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:138)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:90)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:105)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:574)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
ERROR:root:<traceback object at 0x7f50f454a640>
Traceback (most recent call last):
  File "/opt/spark/work-dir/src/app/streaming/main.py", line 11, in <module>
    streaming_main(registry=streaming_registry, )
  File "/opt/spark/work-dir/src/app/app.py", line 20, in main
    start_job(session, job=job, environment=args.env, kwargs=args.kwargs)
  File "/opt/spark/work-dir/src/app/app.py", line 44, in start_job
    job.run(session, kwargs=kwargs)
  File "/opt/spark/work-dir/src/elia_streaming/jobs/current_system_imbalance/producer.py", line 31, in run
    iceberg.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
  File "<string>", line 3, in raise_from
pyspark.sql.utils.StreamingQueryException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog
```
**Debug checks**:
- [x] We checked that the jars are included in the Docker container.
- [x] We logged the Spark config at runtime to confirm that all options
were included, which they were.
- [x] We tried several versions of Spark and Iceberg.
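
The runtime-config check above could be made more specific by looking at the value of `spark.jars.packages` itself rather than only at which keys are present. The following is a minimal sketch under that assumption; `missing_packages` is a hypothetical helper, and the `logged` pairs are made-up example values, not the output of the actual job.

```python
from typing import Iterable, List, Tuple


def missing_packages(
    conf_pairs: Iterable[Tuple[str, str]], required_fragments: List[str]
) -> List[str]:
    """Return the required fragments that match no coordinate in spark.jars.packages."""
    conf = dict(conf_pairs)
    raw = conf.get("spark.jars.packages", "")
    coordinates = [c.strip() for c in raw.split(",") if c.strip()]
    return [
        fragment
        for fragment in required_fragments
        if not any(fragment in coordinate for coordinate in coordinates)
    ]


# Hypothetical logged config, shaped like a list of (key, value) pairs.
logged = [
    ("spark.jars.packages", "software.amazon.awssdk:url-connection-client:2.17.257"),
    ("spark.sql.defaultCatalog", "remote_glue_catalog"),
]

missing = missing_packages(logged, ["iceberg-spark-runtime", "awssdk:bundle"])
print(missing)
```

In a live session the pairs could be taken from `spark.sparkContext.getConf().getAll()`. A non-empty result would suggest that the package was never requested, as opposed to being requested but not reaching the classpath.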
**Question**
Does anybody know what is going on?
My guess is that with Spark Structured Streaming the jars aren't actually
added to the classpath of the executors, but I'm not sure.
Thanks in advance!