This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0ddd7f1b [core] Remove old default value for read side (#5804)
6b0ddd7f1b is described below

commit 6b0ddd7f1b828d6fb564c609e15520209cb9e997
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 27 15:10:23 2025 +0800

    [core] Remove old default value for read side (#5804)
---
 .../paimon/operation/DefaultValueAssigner.java     | 175 ---------------------
 .../privilege/FileBasedPrivilegeManager.java       |   1 +
 .../paimon/table/AbstractFileStoreTable.java       |   2 -
 .../paimon/table/source/AbstractDataTableRead.java |  11 --
 .../paimon/table/source/DataTableBatchScan.java    |   6 +-
 .../paimon/table/source/DataTableStreamScan.java   |   5 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |   7 +-
 .../paimon/operation/DefaultValueAssignerTest.java | 135 ----------------
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  51 ------
 .../source/snapshot/DefaultValueScannerTest.java   |  92 -----------
 .../apache/paimon/flink/ReadWriteTableITCase.java  |  80 ----------
 11 files changed, 4 insertions(+), 561 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
deleted file mode 100644
index 416237b60b..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.casting.CastExecutor;
-import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.casting.DefaultValueRow;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.PredicateReplaceVisitor;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
-
-/**
- * The field Default value assigner. note that invoke of assigning should be 
after merge and schema
- * evolution.
- *
- * @deprecated default value in reading is not recommended
- */
-@Deprecated
-public class DefaultValueAssigner {
-
-    public static final String DEFAULT_VALUE_SUFFIX = "default-value";
-
-    private final RowType rowType;
-    private final Map<String, String> defaultValues;
-
-    private boolean needToAssign;
-
-    private @Nullable RowType readRowType;
-    private DefaultValueRow defaultValueRow;
-
-    private DefaultValueAssigner(Map<String, String> defaultValues, RowType 
rowType) {
-        this.defaultValues = defaultValues;
-        this.needToAssign = !defaultValues.isEmpty();
-        this.rowType = rowType;
-    }
-
-    public DefaultValueAssigner handleReadRowType(RowType readRowType) {
-        this.readRowType = readRowType;
-        List<String> requiredFieldNames = readRowType.getFieldNames();
-        needToAssign = 
defaultValues.keySet().stream().anyMatch(requiredFieldNames::contains);
-        return this;
-    }
-
-    /** assign default value for column which value is null. */
-    public RecordReader<InternalRow> 
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
-        if (!needToAssign) {
-            return reader;
-        }
-
-        if (defaultValueRow == null) {
-            defaultValueRow = createDefaultValueRow();
-        }
-
-        return reader.transform(defaultValueRow::replaceRow);
-    }
-
-    @VisibleForTesting
-    DefaultValueRow createDefaultValueRow() {
-        List<DataField> fields;
-        if (readRowType != null) {
-            fields = readRowType.getFields();
-        } else {
-            fields = rowType.getFields();
-        }
-
-        GenericRow row = new GenericRow(fields.size());
-        for (int i = 0; i < fields.size(); i++) {
-            DataField dataField = fields.get(i);
-            String defaultValueStr = defaultValues.get(dataField.name());
-            if (defaultValueStr == null) {
-                continue;
-            }
-
-            @SuppressWarnings("unchecked")
-            CastExecutor<Object, Object> resolve =
-                    (CastExecutor<Object, Object>)
-                            CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
-
-            if (resolve == null) {
-                throw new RuntimeException(
-                        "Default value do not support the type of " + 
dataField.type());
-            }
-            Object defaultValue = 
resolve.cast(BinaryString.fromString(defaultValueStr));
-            row.setField(i, defaultValue);
-        }
-
-        return DefaultValueRow.from(row);
-    }
-
-    public Predicate handlePredicate(Predicate filters) {
-        Predicate result = filters;
-        if (!defaultValues.isEmpty()) {
-            if (filters != null) {
-                // TODO improve predicate tree with replacing always true and 
always false
-                PredicateReplaceVisitor deletePredicateWithFieldNameVisitor =
-                        predicate -> {
-                            if 
(defaultValues.containsKey(predicate.fieldName())) {
-                                return Optional.empty();
-                            }
-                            return Optional.of(predicate);
-                        };
-
-                ArrayList<Predicate> filterWithoutDefaultValueField = new 
ArrayList<>();
-
-                List<Predicate> predicates = 
PredicateBuilder.splitAnd(filters);
-                for (Predicate predicate : predicates) {
-                    predicate
-                            .visit(deletePredicateWithFieldNameVisitor)
-                            .ifPresent(filterWithoutDefaultValueField::add);
-                }
-
-                if (!filterWithoutDefaultValueField.isEmpty()) {
-                    result = 
PredicateBuilder.and(filterWithoutDefaultValueField);
-                } else {
-                    result = null;
-                }
-            }
-        }
-        return result;
-    }
-
-    public static DefaultValueAssigner create(TableSchema schema) {
-        Map<String, String> defaultValues = 
getFieldDefaultValues(schema.options());
-        return new DefaultValueAssigner(defaultValues, 
schema.logicalRowType());
-    }
-
-    private static Map<String, String> getFieldDefaultValues(Map<String, 
String> options) {
-        Map<String, String> defaultValues = new HashMap<>();
-        String fieldPrefix = FIELDS_PREFIX + ".";
-        String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
-        for (Map.Entry<String, String> option : options.entrySet()) {
-            String key = option.getKey();
-            if (key != null && key.startsWith(fieldPrefix) && 
key.endsWith(defaultValueSuffix)) {
-                String fieldName = key.replace(fieldPrefix, 
"").replace(defaultValueSuffix, "");
-                defaultValues.put(fieldName, option.getValue());
-            }
-        }
-        return defaultValues;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
index ec86e0cd17..3f4d89466e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
@@ -384,6 +384,7 @@ public class FileBasedPrivilegeManager implements 
PrivilegeManager {
     private void createUserTable() {
         Options options = new Options();
         options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
         Path tableRoot = new Path(warehouse, USER_TABLE_DIR);
         SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
         try {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index daa00f295a..7ea19d9f02 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -27,7 +27,6 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.options.Options;
@@ -259,7 +258,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 changelogManager(),
                 splitGenerator(),
                 nonPartitionFilterConsumer(),
-                DefaultValueAssigner.create(tableSchema),
                 store().pathFactory(),
                 name(),
                 store().newIndexFileHandler());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index da2ce8cb56..fe73d3c251 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.reader.RecordReader;
@@ -33,8 +32,6 @@ import java.util.Optional;
 /** A {@link InnerTableRead} for data table. */
 public abstract class AbstractDataTableRead implements InnerTableRead {
 
-    private final DefaultValueAssigner defaultValueAssigner;
-
     private RowType readType;
     private boolean executeFilter = false;
     private Predicate predicate;
@@ -42,7 +39,6 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
 
     public AbstractDataTableRead(TableSchema schema) {
         this.schema = schema;
-        this.defaultValueAssigner = schema == null ? null : 
DefaultValueAssigner.create(schema);
     }
 
     public abstract void applyReadType(RowType readType);
@@ -57,9 +53,6 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
     @Override
     public final InnerTableRead withFilter(Predicate predicate) {
         this.predicate = predicate;
-        if (defaultValueAssigner != null) {
-            predicate = defaultValueAssigner.handlePredicate(predicate);
-        }
         return innerWithFilter(predicate);
     }
 
@@ -82,7 +75,6 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
     @Override
     public final InnerTableRead withReadType(RowType readType) {
         this.readType = readType;
-        this.defaultValueAssigner.handleReadRowType(readType);
         applyReadType(readType);
         return this;
     }
@@ -90,9 +82,6 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
     @Override
     public final RecordReader<InternalRow> createReader(Split split) throws 
IOException {
         RecordReader<InternalRow> reader = reader(split);
-        if (defaultValueAssigner != null) {
-            reader = defaultValueAssigner.assignFieldsDefaultValue(reader);
-        }
         if (executeFilter) {
             reader = executeFilter(reader);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index a1b94f59a5..0bf6927e4e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
@@ -34,8 +33,6 @@ import java.util.List;
 /** {@link TableScan} implementation for batch planning. */
 public class DataTableBatchScan extends AbstractDataTableScan {
 
-    private final DefaultValueAssigner defaultValueAssigner;
-
     private StartingScanner startingScanner;
     private boolean hasNext;
 
@@ -49,7 +46,6 @@ public class DataTableBatchScan extends AbstractDataTableScan 
{
         super(schema, options, snapshotReader, queryAuth);
 
         this.hasNext = true;
-        this.defaultValueAssigner = DefaultValueAssigner.create(schema);
 
         if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
             snapshotReader.withLevelFilter(level -> level > 
0).enableValueFilter();
@@ -62,7 +58,7 @@ public class DataTableBatchScan extends AbstractDataTableScan 
{
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
         super.withFilter(predicate);
-        
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
+        snapshotReader.withFilter(predicate);
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 2f42674296..a6ab782dba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
@@ -62,7 +61,6 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
     private final StreamScanMode scanMode;
     private final SnapshotManager snapshotManager;
     private final boolean supportStreamingReadOverwrite;
-    private final DefaultValueAssigner defaultValueAssigner;
     private final NextSnapshotFetcher nextSnapshotProvider;
     private final boolean hasPk;
 
@@ -92,7 +90,6 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
         this.scanMode = 
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
-        this.defaultValueAssigner = DefaultValueAssigner.create(schema);
         this.nextSnapshotProvider =
                 new NextSnapshotFetcher(
                         snapshotManager, changelogManager, 
options.changelogLifecycleDecoupled());
@@ -107,7 +104,7 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
     @Override
     public DataTableStreamScan withFilter(Predicate predicate) {
         super.withFilter(predicate);
-        
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
+        snapshotReader.withFilter(predicate);
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index d6658be267..5855e88ab1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -34,7 +34,6 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.metrics.ScanMetrics;
@@ -85,7 +84,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final ConsumerManager consumerManager;
     private final SplitGenerator splitGenerator;
     private final BiConsumer<FileStoreScan, Predicate> 
nonPartitionFilterConsumer;
-    private final DefaultValueAssigner defaultValueAssigner;
     private final FileStorePathFactory pathFactory;
     private final String tableName;
     private final IndexFileHandler indexFileHandler;
@@ -101,7 +99,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
             ChangelogManager changelogManager,
             SplitGenerator splitGenerator,
             BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
-            DefaultValueAssigner defaultValueAssigner,
             FileStorePathFactory pathFactory,
             String tableName,
             IndexFileHandler indexFileHandler) {
@@ -118,7 +115,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
                         snapshotManager.branch());
         this.splitGenerator = splitGenerator;
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
-        this.defaultValueAssigner = defaultValueAssigner;
         this.pathFactory = pathFactory;
 
         this.tableName = tableName;
@@ -218,8 +214,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
-        for (Predicate p :
-                
PredicateBuilder.splitAnd(defaultValueAssigner.handlePredicate(predicate))) {
+        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
             Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
             if (mapped.isPresent()) {
                 partitionFilters.add(mapped.get());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
deleted file mode 100644
index 7cb31fba27..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class DefaultValueAssignerTest {
-    @TempDir java.nio.file.Path tempDir;
-
-    private TableSchema tableSchema;
-
-    @BeforeEach
-    public void beforeEach() throws Exception {
-        Path tablePath = new Path(tempDir.toUri());
-        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
-        Map<String, String> options = new HashMap<>();
-        options.put(
-                String.format(
-                        "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX,
-                        "col4",
-                        DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
-                "0");
-        options.put(
-                String.format(
-                        "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX,
-                        "col5",
-                        DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
-                "1");
-        Schema schema =
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col0", DataTypes.STRING()),
-                                new DataField(1, "col1", DataTypes.STRING()),
-                                new DataField(2, "col2", DataTypes.STRING()),
-                                new DataField(3, "col3", DataTypes.STRING()),
-                                new DataField(4, "col4", DataTypes.STRING()),
-                                new DataField(5, "col5", DataTypes.STRING())),
-                        Lists.newArrayList("col0"),
-                        Collections.emptyList(),
-                        options,
-                        "");
-        tableSchema = schemaManager.createTable(schema);
-    }
-
-    @Test
-    public void testGeneralRow() {
-        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
-        RowType readRowType =
-                tableSchema.projectedLogicalRowType(Lists.newArrayList("col5", 
"col4", "col0"));
-        defaultValueAssigner = 
defaultValueAssigner.handleReadRowType(readRowType);
-        InternalRow row = 
defaultValueAssigner.createDefaultValueRow().defaultValueRow();
-        assertThat(String.format("%s|%s|%s", row.getString(0), 
row.getString(1), row.getString(2)))
-                .isEqualTo("1|0|null");
-    }
-
-    @Test
-    public void testHandlePredicate() {
-        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
-        PredicateBuilder predicateBuilder = new 
PredicateBuilder(tableSchema.logicalRowType());
-
-        {
-            Predicate predicate =
-                    PredicateBuilder.and(
-                            
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
-                            
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
-            Predicate actual = defaultValueAssigner.handlePredicate(predicate);
-            assertThat(actual)
-                    
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
-        }
-
-        {
-            Predicate predicate =
-                    PredicateBuilder.and(
-                            
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
-                            
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
-            Predicate actual = defaultValueAssigner.handlePredicate(predicate);
-            Assertions.assertThat(actual).isNull();
-        }
-
-        {
-            Predicate actual = defaultValueAssigner.handlePredicate(null);
-            Assertions.assertThat(actual).isNull();
-        }
-
-        {
-            Predicate actual =
-                    defaultValueAssigner.handlePredicate(
-                            
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
-            assertThat(actual)
-                    
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
-        }
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 693c378a21..73208f290a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -36,7 +36,6 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -1281,56 +1280,6 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         commit.close();
     }
 
-    @Test
-    public void testAuditLogWithDefaultValueFields() throws Exception {
-        FileStoreTable table =
-                createFileStoreTable(
-                        conf -> {
-                            conf.set(CHANGELOG_PRODUCER, 
ChangelogProducer.INPUT);
-                            conf.set(
-                                    String.format(
-                                            "%s.%s.%s",
-                                            CoreOptions.FIELDS_PREFIX,
-                                            "b",
-                                            
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
-                                    "0");
-                        });
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
-        write.write(rowDataWithKind(RowKind.INSERT, 2, 21, null));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.close();
-        commit.close();
-
-        AuditLogTable auditLogTable = new AuditLogTable(table);
-        Function<InternalRow, String> rowDataToString =
-                row ->
-                        internalRowToString(
-                                row,
-                                DataTypes.ROW(
-                                        DataTypes.STRING(),
-                                        DataTypes.INT(),
-                                        DataTypes.INT(),
-                                        DataTypes.BIGINT()));
-
-        PredicateBuilder predicateBuilder = new 
PredicateBuilder(auditLogTable.rowType());
-
-        SnapshotReader snapshotReader =
-                auditLogTable
-                        .newSnapshotReader()
-                        .withFilter(
-                                and(
-                                        
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
-                                        
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
-        InnerTableRead read = auditLogTable.newRead();
-        List<String> result =
-                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
-        assertThat(result).containsExactlyInAnyOrder("+I[+I, 2, 20, 200]", 
"+I[+I, 2, 21, 0]");
-    }
-
     @Test
     public void testAuditLog() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
deleted file mode 100644
index ec253143a7..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** test default value on streaming scan. */
-public class DefaultValueScannerTest extends ScannerTestBase {
-    @Test
-    public void testDefaultValue() throws Exception {
-        TableRead read = table.newRead();
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-        StreamDataTableScan scan = table.newStreamScan();
-
-        write.write(rowData(1, 10, 101L));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(1, 10, null));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        {
-            TableScan.Plan plan = scan.plan();
-            assertThat(getResult(read, plan.splits()))
-                    .hasSameElementsAs(Arrays.asList("+I 1|10|100"));
-        }
-
-        write.write(rowData(2, 11, 200L));
-        write.write(rowData(2, 12, null));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        {
-            // Predicate pushdown for default fields will not work
-            PredicateBuilder predicateBuilder =
-                    new PredicateBuilder(table.schema().logicalRowType());
-
-            Predicate ptEqual = 
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2);
-            Predicate bEqual = 
predicateBuilder.equal(predicateBuilder.indexOf("b"), 200);
-            Predicate predicate = PredicateBuilder.and(ptEqual, bEqual);
-
-            TableScan.Plan plan = scan.withFilter(predicate).plan();
-            read = table.newRead().withFilter(predicate);
-            List<String> result = getResult(read, plan.splits());
-            assertThat(result).hasSameElementsAs(Arrays.asList("+I 2|11|200", 
"+I 2|12|100"));
-        }
-        write.close();
-        commit.close();
-    }
-
-    protected FileStoreTable createFileStoreTable() throws Exception {
-        Options options = new Options();
-        options.set(
-                String.format(
-                        "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX, "b", 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
-                "100");
-        return createFileStoreTable(options);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 676836d15c..f5e6a797ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,15 +23,12 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -58,7 +55,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -75,10 +71,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
@@ -1586,79 +1579,6 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
     }
 
-    @Test
-    public void testDefaultValueWithoutPrimaryKey() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(
-                CoreOptions.FIELDS_PREFIX + ".rate." + 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
-                "1000");
-
-        String table =
-                createTable(
-                        Arrays.asList(
-                                "id BIGINT NOT NULL",
-                                "currency STRING",
-                                "rate BIGINT",
-                                "dt String"),
-                        Collections.emptyList(),
-                        Collections.singletonList("id"),
-                        Collections.emptyList(),
-                        options);
-        insertInto(
-                table,
-                "(1, 'US Dollar', 114, '2022-01-01')",
-                "(2, 'Yen', cast(null as int), '2022-01-01')",
-                "(3, 'Euro', cast(null as int), '2022-01-01')",
-                "(3, 'Euro', 119, '2022-01-02')");
-
-        List<Row> expectedRecords =
-                Arrays.asList(
-                        // part = 2022-01-01
-                        changelogRow("+I", 2L, "Yen", 1000L, "2022-01-01"),
-                        changelogRow("+I", 3L, "Euro", 1000L, "2022-01-01"));
-
-        String querySql = String.format("SELECT * FROM %s where rate = 1000", 
table);
-        testBatchRead(querySql, expectedRecords);
-    }
-
-    @ParameterizedTest
-    @EnumSource(CoreOptions.MergeEngine.class)
-    public void testDefaultValueWithPrimaryKey(CoreOptions.MergeEngine 
mergeEngine)
-            throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(
-                CoreOptions.FIELDS_PREFIX + ".rate." + 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
-                "1000");
-        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
-        if (mergeEngine == FIRST_ROW) {
-            options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
-        }
-        String table =
-                createTable(
-                        Arrays.asList(
-                                "id BIGINT NOT NULL",
-                                "currency STRING",
-                                "rate BIGINT",
-                                "dt String"),
-                        Lists.newArrayList("id", "dt"),
-                        Collections.emptyList(),
-                        Lists.newArrayList("dt"),
-                        options);
-        insertInto(
-                table,
-                "(1, 'US Dollar', 114, '2022-01-01')",
-                "(2, 'Yen', cast(null as int), '2022-01-01')",
-                "(2, 'Yen', cast(null as int), '2022-01-01')",
-                "(3, 'Euro', cast(null as int) , '2022-01-02')");
-
-        List<Row> expectedRecords =
-                Arrays.asList(changelogRow("+I", 3L, "Euro", 1000L, 
"2022-01-02"));
-
-        String querySql =
-                String.format("SELECT * FROM %s where rate = 1000 and currency 
='Euro'", table);
-        testBatchRead(querySql, expectedRecords);
-    }
-
     @Test
     public void testUpdateWithoutPrimaryKey() throws Exception {
         // Step1: define table schema


Reply via email to