Hello,
Newbie here. I need help figuring out an issue.
I am doing a simple local Spark save to S3 using Iceberg.
I can see that my metadata folder was created in S3, so the schema/table
creation was successful.
When I try to run a Spark write, I get a NullPointerException:
java.lang.NullPointerException
at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
at com.google.common.collect.Iterators.addAll(Iterators.java:356)
at com.google.common.collect.Lists.newArrayList(Lists.java:143)
at com.google.common.collect.Lists.newArrayList(Lists.java:130)
at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
at org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
at org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
at org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
Maybe I am making a mistake in the schema creation? Here is my schema:
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
    required(1, "mwId", Types.StringType.get()),
    required(2, "mwVersion", Types.LongType.get()),
    required(3, "id", Types.LongType.get()),
    required(4, "id_str", Types.StringType.get()),
    optional(5, "text", Types.StringType.get()),
    optional(6, "created_at", Types.StringType.get()),
    optional(7, "lang", Types.StringType.get())
);
The partition spec I used was PartitionSpec.unpartitioned();
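For reference, I created the table beforehand with something roughly like this (from memory, so the exact setup may differ slightly; I am using path-based tables via HadoopTables, with the same getTableLocation() S3 path as in the write below):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Create an unpartitioned table at the S3 location; this is what
// produced the metadata folder I can see in S3.
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.create(schema, PartitionSpec.unpartitioned(), getTableLocation());
```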
The write code I used was:

Dataset<TweetItem> myDS;
// ...... (populate myDS)
myDS.write()
    .format("iceberg")
    .mode("append")
    .save(getTableLocation());
If I call printSchema() on myDS, I get:
root
|-- columns: struct (nullable = true)
|-- created_at: string (nullable = true)
|-- encoder: struct (nullable = true)
|-- id: long (nullable = true)
|-- id_str: string (nullable = true)
|-- lang: string (nullable = true)
|-- mwId: string (nullable = true)
|-- mwVersion: long (nullable = true)
|-- partitionSpec: struct (nullable = true)
|-- schema: struct (nullable = true)
| |-- aliases: map (nullable = true)
| | |-- key: string
| | |-- value: integer (valueContainsNull = true)
|-- tableLocation: string (nullable = true)
|-- text: string (nullable = true)
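Looking at that output, the DataFrame carries extra fields (columns, encoder, partitionSpec, schema, tableLocation) beyond the seven columns my Iceberg schema declares, so I am wondering if I should project down to only the declared columns before writing, something like this (just my guess at a workaround, not tested yet):

```java
// Keep only the columns declared in the Iceberg table schema
// before handing the data to the Iceberg writer.
myDS.select("mwId", "mwVersion", "id", "id_str", "text", "created_at", "lang")
    .write()
    .format("iceberg")
    .mode("append")
    .save(getTableLocation());
```

Would that be the right approach, or should the write tolerate extra columns?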
I'd appreciate your help.

Regards,
Sandeep