This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 017f4ccbc8 [python] Fix file name prefix in postpone mode. (#6668)
017f4ccbc8 is described below

commit 017f4ccbc812269ed4b532536c059d0088c13e13
Author: umi <[email protected]>
AuthorDate: Thu Nov 27 11:43:55 2025 +0800

    [python] Fix file name prefix in postpone mode. (#6668)
---
 paimon-python/dev/lint-python.sh                   |   8 +-
 paimon-python/pypaimon/common/core_options.py      |   5 +
 .../pypaimon/tests/rest/rest_simple_test.py        |  48 ------
 .../pypaimon/tests/write/table_write_test.py       | 177 ++++++++++++++++++++-
 paimon-python/pypaimon/write/file_store_write.py   |  19 ++-
 paimon-python/pypaimon/write/table_write.py        |   9 +-
 paimon-python/pypaimon/write/write_builder.py      |   4 +-
 paimon-python/pypaimon/write/writer/blob_writer.py |  11 +-
 .../pypaimon/write/writer/data_blob_writer.py      |  11 +-
 paimon-python/pypaimon/write/writer/data_writer.py |   6 +-
 10 files changed, 225 insertions(+), 73 deletions(-)

diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index fb57bfdd41..469ee56c9d 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -107,10 +107,14 @@ function collect_checks() {
 function get_all_supported_checks() {
     _OLD_IFS=$IFS
     IFS=$'\n'
-    SUPPORT_CHECKS=()
+    SUPPORT_CHECKS=("flake8_check" "pytest_check" "mixed_check") # control the 
calling sequence
     for fun in $(declare -F); do
         if [[ `regexp_match "$fun" "_check$"` = true ]]; then
-            SUPPORT_CHECKS+=("${fun:11}")
+            check_name="${fun:11}"
+            # Only add if not already in SUPPORT_CHECKS
+            if [[ ! `contains_element "${SUPPORT_CHECKS[*]}" "$check_name"` = 
true ]]; then
+                SUPPORT_CHECKS+=("$check_name")
+            fi
         fi
     done
     IFS=$_OLD_IFS
diff --git a/paimon-python/pypaimon/common/core_options.py 
b/paimon-python/pypaimon/common/core_options.py
index 028f757b77..0686132979 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -50,6 +50,7 @@ class CoreOptions(str, Enum):
     FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
     TARGET_FILE_SIZE = "target-file-size"
     BLOB_TARGET_FILE_SIZE = "blob.target-file-size"
+    DATA_FILE_PREFIX = "data-file.prefix"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
@@ -64,6 +65,10 @@ class CoreOptions(str, Enum):
     DATA_FILE_EXTERNAL_PATHS_STRATEGY = "data-file.external-paths.strategy"
     DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS = 
"data-file.external-paths.specific-fs"
 
+    @staticmethod
+    def data_file_prefix(options: dict) -> str:
+        return options.get(CoreOptions.DATA_FILE_PREFIX, "data-")
+
     @staticmethod
     def blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, 
"false").lower() == 'true'
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py 
b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 19aec430fd..1b62d5a11f 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -15,8 +15,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-import glob
-import os
 
 import pyarrow as pa
 
@@ -566,52 +564,6 @@ class RESTSimpleTest(RESTBaseTest):
         expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
         self.assertEqual(actual, expected)
 
-    def test_postpone_write(self):
-        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
-                                            options={'bucket': -2})
-        self.rest_catalog.create_table('default.test_postpone', schema, False)
-        table = self.rest_catalog.get_table('default.test_postpone')
-
-        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
-
-        write_builder = table.new_batch_write_builder()
-        table_write = write_builder.new_write()
-        table_commit = write_builder.new_commit()
-        table_write.write_arrow(expect)
-        commit_messages = table_write.prepare_commit()
-        table_commit.commit(commit_messages)
-        table_write.close()
-        table_commit.close()
-
-        self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/snapshot/LATEST"))
-        self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/snapshot/snapshot-1"))
-        self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/manifest"))
-        self.assertEqual(len(glob.glob(self.warehouse + 
"/default/test_postpone/manifest/*")), 3)
-        self.assertEqual(len(glob.glob(self.warehouse + 
"/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
-
-    def test_postpone_read_write(self):
-        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
-                                            options={'bucket': -2})
-        self.rest_catalog.create_table('default.test_postpone', schema, False)
-        table = self.rest_catalog.get_table('default.test_postpone')
-
-        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
-
-        write_builder = table.new_batch_write_builder()
-        table_write = write_builder.new_write()
-        table_commit = write_builder.new_commit()
-        table_write.write_arrow(expect)
-        commit_messages = table_write.prepare_commit()
-        table_commit.commit(commit_messages)
-        table_write.close()
-        table_commit.close()
-
-        read_builder = table.new_read_builder()
-        table_read = read_builder.new_read()
-        splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits)
-        self.assertTrue(not actual)
-
     def test_create_drop_database_table(self):
         # test create database
         self.rest_catalog.create_database("db1", False)
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py 
b/paimon-python/pypaimon/tests/write/table_write_test.py
index 21b76731ac..04c9610a4b 100644
--- a/paimon-python/pypaimon/tests/write/table_write_test.py
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -15,7 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-
+import glob
 import os
 import shutil
 
@@ -153,3 +153,178 @@ class TableWriteTest(unittest.TestCase):
         splits = read_builder.new_scan().plan().splits()
         actual = table_read.to_arrow(splits).sort_by('user_id')
         self.assertEqual(self.expected, actual)
+
+    def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.catalog.create_table('default.test_postpone', schema, False)
+        table = self.catalog.get_table('default.test_postpone')
+        data = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        expect = pa.Table.from_pydict(data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + 
"/default.db/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + 
"/default.db/test_postpone/manifest/*")), 3)
+        self.assertEqual(len(glob.glob(self.warehouse + 
"/default.db/test_postpone/user_id=2/bucket-postpone/*.avro")),
+                         1)
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits)
+        self.assertTrue(not actual)
+
+    def test_data_file_prefix_postpone(self):
+        """Test that postpone-mode data file names carry the '-u-{user}-s-{random}-w-' prefix segment."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.catalog.create_table('default.test_file_prefix_postpone', schema, 
False)
+        table = self.catalog.get_table('default.test_file_prefix_postpone')
+
+        # Write some data to generate files
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = {
+            'user_id': [1, 2],
+            'item_id': [1001, 1002],
+            'behavior': ['a', 'b'],
+            'dt': ['p1', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 
'test_file_prefix_postpone')
+        data_files = []
+        for root, dirs, files in os.walk(table_path):
+            for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or 
file.endswith('.orc'):
+                    data_files.append(file)
+
+        # Verify at least one data file was created
+        self.assertGreater(len(data_files), 0, "No data files were generated")
+
+        # Verify file name format: {table_prefix}-u-{commit_user}-s-{random_number}-w-{uuid}-0.{format}
+        # Expected pattern (default prefix 'data-'): data--u-{user}-s-{random}-w-{uuid}-0.{format}
+        expected_pattern = r'^data--u-.+-s-\d+-w-.+-0\.avro$'
+
+        for file_name in data_files:
+            self.assertRegex(file_name, expected_pattern,
+                             f"File name '{file_name}' does not match expected 
prefix format")
+
+            # Additional checks for specific components
+            parts = file_name.split('-')
+            self.assertEqual('data', parts[0], f"File prefix should start with 
'data', got '{parts[0]}'")
+            self.assertEqual('u', parts[2], f"Second part should be 'u', got 
'{parts[2]}'")
+            self.assertEqual('s', parts[8], f"Fourth part should be 's', got 
'{parts[8]}'")
+            self.assertEqual('w', parts[10], f"Sixth part should be 'w', got 
'{parts[10]}'")
+
+    def test_data_file_prefix_default(self):
+        """Test that data file names use the default 'data-' prefix when no prefix option is set."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'])
+        self.catalog.create_table('default.test_file_prefix_default', schema, 
False)
+        table = self.catalog.get_table('default.test_file_prefix_default')
+
+        # Write some data to generate files
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = {
+            'user_id': [1, 2],
+            'item_id': [1001, 1002],
+            'behavior': ['a', 'b'],
+            'dt': ['p1', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 
'test_file_prefix_default')
+        data_files = []
+        for root, dirs, files in os.walk(table_path):
+            for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or 
file.endswith('.orc'):
+                    data_files.append(file)
+
+        # Verify at least one data file was created
+        self.assertGreater(len(data_files), 0, "No data files were generated")
+
+        expected_pattern = r'^data-.+-0\.parquet$'
+
+        for file_name in data_files:
+            self.assertRegex(file_name, expected_pattern,
+                             f"File name '{file_name}' does not match expected 
prefix format")
+
+            # Additional checks for specific components
+            parts = file_name.split('-')
+            self.assertEqual('data', parts[0], f"File prefix should start with 
'data', got '{parts[0]}'")
+
+    def test_data_file_prefix(self):
+        """Test that data file names start with the prefix configured via 'data-file.prefix'."""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, 
partition_keys=['user_id'],
+                                            options={'data-file.prefix': 
'test_prefix'})
+        self.catalog.create_table('default.test_file_prefix', schema, False)
+        table = self.catalog.get_table('default.test_file_prefix')
+
+        # Write some data to generate files
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        data = {
+            'user_id': [1, 2],
+            'item_id': [1001, 1002],
+            'behavior': ['a', 'b'],
+            'dt': ['p1', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        # Find generated data files
+        table_path = os.path.join(self.warehouse, 'default.db', 
'test_file_prefix')
+        data_files = []
+        for root, dirs, files in os.walk(table_path):
+            for file in files:
+                if file.endswith('.parquet') or file.endswith('.avro') or 
file.endswith('.orc'):
+                    data_files.append(file)
+
+        # Verify at least one data file was created
+        self.assertGreater(len(data_files), 0, "No data files were generated")
+
+        expected_pattern = r'^test_prefix.+-0\.parquet$'
+
+        for file_name in data_files:
+            self.assertRegex(file_name, expected_pattern,
+                             f"File name '{file_name}' does not match expected 
prefix format")
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index c100b64966..119bef6115 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,10 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import random
 from typing import Dict, List, Tuple
 
 import pyarrow as pa
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
 from pypaimon.write.writer.data_blob_writer import DataBlobWriter
@@ -30,7 +32,7 @@ from pypaimon.table.bucket_mode import BucketMode
 class FileStoreWrite:
     """Base class for file store write operations."""
 
-    def __init__(self, table):
+    def __init__(self, table, commit_user):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -38,15 +40,21 @@ class FileStoreWrite:
         self.max_seq_numbers: dict = {}
         self.write_cols = None
         self.commit_identifier = 0
+        self.options = dict(table.options)
+        if (CoreOptions.BUCKET in table.options and
+                self.table.bucket_mode() == BucketMode.POSTPONE_MODE):
+            self.options[CoreOptions.DATA_FILE_PREFIX] = \
+                
(f"{CoreOptions.data_file_prefix(table.options)}-u-{commit_user}"
+                 f"-s-{random.randint(0, 2 ** 31 - 2)}-w-")
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
         if key not in self.data_writers:
-            self.data_writers[key] = self._create_data_writer(partition, 
bucket)
+            self.data_writers[key] = self._create_data_writer(partition, 
bucket, self.options)
         writer = self.data_writers[key]
         writer.write(data)
 
-    def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+    def _create_data_writer(self, partition: Tuple, bucket: int, options: 
Dict[str, str]) -> DataWriter:
         def max_seq_number():
             return self._seq_number_stats(partition).get(bucket, 1)
 
@@ -57,13 +65,15 @@ class FileStoreWrite:
                 partition=partition,
                 bucket=bucket,
                 max_seq_number=0,
+                options=options
             )
         elif self.table.is_primary_key_table:
             return KeyValueDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
-                max_seq_number=max_seq_number())
+                max_seq_number=max_seq_number(),
+                options=options)
         else:
             seq_number = 0 if self.table.bucket_mode() == 
BucketMode.BUCKET_UNAWARE else max_seq_number()
             return AppendOnlyDataWriter(
@@ -71,6 +81,7 @@ class FileStoreWrite:
                 partition=partition,
                 bucket=bucket,
                 max_seq_number=seq_number,
+                options=options,
                 write_cols=self.write_cols
             )
 
diff --git a/paimon-python/pypaimon/write/table_write.py 
b/paimon-python/pypaimon/write/table_write.py
index 7f415a0ba7..0ac73356a3 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -27,13 +27,14 @@ from pypaimon.write.file_store_write import FileStoreWrite
 
 
 class TableWrite:
-    def __init__(self, table):
+    def __init__(self, table, commit_user):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.table_pyarrow_schema = 
PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
-        self.file_store_write = FileStoreWrite(self.table)
+        self.file_store_write = FileStoreWrite(self.table, commit_user)
         self.row_key_extractor = self.table.create_row_key_extractor()
+        self.commit_user = commit_user
 
     def write_arrow(self, table: pa.Table):
         batches_iterator = table.to_batches()
@@ -79,8 +80,8 @@ class TableWrite:
 
 
 class BatchTableWrite(TableWrite):
-    def __init__(self, table):
-        super().__init__(table)
+    def __init__(self, table, commit_user):
+        super().__init__(table, commit_user)
         self.batch_committed = False
 
     def prepare_commit(self) -> List[CommitMessage]:
diff --git a/paimon-python/pypaimon/write/write_builder.py 
b/paimon-python/pypaimon/write/write_builder.py
index 8c9ed725f5..fdda0c54b5 100644
--- a/paimon-python/pypaimon/write/write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -53,7 +53,7 @@ class WriteBuilder(ABC):
 class BatchWriteBuilder(WriteBuilder):
 
     def new_write(self) -> BatchTableWrite:
-        return BatchTableWrite(self.table)
+        return BatchTableWrite(self.table, self.commit_user)
 
     def new_commit(self) -> BatchTableCommit:
         commit = BatchTableCommit(self.table, self.commit_user, 
self.static_partition)
@@ -62,7 +62,7 @@ class BatchWriteBuilder(WriteBuilder):
 
 class StreamWriteBuilder(WriteBuilder):
     def new_write(self) -> StreamTableWrite:
-        return StreamTableWrite(self.table)
+        return StreamTableWrite(self.table, self.commit_user)
 
     def new_commit(self) -> StreamTableCommit:
         commit = StreamTableCommit(self.table, self.commit_user, 
self.static_partition)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py 
b/paimon-python/pypaimon/write/writer/blob_writer.py
index 92c7e6ea1d..c8d0d45076 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -19,7 +19,7 @@
 import logging
 import uuid
 import pyarrow as pa
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Dict
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
@@ -32,8 +32,10 @@ CHECK_ROLLING_RECORD_CNT = 1000
 
 class BlobWriter(AppendOnlyDataWriter):
 
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, blob_column: str):
-        super().__init__(table, partition, bucket, max_seq_number, 
[blob_column])
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, blob_column: str,
+                 options: Dict[str, str] = None):
+        super().__init__(table, partition, bucket, max_seq_number,
+                         options, write_cols=[blob_column])
 
         # Override file format to "blob"
         self.file_format = CoreOptions.FILE_FORMAT_BLOB
