plusplusjiajia commented on code in PR #7360:
URL: https://github.com/apache/paimon/pull/7360#discussion_r2901427207
##########
paimon-python/pypaimon/cli/cli_table.py:
##########
@@ -185,6 +185,229 @@ def cmd_table_create(args):
sys.exit(1)
+def cmd_table_import(args):
+ """
+ Execute the 'table import' command.
+
+ Imports data from a CSV or JSON file into a Paimon table.
+
+ Args:
+ args: Parsed command line arguments.
+ """
+ import pandas as pd
+ from pypaimon.cli.cli import load_catalog_config, create_catalog
+
+ # Load catalog configuration
+ config_path = args.config
+ config = load_catalog_config(config_path)
+
+ # Create catalog
+ catalog = create_catalog(config)
+
+ # Parse table identifier
+ table_identifier = args.table
+ parts = table_identifier.split('.')
+ if len(parts) != 2:
+ print(f"Error: Invalid table identifier '{table_identifier}'. "
+ f"Expected format: 'database.table'", file=sys.stderr)
+ sys.exit(1)
+
+ database_name, table_name = parts
+
+ # Get table
+ try:
+ table = catalog.get_table(f"{database_name}.{table_name}")
+ except Exception as e:
+ print(f"Error: Failed to get table '{table_identifier}': {e}",
file=sys.stderr)
+ sys.exit(1)
+
+ # Get input file path
+ input_file = args.input
+ if not input_file:
+ print("Error: Input file is required. Use --input option.",
file=sys.stderr)
+ sys.exit(1)
+
+ # Read data from file
+ try:
+ file_lower = input_file.lower()
+ if file_lower.endswith('.csv'):
+ # Read CSV file
+ df = pd.read_csv(input_file)
+ elif file_lower.endswith('.json'):
+ # Read JSON file
+ df = pd.read_json(input_file)
+ else:
+ print("Error: Unsupported file format. Only CSV and JSON files are
supported.", file=sys.stderr)
+ sys.exit(1)
+
+ if df.empty:
+ print("Warning: No data found in file '{input_file}'.",
file=sys.stderr)
+ return
+
+ except FileNotFoundError:
+ print(f"Error: Input file not found: {input_file}", file=sys.stderr)
+ sys.exit(1)
+ except Exception as e:
+ print(f"Error: Failed to read input file: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Write data to table
+ try:
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ # Get table schema and convert DataFrame to match it
+ import pyarrow as pa
+ from pypaimon.schema.data_types import PyarrowFieldParser
+ pa_schema =
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+
+ # Convert DataFrame to PyArrow Table with the correct schema
+ table_data = pa.Table.from_pandas(df, schema=pa_schema)
+
+ # Write data
+ table_write.write_arrow(table_data)
+
+ # Commit write
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
Review Comment:
It's better to close `table_write` and `table_commit` in a `finally` block so
that the resources are always released, even if the write or commit raises an
exception partway through.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]