GitHub user ltuantai95 edited a discussion: Can't write hudi format in catalog
I have code that streams from a datagen source and writes to a Hudi table through a Hudi catalog:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import os
# Set up the streaming execution environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1) # good for local testing
env.enable_checkpointing(60000) # Checkpoint every 60 seconds
# Create table environment for streaming mode (no Blink planner)
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql(
"""
CREATE CATALOG hudi_catalog WITH (
'type' = 'hudi',
'catalog.path' = 'hdfs://namenode:8020/hudi_catalog'
)
"""
)
t_env.execute_sql("USE CATALOG hudi_catalog")
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS hudi_db")
t_env.execute_sql("""
CREATE TABLE IF NOT EXISTS hudi_catalog.hudi_db.orders_datagen (
order_number BIGINT PRIMARY KEY NOT ENFORCED,
price DECIMAL(10,2)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_number.kind' = 'sequence',
'fields.order_number.start' = '1',
'fields.order_number.end' = '1000',
'fields.price.kind' = 'random',
'fields.price.min' = '20.00',
'fields.price.max' = '1000.00'
)
""")
t_env.execute_sql(
"""
CREATE TABLE IF NOT EXISTS hudi_catalog.hudi_db.hudi_orders (
order_number BIGINT PRIMARY KEY NOT ENFORCED,
price DECIMAL(32,2)
) WITH (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE'
)
"""
)
t_env.execute_sql("""
INSERT INTO hudi_catalog.hudi_db.hudi_orders
SELECT * FROM hudi_catalog.hudi_db.orders_datagen
""")
```
The job is submitted successfully and finishes immediately with no errors, but no data
(*.parquet files) is written.
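As a sanity check on "finishes immediately", the expected runtime of the bounded datagen sequence can be estimated from the options in the script. This is only a minimal sketch: the helper name `expected_runtime_seconds` is hypothetical, and it assumes the source emits rows at exactly the configured `rows-per-second` rate.

```python
import math


def expected_runtime_seconds(start: int, end: int, rows_per_second: int) -> float:
    """Seconds a bounded sequence source needs to emit start..end inclusive.

    Hypothetical helper for illustration; assumes a constant emit rate.
    """
    total_rows = end - start + 1
    return total_rows / rows_per_second


# Values copied from the datagen table options and enable_checkpointing(60000).
runtime_s = expected_runtime_seconds(start=1, end=1000, rows_per_second=10)
checkpoint_interval_s = 60000 / 1000

# Number of checkpoint intervals that fully elapse before the sequence is exhausted.
full_intervals = math.floor(runtime_s / checkpoint_interval_s)

print(f"expected source runtime: {runtime_s:.0f} s")
print(f"checkpoint intervals elapsed: {full_intervals}")
```

Under these assumptions the source should run for roughly 100 seconds, so a job that ends right away may indicate that the datagen table is not being read as configured.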
Thanks for your help!
GitHub link: https://github.com/apache/hudi/discussions/17992