@@ -95,7 +97,8 @@ class BlobWriter(AppendOnlyDataWriter):
         self.sequence_generator.next()
 
     def open_current_writer(self):
-        file_name = 
f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+        file_name = (f"{CoreOptions.data_file_prefix(self.options)}"
+                     f"{self.file_uuid}-{self.file_count}.{self.file_format}")
         self.file_count += 1  # Increment counter for next file
         file_path = self._generate_file_path(file_name)
         self.current_file_path = file_path
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 26ba1bee7b..6b81525c7f 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -19,7 +19,7 @@
 import logging
 import uuid
 from datetime import datetime
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Dict
 
 import pyarrow as pa
 
@@ -75,8 +75,8 @@ class DataBlobWriter(DataWriter):
     # Constant for checking rolling condition periodically
     CHECK_ROLLING_RECORD_CNT = 1000
 
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int):
-        super().__init__(table, partition, bucket, max_seq_number)
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: Dict[str, str] = None):
+        super().__init__(table, partition, bucket, max_seq_number, options)
 
         # Determine blob column from table schema
         self.blob_column_name = self._get_blob_columns_from_schema()
@@ -100,7 +100,8 @@ class DataBlobWriter(DataWriter):
             partition=self.partition,
             bucket=self.bucket,
             max_seq_number=max_seq_number,
