meatheadmike commented on issue #11341:
URL: https://github.com/apache/iceberg/issues/11341#issuecomment-2460554308
This bug does not appear to be limited to AWS or Flink. I'm getting the
same error with the following PySpark code:
```
spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS
    {iceberg_catalog}.{iceberg_db}.{iceberg_table}
    (
        offset BIGINT NOT NULL,
        kafka_timestamp TIMESTAMP NOT NULL,
        partition_key INT NOT NULL,
        domain STRING NOT NULL,
        risk STRING,
        timestamp TIMESTAMP NOT NULL
    )
    USING iceberg
    LOCATION '{iceberg_table_path}'
    TBLPROPERTIES (
        'format-version'='2',
        'write.delete.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read',
        'write.metadata.delete-after-commit.enabled'='true',
        'write.update.mode'='merge-on-read',
        'write.upsert.enabled'='true'
    )
""").collect()
spark.sql(f"ALTER TABLE {iceberg_catalog}.{iceberg_db}.{iceberg_table} SET IDENTIFIER FIELDS domain").collect()

def processMicroBatch(batch_df, batch_id):
    # Upsert each micro-batch into the Iceberg table, matching rows on domain.
    batch_df.printSchema()
    batch_df.show()
    batch_df.createOrReplaceTempView("kafka_source")
    batch_df.sparkSession.sql(f"""
        MERGE INTO `{iceberg_catalog}`.`{iceberg_db}`.`{iceberg_table}` t
        USING (SELECT * FROM `kafka_source`) s
        ON t.domain = s.domain
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

df.writeStream \
    .format("iceberg") \
    .trigger(processingTime="1 minutes") \
    .options(**iceberg_options) \
    .outputMode("append") \
    .foreachBatch(processMicroBatch) \
    .start()
spark.streams.awaitAnyTermination()
```
The returned error is:
```
pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot add fieldId 4 as an identifier field: field does not exist
```
Note that the identifier field (`domain`) is the 4th field in the schema.
So it would appear that streaming upserts with Spark are currently not
possible.
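
To narrow this down, it may help to check whether the table's own metadata is consistent, i.e. whether every ID listed under `identifier-field-ids` in the current schema matches a column ID. The following is only a minimal diagnostic sketch: it assumes the table's `metadata.json` has already been parsed into a dict, it only looks at top-level columns, and the function name `missing_identifier_fields` and the sample metadata are made up for illustration (the sample mirrors the table definition above and is not real output).

```python
from typing import List


def missing_identifier_fields(metadata: dict) -> List[int]:
    """Return identifier field IDs with no matching top-level column
    in the current schema of an Iceberg table-metadata dict."""
    current_id = metadata.get("current-schema-id", 0)
    # Pick the schema that the table metadata marks as current.
    schema = next(
        s for s in metadata["schemas"] if s.get("schema-id", 0) == current_id
    )
    known_ids = {field["id"] for field in schema["fields"]}
    return [
        field_id
        for field_id in schema.get("identifier-field-ids", [])
        if field_id not in known_ids
    ]


# Hand-written sample mirroring the table above (hypothetical, not real metadata).
sample_metadata = {
    "format-version": 2,
    "current-schema-id": 0,
    "schemas": [
        {
            "schema-id": 0,
            "type": "struct",
            "identifier-field-ids": [4],
            "fields": [
                {"id": 1, "name": "offset", "required": True, "type": "long"},
                {"id": 2, "name": "kafka_timestamp", "required": True, "type": "timestamptz"},
                {"id": 3, "name": "partition_key", "required": True, "type": "int"},
                {"id": 4, "name": "domain", "required": True, "type": "string"},
                {"id": 5, "name": "risk", "required": False, "type": "string"},
                {"id": 6, "name": "timestamp", "required": True, "type": "timestamptz"},
            ],
        }
    ],
}

print(missing_identifier_fields(sample_metadata))
```

If this returns an empty list for the real metadata file, the table itself is consistent, and the mismatch would more likely come from the schema used during the write rather than from the stored table schema. That is a guess, not a confirmed cause.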