Henryk,
I was able to reproduce your issue and get the desired result by creating the table with SQL DDL first and then appending with the DataFrame API. Here's the workaround:
package replicator

import org.apache.spark.sql.SparkSession

object Bucketing extends App {

  val spark = SparkSession.builder()
    .appName("ReproduceError")
    .master("local[*]")
    // Configure the Iceberg catalog
    .config("spark.sql.catalog.my_catalog",
      "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "hadoop")
    .config("spark.sql.catalog.my_catalog.warehouse",
      "file:///tmp/iceberg_warehouse")
    .getOrCreate()

  import spark.implicits._

  val df = Seq(
    ("2023-10-01", 1, 10),
    ("2023-10-02", 2, 20),
    ("2023-10-03", 3, 30)
  ).toDF("date", "x", "y")

  // Step 1: Create the table with SQL DDL, where bucket(10, y) is accepted
  // as a partition transform
  spark.sql("""
    CREATE TABLE IF NOT EXISTS my_catalog.default.some_table (
      date STRING,
      x INT,
      y INT
    )
    USING iceberg
    PARTITIONED BY (date, x, bucket(10, y))
  """)

  // Step 2: Write data to the Iceberg table
  df.writeTo("my_catalog.default.some_table")
    .append()
}
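If you want to sanity-check the resulting layout, the Iceberg metadata tables can be queried in the same session (e.g. from spark-shell or inside the object above); table name as in the snippet:

  // Inspect the partitions and the data files produced by the append
  spark.sql("SELECT * FROM my_catalog.default.some_table.partitions").show()
  spark.sql("SELECT file_path, record_count FROM my_catalog.default.some_table.files").show(false)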
It seems the V2 writer (DataFrameWriterV2) doesn't support a transform function like bucket()
inside partitionedBy; declaring the partition transforms in the DDL and then appending avoids that path.
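On the small-files part of your question: since the table spec already buckets y, you could try clustering the DataFrame by the same bucket value before the append. In recent versions (Spark 3.3+ with a recent Iceberg) the bucket transform is exposed as a SQL function under the catalog's system namespace, so something along these lines might work; the "staged" view name is just illustrative and I haven't measured the effect:

  // Cluster rows the same way the table is partitioned, so each task writes
  // to as few (date, x, bucket) combinations as possible before the append.
  df.createOrReplaceTempView("staged")
  val clustered = spark.sql("""
    SELECT *
    FROM staged
    DISTRIBUTE BY date, x, my_catalog.system.bucket(10, y)
  """)
  clustered.writeTo("my_catalog.default.some_table").append()

Another knob worth a look is the table property write.distribution-mode=hash, which asks the writer to distribute rows by the partition spec for you.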
Best Regards
Soumasish Goswami
in: www.linkedin.com/in/soumasish
# (415) 530-0405
-
On Fri, Nov 29, 2024 at 4:38 AM Henryk Česnolovič <
[email protected]> wrote:
> Hello.
>
> Maybe somebody has faced the same issue. I'm trying to write data to a table
> using the DataFrame API v2. The table is partitioned by buckets using
> df.writeTo("some_table").partitionedBy(col("date"), col("x"), bucket(10,
> col("y"))).using("iceberg").createOrReplace()
> Can I somehow prepare the df's partitioning before writing to the
> destination, so as not to write too many files? The raw data is not grouped by
> keys. What I expected is something like
> df.repartition(col("x"), bucket(10,
> col("y")).writeTo("some_table").partitionedBy(col("date"), col("x"),
> bucket(10, col("y"))).using("iceberg").createOrReplace() .
> but the bucket function can't be used that way; it fails with [INTERNAL_ERROR]
> Cannot generate code for expression: bucket(10, input[0, bigint, true])
>
> Thanks
>