-            blob_column=self.blob_column_name
+            blob_column=self.blob_column_name,
+            options=options
         )
 
         logger.info(f"Initialized DataBlobWriter with blob column: 
{self.blob_column_name}")
@@ -245,7 +246,7 @@ class DataBlobWriter(DataWriter):
         if data.num_rows == 0:
             return None
 
-        file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+        file_name = 
f"{CoreOptions.data_file_prefix(self.options)}{uuid.uuid4()}-0.{self.file_format}"
         file_path = self._generate_file_path(file_name)
 
         # Write file based on format
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index 56487094ed..079b8d26d6 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -34,7 +34,7 @@ from pypaimon.table.row.generic_row import GenericRow
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int,
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: 
int, options: Dict[str, str] = None,
                  write_cols: Optional[List[str]] = None):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -46,7 +46,7 @@ class DataWriter(ABC):
         self.trimmed_primary_keys_fields = 
self.table.trimmed_primary_keys_fields
         self.trimmed_primary_keys = self.table.trimmed_primary_keys
 
-        options = self.table.options
+        self.options = options
         self.target_file_size = CoreOptions.target_file_size(options, 
self.table.is_primary_key_table)
         # POSTPONE_BUCKET uses AVRO format, otherwise default to PARQUET
         default_format = (
@@ -158,7 +158,7 @@ class DataWriter(ABC):
     def _write_data_to_file(self, data: pa.Table):
         if data.num_rows == 0:
             return
-        file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+        file_name = 
f"{CoreOptions.data_file_prefix(self.options)}{uuid.uuid4()}-0.{self.file_format}"
         file_path = self._generate_file_path(file_name)
 
         is_external_path = self.external_path_provider is not None

Reply via email to