CharanHS30 opened a new issue, #10882:
URL: https://github.com/apache/iceberg/issues/10882
### Query engine

Spark

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.File
import scala.reflect.io.Directory

class SparkIcebergTest extends AnyFlatSpec with Matchers
    with BeforeAndAfter with BeforeAndAfterAll with Logging {

  val warehouseDir: String = "src/test/resources/warehouse"

  lazy val conf: SparkConf = new SparkConf()
  // this needs to match the version of Iceberg used in Dependencies.scala
  conf.set("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0")
  conf.set("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  conf.set("spark.sql.catalog.spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog")
  conf.set("spark.sql.catalog.local",
    "org.apache.iceberg.spark.SparkCatalog")
  conf.set("spark.sql.catalog.local.type", "hadoop")
  conf.set("spark.sql.catalog.local.warehouse", warehouseDir)

  lazy val spark: SparkSession =
    SparkSession
      .builder()
      .appName("iceberg-merge-into-test")
      .master("local[*]")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

  // Remove the local Hadoop warehouse directory so each run starts clean.
  def cleanWarehouse(): Unit = {
    if (new Directory(new File(warehouseDir)).deleteRecursively()) {
      log.debug("deleted hadoop warehouse dir")
    } else {
      log.debug("nothing to delete")
    }
  }

  override def afterAll(): Unit = {
    spark.stop()
    cleanWarehouse()
    ()
  }

  "instantiating a spark session with iceberg support" should
    "allow creating an iceberg table" in {
    spark.sql("CREATE TABLE local.db.table (id int, data string) USING iceberg").show()
    spark.sql("show tables in local.db").count() should be(1)
  }

  it should "allow inserting data into an iceberg table" in {
    spark.sql("INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b')")
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table")
    df.count() should be(2)
  }

  it should "allow inserting more data into a table" in {
    spark.sql("INSERT INTO local.db.table VALUES (3, 'a'), (4, 'b')")
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table")
    df.count() should be(4)
  }

  it should "allow reading data from a table with spark read format iceberg" in {
    spark.read.format("iceberg").load("local.db.table").createOrReplaceTempView("source")
    val df: DataFrame = spark.sql("SELECT * from source")
    df.count() should be(4)
  }

  it should "allow merging records into a table without duplicates" in {
    spark.sql("CREATE TABLE local.db.table2 (id int, data string) USING iceberg").show()
    spark.sql("INSERT INTO local.db.table2 VALUES (3, 'a'), (5, 'c')")
    // (3, 'a') would be a duplicate --> should not be written
    spark.sql(
      """MERGE INTO local.db.table2 t
        |USING (SELECT * FROM source) t2
        |ON t.id = t2.id
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
    val df: DataFrame = spark.sql("SELECT * FROM local.db.table2")
    df.count() should be(5)
  }
}
```
### Question
Spark version: 3.3.2
Iceberg version: 1.3.0
Q1. Do you have `spark.sql.extensions` set?
Ans: Yes.
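For what it's worth, here is a quick way to confirm that the settings actually land on the live session (a minimal smoke test; both keys are set on the `SparkConf` in the test class above):

```scala
// Minimal smoke test: read back the Iceberg-related settings from the
// running session. Both keys are set on the SparkConf in the test above.
println(spark.conf.get("spark.sql.extensions"))
println(spark.conf.get("spark.sql.catalog.local"))
```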
Q2. Do you have the `iceberg-spark-runtime` package?
Ans: Yes.
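For reference, the dependency side looks roughly like this (a sketch of the `Dependencies.scala` / build.sbt entry referenced in the comment above; the exact layout in my build differs, but the coordinates match the `spark.jars.packages` value in the test):

```scala
// build.sbt sketch -- must stay in sync with the spark.jars.packages
// coordinate set on the SparkConf in the test class
libraryDependencies += "org.apache.iceberg" % "iceberg-spark-runtime-3.3_2.12" % "1.3.0"
```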
I am still getting the error "MERGE INTO TABLE is not supported temporarily." How can I fix this issue?

Hi @ru, I have seen that you have provided many solutions for this, but none of them have worked for my use case. Kindly help me here.