knowxyz commented on issue #12802:
URL: https://github.com/apache/iceberg/issues/12802#issuecomment-2806998151
Code I am trying:
# Databricks notebook source
# Mainly used variables
from pyspark.sql import SparkSession
# Azure Storage Account Details
container_name = "csv"
mount_point = "/mnt/icebergdata8" # Choose a mount point
mount_point= "dbfs:/FileStore/tables"
storage_account_name = "abc"
storage_account_key = "zZHI+AStlGv+3w=="
file_name = "data.csv"
catalog = "catalog5"
store = "default"
table = "sampleoutput24"
warehouseName ="warehousenew9"
icebergtable_name = catalog + "." + store + "." + table
tempview ="tempviewice12"
# Initialize Spark Session (if not already initialized)
spark = SparkSession.builder \
    .appName("IcebergSave") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.catalog5", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.catalog5.type", "hadoop") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.iceberg:iceberg-azure-bundle:1.7.1") \
    .config("spark.sql.catalog.catalog5.warehouse", f"{mount_point}/{warehouseName}") \
    .getOrCreate()
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
storage_account_key)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
spark.conf.set("spark.databricks.io.cache.enabled","false")
spark.conf.set("spark.sql.files.useFsCache", "false")
spark.conf.set("spark.sql.execution.photon.enabled", "false")
spark.conf.set("spark.sql.catalog.catalog5.io-impl",
"org.apache.iceberg.azure.AzureBlobStorageIO")
spark.conf.set("spark.sql.catalog.catalog5.io-impl",
"org.apache.iceberg.azure.adlsv2.ADLSFileIO")
file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_name}"
try:
    dbutils.fs.unmount(mount_point)
    print(f"{mount_point} unmounted successfully.")
except Exception:
    print(f"{mount_point} was not mounted or unmounting failed.")
try:
    # dbutils.fs.mount(
    #     source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
    #     mount_point=mount_point,
    #     extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key}
    # )
    print(f"Mounted {container_name} to {mount_point} using Access Key.")
except Exception as e:
    print(f"Error mounting using Access Key: {e}")
print(f"fs.azure.account.key ready to excute")
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
storage_account_key)
print(f"fs.azure.account.key Excuted")
# COMMAND ----------
sc = spark.sparkContext
print(spark.conf.get("spark.jars.packages"))
spark.conf.get("spark.sql.catalog.catalog5.warehouse")
# COMMAND ----------
#spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
# Create the Spark configuration
# Read the CSV file into a Spark DataFrame
df = spark.read.format("csv").option("header", "true").load(file_path)
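# Optional (sketch, not part of the original run): let Spark infer column types
# instead of reading every column as a string, assuming the CSV has a clean header row:
# df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)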
# Show the DataFrame
#df.show()
# COMMAND ----------
# Write to Iceberg table with partitioning
#df2 = spark.createDataFrame(df.collect(), df.schema)
df.createOrReplaceTempView(tempview)
# Query the temporary view using SQL
sql = f"SELECT * FROM {tempview}"
resultdf = spark.sql(sql)
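# Note: this temp-view round trip yields the same rows as using df directly.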
#print(icebergtable_name)
# only iceberg here
#resultdf.write.format("iceberg").mode("overwrite").saveAsTable(icebergtable_name)
#resultdf.unpersist()
# COMMAND ----------
schema = resultdf.schema
columns_sql = ", ".join(f"{field.name} {field.dataType.simpleString().upper()}" for field in schema)
query = f"""
CREATE TABLE {icebergtable_name}({columns_sql}) TBLPROPERTIES(
'delta.columnMapping.mode' = 'name',
'delta.enableIcebergCompatV2' = 'true',
'delta.universalFormat.enabledFormats' = 'iceberg'
);"""
#print (query)
spark.sql(query)
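# Aside: the TBLPROPERTIES above are Delta UniForm settings
# (delta.universalFormat.enabledFormats etc.); under an Iceberg SparkCatalog such
# as catalog5 they are stored as opaque table properties and have no effect. A
# minimal plain-Iceberg sketch of the same DDL, assuming the columns_sql string
# built above:
# spark.sql(f"CREATE TABLE {icebergtable_name} ({columns_sql}) USING iceberg")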
query1 = f"INSERT INTO {icebergtable_name} SELECT * FROM {tempview}"
#r =spark.sql(query1)
#print(query1)
spark.sql(query1)
dfread = spark.read.format("org.apache.iceberg.spark.source.IcebergSource").load(icebergtable_name)
count = dfread.count()
#print(count)
dfread.show()
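# Simpler read-back for comparison (sketch; "iceberg" is the short source name
# registered by the Iceberg Spark runtime, so both reads should return the same rows):
# dfread2 = spark.read.format("iceberg").load(icebergtable_name)
# spark.sql(f"SELECT COUNT(*) FROM {icebergtable_name}").show()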