soumilshah1995 opened a new issue, #11350:
URL: https://github.com/apache/hudi/issues/11350
I am encountering a warning when using the record-level index (RECORD_INDEX)
with Hudi. During the first run, the following warning is displayed:
```
24/05/28 13:41:45 WARN SparkMetadataTableRecordIndex: Record index not
initialized so falling back to GLOBAL_SIMPLE for tagging records
```
It seems that the RECORD_INDEX is not initialized at this point, so Hudi falls back to GLOBAL_SIMPLE for tagging records on this first commit.
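To double-check whether the record index partition was actually built after the first commit, I also list the metadata table directory on disk. This is only a rough sketch: it assumes the default layout where metadata table partitions live under `<base_path>/.hoodie/metadata`, with the record index in a directory named `record_index`, and it uses the table path from the sample code below.
```
# Rough check (assumption: default metadata table layout under .hoodie/metadata,
# with the record index stored in a partition directory named "record_index").
import os

# Table base path used by the sample code below.
table_base_path = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameorders"
record_index_dir = os.path.join(table_base_path, ".hoodie", "metadata", "record_index")
print("record_index partition exists:", os.path.isdir(record_index_dir))
```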
# Environment
- Hudi Version: 1.0.0-beta1
- Spark Version: 3.4
- Java Version: OpenJDK 11
- Operating System: macOS (with Homebrew)
# Sample code
```
try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import datetime
    from datetime import datetime
    import random
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")
except Exception as e:
    print("error", e)

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
from pyspark.sql.functions import col
from datetime import datetime

# Define schema
schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True)
])

# Create data
data = [
    ("order;1", "prod#001", "cust!001", "2024-01-15", 150.00),
    ("order;002", "prod#002", "cust!002", "2024-01-16", 200.00),
    ("order;003", "prod#003", "cust!003", "2024-01-17", 300.00),
    ("order;004", "prod#004", "cust!004", "2024-01-18", 250.00),
    ("order;005", "prod#005", "cust!005", "2024-01-19", 100.00),
    ("order;006", "prod#006", "cust!006", "2024-01-20", 350.00),
    ("order;007", "prod#007", "cust!007", "2024-01-21", 400.00),
]

df = spark.createDataFrame(data, schema)


def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.index.type": index_type,
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        # "hoodie.datasource.write.partitionpath.urlencode": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
    }

    print(hudi_options)
    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)


write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="orders",
    recordkey="orderID",
    precombine="orderDate",
    partition_fields="orderID",
    index_type="RECORD_INDEX"
)
```
## Output
```
{'hoodie.table.name': 'orders', 'hoodie.datasource.write.table.type':
'COPY_ON_WRITE', 'hoodie.datasource.write.table.name': 'orders',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.recordkey.field': 'orderID',
'hoodie.datasource.write.precombine.field': 'orderDate',
'hoodie.datasource.write.partitionpath.field': 'orderID',
'hoodie.metadata.record.index.enable': 'true', 'hoodie.index.type':
'RECORD_INDEX', 'hoodie.metadata.enable': 'true',
'hoodie.metadata.index.column.stats.enable': 'true',
'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.write.lock.provider':
'org.apache.hudi.client.transaction.lock.InProcessLockProvider'}
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameorders
24/05/28 13:44:50 WARN SparkMetadataTableRecordIndex: Record index not
initialized so falling back to GLOBAL_SIMPLE for tagging records
24/05/28 13:44:54 WARN DAGScheduler: Broadcasting large task binary with
size 1129.7 KiB
24/05/28 13:44:55 WARN DAGScheduler: Broadcasting large task binary with
size 1230.8 KiB
```
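For reference, the fallback warning appears on this very first commit, when the record index has nothing to look up yet. Below is a sketch of a hypothetical follow-up run against the same table (reusing the `write_to_hudi` helper and `schema` from the sample code above) to see whether tagging still falls back to GLOBAL_SIMPLE once the record index has been bootstrapped by the first write.
```
# Hypothetical second upsert: update an existing key so tagging has to
# consult the index, and watch whether the GLOBAL_SIMPLE fallback warning
# is logged again.
updated_data = [
    ("order;1", "prod#001", "cust!001", "2024-01-15", 175.00),  # changed amount
]
updated_df = spark.createDataFrame(updated_data, schema)

write_to_hudi(
    spark_df=updated_df,
    db_name="default",
    table_name="orders",
    recordkey="orderID",
    precombine="orderDate",
    partition_fields="orderID",
    index_type="RECORD_INDEX"
)
```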
