mahendrachandrasekhar opened a new issue, #9275:
URL: https://github.com/apache/iceberg/issues/9275
We are using a Java custom catalog with Iceberg. The table is created properly; however, we get an error when we insert data.
```java
public String createCustomTable(String tableName) {
    try {
        TableIdentifier tableIdentifier = TableIdentifier.of(name(), tableName);
        Schema schema = readSchema(tableIdentifier);
        Map<String, String> properties = ImmutableMap.of(
            TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
        PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
            .identity(getPartitionKeyfromSchema(tableIdentifier.name()))
            .build();
        String tableLocation = defaultLocation
            + tableIdentifier.namespace().toString() + "/" + tableIdentifier.name();
        catalog.createTable(tableIdentifier, schema, partitionSpec, tableLocation, properties);
        catalog.loadTable(TableIdentifier.of(name(), tableName));
        return "Table created";
    } catch (Exception e) {
        return e.getMessage();
    }
}

public String insertData(String tableName, String csvPath) throws IOException {
    Table icebergTable = catalog.loadTable(TableIdentifier.of(name(), tableName));
    SparkSession spark = SparkSession.builder()
        .config("spark.master", "local")
        .getOrCreate();
    String headerJson = readHeaderJson(tableName);
    LOGGER.info("Header JSON for {}: {}", tableName, headerJson);
    String[] columns = headerJson.split(",");
    Dataset<Row> df = spark.read()
        .option("header", "false")
        .option("inferSchema", "false")
        .option("comment", "#")
        .option("sep", "|")
        .csv(csvPath)
        .toDF(columns);
    LOGGER.info("Actual columns: {}", Arrays.toString(df.columns()));
    // Cast every column to string to match the table schema
    for (String col : df.columns()) {
        df = df.withColumn(col, df.col(col).cast("string"));
    }
    df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
    LOGGER.info("Data inserted successfully into table: {}", tableName);
    return "Data inserted successfully";
}
```
When we execute this via a main program in Java, it works perfectly. However, when we build a jar out of this and call it from Spark, we get this error:
```
ERROR:root:Error: An error occurred while calling o0.insertData.
: java.lang.ClassNotFoundException:
Failed to find data source: iceberg. Please find packages at
http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at com.xyz.catalog.CustomCatalog.insertData(CustomCatalog.java:178)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: iceberg.DefaultSource
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
	at scala.util.Try$.apply(Try.scala:210)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
	at scala.util.Failure.orElse(Try.scala:221)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
	... 16 more
```
We are trying to create an Iceberg table with a custom catalog and insert data into it, and we expect the insert to work. This is the line where the error is raised:

```java
df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
```
It does not work when passing the jar to spark-submit either:

```shell
spark-submit --master spark://spark:7077 \
  /home/airflow/spark/app/my-module.py --packages \
  org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]