This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d3844605 [core] Drop stats in manifest file reading (#4534)
1d3844605 is described below
commit 1d3844605ab71d328d1519e08ebc350eeaaa5f5a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 19 15:37:54 2024 +0800
[core] Drop stats in manifest file reading (#4534)
---
docs/content/maintenance/metrics.md | 10 -
.../main/java/org/apache/paimon/utils/Filter.java | 7 +
.../org/apache/paimon/utils/ThreadPoolUtils.java | 6 +-
.../UnawareAppendTableCompactionCoordinator.java | 8 +-
.../java/org/apache/paimon/manifest/FileEntry.java | 25 ++-
.../paimon/manifest/FilteredManifestEntry.java | 30 +--
.../org/apache/paimon/manifest/ManifestFile.java | 13 --
.../paimon/operation/AbstractFileStoreScan.java | 209 +++++++++++++--------
.../paimon/operation/AppendOnlyFileStoreScan.java | 7 -
.../org/apache/paimon/operation/FileStoreScan.java | 2 +
.../paimon/operation/KeyValueFileStoreScan.java | 38 +++-
.../paimon/operation/LocalOrphanFilesClean.java | 4 +-
.../paimon/operation/metrics/ScanMetrics.java | 12 --
.../apache/paimon/operation/metrics/ScanStats.java | 22 +--
.../apache/paimon/table/sink/TableCommitImpl.java | 4 +-
.../paimon/table/source/DataTableBatchScan.java | 2 +-
.../snapshot/IncrementalStartingScanner.java | 6 +-
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +
.../apache/paimon/table/system/AuditLogTable.java | 6 +
.../paimon/table/system/ReadOptimizedTable.java | 3 +-
.../paimon/utils/ManifestReadThreadPool.java | 4 +-
.../java/org/apache/paimon/utils/ObjectsCache.java | 17 +-
.../java/org/apache/paimon/utils/ObjectsFile.java | 27 ++-
.../paimon/operation/metrics/ScanMetricsTest.java | 26 +--
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 17 --
.../org/apache/paimon/utils/ObjectsCacheTest.java | 18 +-
.../flink/source/ContinuousFileStoreSource.java | 2 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 2 +-
.../paimon/flink/source/FlinkTableSource.java | 20 +-
.../paimon/flink/source/StaticFileStoreSource.java | 2 +-
.../flink/source/operator/MonitorFunction.java | 2 +-
.../paimon/hive/utils/HiveSplitGenerator.java | 3 +-
.../paimon/spark/ColumnPruningAndPushDown.scala | 2 +-
.../commands/PaimonAnalyzeTableColumnCommand.scala | 7 +-
.../apache/paimon/spark/sources/StreamHelper.scala | 3 +-
36 files changed, 299 insertions(+), 275 deletions(-)
diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index 139bdfff6..2c3067267 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -67,16 +67,6 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
<td>Gauge</td>
<td>Number of scanned manifest files in the last scan.</td>
</tr>
- <tr>
- <td>lastSkippedByPartitionAndStats</td>
- <td>Gauge</td>
- <td>Skipped table files by partition filter and value / key stats information in the last scan.</td>
- </tr>
- <tr>
- <td>lastSkippedByWholeBucketFilesFilter</td>
- <td>Gauge</td>
- <td>Skipped table files by bucket level value filter (only primary key table) in the last scan.</td>
- </tr>
<tr>
<td>lastScanSkippedTableFiles</td>
<td>Gauge</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
index 2764bc773..4d9416252 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
@@ -37,6 +37,13 @@ public interface Filter<T> {
*/
boolean test(T t);
+ default Filter<T> and(Filter<? super T> other) {
+ if (other == null) {
+ return this;
+ }
+ return t -> test(t) && other.test(t);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
static <T> Filter<T> alwaysTrue() {
return (Filter) ALWAYS_TRUE;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index 02b5d73fc..112b9ad1c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -110,7 +110,9 @@ public class ThreadPoolUtils {
if (stack.isEmpty()) {
return;
}
- activeList = randomlyExecute(executor,
processor, stack.poll());
+ activeList =
+ randomlyExecuteSequentialReturn(
+ executor, processor,
stack.poll());
}
}
}
@@ -132,7 +134,7 @@ public class ThreadPoolUtils {
awaitAllFutures(futures);
}
- public static <U, T> Iterator<T> randomlyExecute(
+ public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
ExecutorService executor, Function<U, List<T>> processor,
Collection<U> input) {
List<Future<List<T>>> futures = new ArrayList<>(input.size());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 842b22316..577f28d0f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -27,6 +27,7 @@ import
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintai
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
@@ -441,7 +442,12 @@ public class UnawareAppendTableCompactionCoordinator {
}
if (currentIterator.hasNext()) {
- return currentIterator.next();
+ ManifestEntry entry = currentIterator.next();
+ if (entry.kind() == FileKind.DELETE) {
+ continue;
+ } else {
+ return entry;
+ }
}
currentIterator = null;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 3b3e514e0..91e07a369 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -28,15 +28,17 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
/** Entry representing a file. */
@@ -214,7 +216,11 @@ public interface FileEntry {
return readDeletedEntries(
m ->
manifestFile.read(
- m.fileName(), m.fileSize(),
Filter.alwaysTrue(), deletedFilter()),
+ m.fileName(),
+ m.fileSize(),
+ Filter.alwaysTrue(),
+ deletedFilter(),
+ Filter.alwaysTrue()),
manifestFiles,
manifestReadParallelism);
}
@@ -234,11 +240,11 @@ public interface FileEntry {
.filter(e -> e.kind() == FileKind.DELETE)
.map(FileEntry::identifier)
.collect(Collectors.toList());
- Iterable<Identifier> identifiers =
- sequentialBatchedExecute(processor, manifestFiles,
manifestReadParallelism);
- Set<Identifier> result = new HashSet<>();
- for (Identifier identifier : identifiers) {
- result.add(identifier);
+ Iterator<Identifier> identifiers =
+ randomlyExecuteSequentialReturn(processor, manifestFiles,
manifestReadParallelism);
+ Set<Identifier> result = ConcurrentHashMap.newKeySet();
+ while (identifiers.hasNext()) {
+ result.add(identifiers.next());
}
return result;
}
@@ -247,4 +253,9 @@ public interface FileEntry {
Function<InternalRow, FileKind> getter =
ManifestEntrySerializer.kindGetter();
return row -> getter.apply(row) == FileKind.DELETE;
}
+
+ static Filter<InternalRow> addFilter() {
+ Function<InternalRow, FileKind> getter =
ManifestEntrySerializer.kindGetter();
+ return row -> getter.apply(row) == FileKind.ADD;
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
similarity index 54%
copy from paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
copy to
paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
index 2764bc773..29ae6f638 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
@@ -16,29 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.manifest;
-import org.apache.paimon.predicate.Predicate;
+/** Wrap a {@link ManifestEntry} to contain {@link #selected}. */
+public class FilteredManifestEntry extends ManifestEntry {
-/**
- * Represents a filter (boolean-valued function) of one argument. This class is for avoiding name
- * conflicting to {@link Predicate}.
- */
-@FunctionalInterface
-public interface Filter<T> {
-
- Filter<?> ALWAYS_TRUE = t -> true;
+ private final boolean selected;
- /**
- * Evaluates this predicate on the given argument.
- *
- * @param t the input argument
- * @return {@code true} if the input argument matches the predicate, otherwise {@code false}
- */
- boolean test(T t);
+ public FilteredManifestEntry(ManifestEntry entry, boolean selected) {
+ super(entry.kind(), entry.partition(), entry.bucket(),
entry.totalBuckets(), entry.file());
+ this.selected = selected;
+ }
- @SuppressWarnings({"unchecked", "rawtypes"})
- static <T> Filter<T> alwaysTrue() {
- return (Filter) ALWAYS_TRUE;
+ public boolean selected() {
+ return selected;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38181a823..128f5262a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -211,18 +211,5 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
suggestedFileSize,
cache);
}
-
- public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
- RowType entryType =
VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
- return new ObjectsFile<>(
- fileIO,
- new SimpleFileEntrySerializer(),
- entryType,
- fileFormat.createReaderFactory(entryType),
- fileFormat.createWriterFactory(entryType),
- compression,
- pathFactory.manifestFileFactory(),
- cache);
- }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0e1f9357e..98e064451 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileEntry;
-import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -43,8 +43,6 @@ import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -62,6 +60,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -193,6 +192,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan enableValueFilter() {
+ return this;
+ }
+
@Override
public FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter)
{
this.manifestEntryFilter = filter;
@@ -241,47 +245,46 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
- long startDataFiles =
- manifestsResult.allManifests.stream()
- .mapToLong(f -> f.numAddedFiles() -
f.numDeletedFiles())
- .sum();
-
- Collection<ManifestEntry> mergedEntries =
- readAndMergeFileEntries(manifests, this::readManifest);
-
- long skippedByPartitionAndStats = startDataFiles -
mergedEntries.size();
-
- // We group files by bucket here, and filter them by the whole bucket filter.
- // Why do this: because in primary key table, we can't just filter the value
- // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
- // but we can do this by filter the whole bucket files
- List<ManifestEntry> files =
- mergedEntries.stream()
- .collect(
- Collectors.groupingBy(
- // we use LinkedHashMap to avoid
disorder
- file -> Pair.of(file.partition(),
file.bucket()),
- LinkedHashMap::new,
- Collectors.toList()))
- .values()
- .stream()
- .map(this::filterWholeBucketByStats)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ Iterator<ManifestEntry> iterator = readManifestEntries(manifests,
false);
+ List<ManifestEntry> files = new ArrayList<>();
+ while (iterator.hasNext()) {
+ files.add(iterator.next());
+ }
+
+ if (wholeBucketFilterEnabled()) {
+ // We group files by bucket here, and filter them by the whole bucket filter.
+ // Why do this: because in primary key table, we can't just filter the value
+ // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
+ // but we can do this by filter the whole bucket files
+ files =
+ files.stream()
+ .collect(
+ Collectors.groupingBy(
+ // we use LinkedHashMap to avoid
disorder
+ file -> Pair.of(file.partition(),
file.bucket()),
+ LinkedHashMap::new,
+ Collectors.toList()))
+ .values()
+ .stream()
+ .map(this::filterWholeBucketByStats)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ List<ManifestEntry> result = files;
- long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
- checkState(
- startDataFiles - skippedByPartitionAndStats -
skippedByWholeBucketFiles
- == files.size());
if (scanMetrics != null) {
+ long allDataFiles =
+ manifestsResult.allManifests.stream()
+ .mapToLong(f -> f.numAddedFiles() -
f.numDeletedFiles())
+ .sum();
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
- skippedByPartitionAndStats,
- skippedByWholeBucketFiles,
- files.size()));
+ allDataFiles - result.size(),
+ result.size()));
}
return new Plan() {
@@ -299,12 +302,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public List<ManifestEntry> files() {
- if (dropStats) {
- return files.stream()
- .map(ManifestEntry::copyWithoutStats)
- .collect(Collectors.toList());
- }
- return files;
+ return result;
}
};
}
@@ -312,9 +310,15 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public List<SimpleFileEntry> readSimpleEntries() {
List<ManifestFileMeta> manifests = readManifests().filteredManifests;
- Collection<SimpleFileEntry> mergedEntries =
- readAndMergeFileEntries(manifests, this::readSimpleEntries);
- return new ArrayList<>(mergedEntries);
+ Iterator<SimpleFileEntry> iterator =
+ scanMode == ScanMode.ALL
+ ? readAndMergeFileEntries(manifests,
SimpleFileEntry::from, false)
+ : readAndNoMergeFileEntries(manifests,
SimpleFileEntry::from, false);
+ List<SimpleFileEntry> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
}
@Override
@@ -343,23 +347,57 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public Iterator<ManifestEntry> readFileIterator() {
- List<ManifestFileMeta> manifests = readManifests().filteredManifests;
- Set<FileEntry.Identifier> deleteEntries =
- FileEntry.readDeletedEntries(this::readSimpleEntries,
manifests, parallelism);
- Iterator<ManifestEntry> iterator =
- sequentialBatchedExecute(this::readManifest, manifests,
parallelism).iterator();
- return Iterators.filter(
- iterator,
- entry ->
- entry != null
- && entry.kind() == FileKind.ADD
- &&
!deleteEntries.contains(entry.identifier()));
+ // useSequential: reduce memory and iterator can be stopping
+ return readManifestEntries(readManifests().filteredManifests, true);
+ }
+
+ private Iterator<ManifestEntry> readManifestEntries(
+ List<ManifestFileMeta> manifests, boolean useSequential) {
+ return scanMode == ScanMode.ALL
+ ? readAndMergeFileEntries(manifests, Function.identity(),
useSequential)
+ : readAndNoMergeFileEntries(manifests, Function.identity(),
useSequential);
+ }
+
+ private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
+ List<ManifestFileMeta> manifests,
+ Function<List<ManifestEntry>, List<T>> converter,
+ boolean useSequential) {
+ Set<Identifier> deletedEntries =
+ FileEntry.readDeletedEntries(
+ manifest -> readManifest(manifest,
FileEntry.deletedFilter(), null),
+ manifests,
+ parallelism);
+
+ manifests =
+ manifests.stream()
+ .filter(file -> file.numAddedFiles() > 0)
+ .collect(Collectors.toList());
+
+ Function<ManifestFileMeta, List<T>> processor =
+ manifest ->
+ converter.apply(
+ readManifest(
+ manifest,
+ FileEntry.addFilter(),
+ entry ->
!deletedEntries.contains(entry.identifier())));
+ if (useSequential) {
+ return sequentialBatchedExecute(processor, manifests,
parallelism).iterator();
+ } else {
+ return randomlyExecuteSequentialReturn(processor, manifests,
parallelism);
+ }
}
- public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
- List<ManifestFileMeta> manifests, Function<ManifestFileMeta,
List<T>> manifestReader) {
- return FileEntry.mergeEntries(
- sequentialBatchedExecute(manifestReader, manifests,
parallelism));
+ private <T extends FileEntry> Iterator<T> readAndNoMergeFileEntries(
+ List<ManifestFileMeta> manifests,
+ Function<List<ManifestEntry>, List<T>> converter,
+ boolean useSequential) {
+ Function<ManifestFileMeta, List<T>> reader =
+ manifest -> converter.apply(readManifest(manifest));
+ if (useSequential) {
+ return sequentialBatchedExecute(reader, manifests,
parallelism).iterator();
+ } else {
+ return randomlyExecuteSequentialReturn(reader, manifests,
parallelism);
+ }
}
private ManifestsReader.Result readManifests() {
@@ -384,12 +422,24 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
- /** Note: Keep this thread-safe. */
- protected abstract List<ManifestEntry>
filterWholeBucketByStats(List<ManifestEntry> entries);
+ protected boolean wholeBucketFilterEnabled() {
+ return false;
+ }
+
+ protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
+ return entries;
+ }
/** Note: Keep this thread-safe. */
@Override
public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+ return readManifest(manifest, null, null);
+ }
+
+ private List<ManifestEntry> readManifest(
+ ManifestFileMeta manifest,
+ @Nullable Filter<InternalRow> additionalFilter,
+ @Nullable Filter<ManifestEntry> additionalTFilter) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
@@ -397,29 +447,24 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
manifest.fileName(),
manifest.fileSize(),
createCacheRowFilter(),
- createEntryRowFilter());
- List<ManifestEntry> filteredEntries = new ArrayList<>(entries.size());
- for (ManifestEntry entry : entries) {
- if ((manifestEntryFilter == null ||
manifestEntryFilter.test(entry))
- && filterByStats(entry)) {
- filteredEntries.add(entry);
+ createEntryRowFilter().and(additionalFilter),
+ entry ->
+ (additionalTFilter == null ||
additionalTFilter.test(entry))
+ && (manifestEntryFilter == null
+ ||
manifestEntryFilter.test(entry))
+ && filterByStats(entry));
+ if (dropStats) {
+ List<ManifestEntry> copied = new ArrayList<>(entries.size());
+ for (ManifestEntry entry : entries) {
+ copied.add(dropStats(entry));
}
+ entries = copied;
}
- return filteredEntries;
+ return entries;
}
- /** Note: Keep this thread-safe. */
- private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest)
{
- return manifestFileFactory
- .createSimpleFileEntryReader()
- .read(
- manifest.fileName(),
- manifest.fileSize(),
- // use filter for ManifestEntry
- // currently, projection is not pushed down to file
format
- // see SimpleFileEntrySerializer
- createCacheRowFilter(),
- createEntryRowFilter());
+ protected ManifestEntry dropStats(ManifestEntry entry) {
+ return entry.copyWithoutStats();
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 60b4e7933..d2ca5da42 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -34,7 +34,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
@@ -100,12 +99,6 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
&& (!fileIndexReadEnabled ||
testFileIndex(entry.file().embeddedIndex(), entry));
}
- @Override
- protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
- // We don't need to filter per-bucket entries here
- return entries;
- }
-
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes,
ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index e643bf161..7663f4822 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -73,6 +73,8 @@ public interface FileStoreScan {
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
+ FileStoreScan enableValueFilter();
+
FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index c368d9e51..8d8c51996 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FilteredManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.predicate.Predicate;
@@ -45,7 +46,6 @@ import java.util.List;
import java.util.Map;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
@@ -64,6 +64,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private final boolean fileIndexReadEnabled;
private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
+ private boolean valueFilterForceEnabled = false;
+
public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -110,11 +112,17 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return this;
}
+ @Override
+ public FileStoreScan enableValueFilter() {
+ this.valueFilterForceEnabled = true;
+ return this;
+ }
+
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
DataFileMeta file = entry.file();
- if (isValueFilterEnabled(entry) && !filterByValueFilter(entry)) {
+ if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
return false;
}
@@ -130,6 +138,14 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return true;
}
+ @Override
+ protected ManifestEntry dropStats(ManifestEntry entry) {
+ if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
+ return new FilteredManifestEntry(entry.copyWithoutStats(),
filterByValueFilter(entry));
+ }
+ return entry.copyWithoutStats();
+ }
+
private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes,
ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
@@ -150,14 +166,14 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
}
}
- private boolean isValueFilterEnabled(ManifestEntry entry) {
+ private boolean isValueFilterEnabled() {
if (valueFilter == null) {
return false;
}
switch (scanMode) {
case ALL:
- return (deletionVectorsEnabled || mergeEngine == FIRST_ROW) &&
entry.level() > 0;
+ return valueFilterForceEnabled;
case DELTA:
return false;
case CHANGELOG:
@@ -168,13 +184,13 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
}
}
- /** Note: Keep this thread-safe. */
@Override
- protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
- if (valueFilter == null || scanMode != ScanMode.ALL) {
- return entries;
- }
+ protected boolean wholeBucketFilterEnabled() {
+ return valueFilter != null && scanMode == ScanMode.ALL;
+ }
+ @Override
+ protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry>
entries) {
return noOverlapping(entries)
? filterWholeBucketPerFile(entries)
: filterWholeBucketAllFiles(entries);
@@ -207,6 +223,10 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
}
private boolean filterByValueFilter(ManifestEntry entry) {
+ if (entry instanceof FilteredManifestEntry) {
+ return ((FilteredManifestEntry) entry).selected();
+ }
+
DataFileMeta file = entry.file();
SimpleStatsEvolution.Result result =
fieldValueStatsConverters
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 3ee108c10..a5eea6d65 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/**
@@ -180,7 +180,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean
{
.filter(this::oldEnough)
.map(FileStatus::getPath)
.collect(Collectors.toList());
- Iterator<Path> allPaths = randomlyExecute(executor, processor,
fileDirs);
+ Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor,
processor, fileDirs);
Map<String, Path> result = new HashMap<>();
while (allPaths.hasNext()) {
Path next = allPaths.next();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index 9fcbb8960..96f0aec1c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -49,12 +49,6 @@ public class ScanMetrics {
public static final String SCAN_DURATION = "scanDuration";
public static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
- public static final String LAST_SKIPPED_BY_PARTITION_AND_STATS =
- "lastSkippedByPartitionAndStats";
-
- public static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
- "lastSkippedByWholeBucketFilesFilter";
-
public static final String LAST_SCAN_SKIPPED_TABLE_FILES =
"lastScanSkippedTableFiles";
public static final String LAST_SCAN_RESULTED_TABLE_FILES =
"lastScanResultedTableFiles";
@@ -66,12 +60,6 @@ public class ScanMetrics {
metricGroup.gauge(
LAST_SCANNED_MANIFESTS,
() -> latestScan == null ? 0L :
latestScan.getScannedManifests());
- metricGroup.gauge(
- LAST_SKIPPED_BY_PARTITION_AND_STATS,
- () -> latestScan == null ? 0L :
latestScan.getSkippedByPartitionAndStats());
- metricGroup.gauge(
- LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
- () -> latestScan == null ? 0L :
latestScan.getSkippedByWholeBucketFiles());
metricGroup.gauge(
LAST_SCAN_SKIPPED_TABLE_FILES,
() -> latestScan == null ? 0L :
latestScan.getSkippedTableFiles());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
index e760282e6..700619c36 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
@@ -25,23 +25,15 @@ public class ScanStats {
// the unit is milliseconds
private final long duration;
private final long scannedManifests;
- private final long skippedByPartitionAndStats;
- private final long skippedByWholeBucketFiles;
private final long skippedTableFiles;
private final long resultedTableFiles;
public ScanStats(
- long duration,
- long scannedManifests,
- long skippedByPartitionAndStats,
- long skippedByWholeBucketFiles,
- long resultedTableFiles) {
+ long duration, long scannedManifests, long skippedTableFiles, long
resultedTableFiles) {
this.duration = duration;
this.scannedManifests = scannedManifests;
- this.skippedByPartitionAndStats = skippedByPartitionAndStats;
- this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
- this.skippedTableFiles = skippedByPartitionAndStats +
skippedByWholeBucketFiles;
+ this.skippedTableFiles = skippedTableFiles;
this.resultedTableFiles = resultedTableFiles;
}
@@ -60,16 +52,6 @@ public class ScanStats {
return resultedTableFiles;
}
- @VisibleForTesting
- protected long getSkippedByPartitionAndStats() {
- return skippedByPartitionAndStats;
- }
-
- @VisibleForTesting
- protected long getSkippedByWholeBucketFiles() {
- return skippedByWholeBucketFiles;
- }
-
@VisibleForTesting
protected long getDuration() {
return duration;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index b4f8fa47d..73c55942a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -68,7 +68,7 @@ import static
org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
/** An abstraction layer above {@link FileStoreCommit} to provide snapshot
commit and expiration. */
public class TableCommitImpl implements InnerTableCommit {
@@ -292,7 +292,7 @@ public class TableCommitImpl implements InnerTableCommit {
List<Path> nonExistFiles =
Lists.newArrayList(
- randomlyExecute(
+ randomlyExecuteSequentialReturn(
getExecutorService(null),
f -> nonExists.test(f) ? singletonList(f) :
emptyList(),
files));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d3e8a2adb..635802cc9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -51,7 +51,7 @@ public class DataTableBatchScan extends AbstractDataTableScan
{
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
if (pkTable && (options.deletionVectorsEnabled() ||
options.mergeEngine() == FIRST_ROW)) {
- snapshotReader.withLevelFilter(level -> level > 0);
+ snapshotReader.withLevelFilter(level -> level >
0).enableValueFilter();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 358d86cbe..9bfb54f2c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -31,7 +31,6 @@ import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -50,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link StartingScanner} for incremental changes by snapshot. */
@@ -84,7 +84,7 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
.collect(Collectors.toList());
Iterator<ManifestFileMeta> manifests =
- ManifestReadThreadPool.randomlyExecute(
+ randomlyExecuteSequentialReturn(
id -> {
Snapshot snapshot = snapshotManager.snapshot(id);
switch (scanMode) {
@@ -111,7 +111,7 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
reader.parallelism());
Iterator<ManifestEntry> entries =
- ManifestReadThreadPool.randomlyExecute(
+ randomlyExecuteSequentialReturn(
reader::readManifest, Lists.newArrayList(manifests),
reader.parallelism());
while (entries.hasNext()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index b59cf98bb..f3e0a92b8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -77,6 +77,8 @@ public interface SnapshotReader {
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
+ SnapshotReader enableValueFilter();
+
SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter);
SnapshotReader withBucket(int bucket);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 7ce537ee5..ce01bdba9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -234,6 +234,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader enableValueFilter() {
+ scan.enableValueFilter();
+ return this;
+ }
+
@Override
public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry>
filter) {
scan.withManifestEntryFilter(filter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e56ee9041..b0cbe0772 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -319,6 +319,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader enableValueFilter() {
+ wrapped.enableValueFilter();
+ return this;
+ }
+
@Override
public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry>
filter) {
wrapped.withManifestEntryFilter(filter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index e28ae3760..deb149791 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -120,7 +120,8 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
public SnapshotReader newSnapshotReader() {
if (wrapped.schema().primaryKeys().size() > 0) {
return wrapped.newSnapshotReader()
- .withLevelFilter(level -> level ==
coreOptions().numLevels() - 1);
+ .withLevelFilter(level -> level ==
coreOptions().numLevels() - 1)
+ .enableValueFilter();
} else {
return wrapped.newSnapshotReader();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index d967e778f..49fcfc8bd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -54,9 +54,9 @@ public class ManifestReadThreadPool {
}
/** This method aims to parallel process tasks with randomly but return
values sequentially. */
- public static <T, U> Iterator<T> randomlyExecute(
+ public static <T, U> Iterator<T> randomlyExecuteSequentialReturn(
Function<U, List<T>> processor, List<U> input, @Nullable Integer
threadNum) {
ThreadPoolExecutor executor = getExecutorService(threadNum);
- return ThreadPoolUtils.randomlyExecute(executor, processor, input);
+ return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor,
processor, input);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 8c490e008..1c9d9664f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -66,11 +66,12 @@ public class ObjectsCache<K, V> {
K key,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
- Filter<InternalRow> readFilter)
+ Filter<InternalRow> readFilter,
+ Filter<V> readVFilter)
throws IOException {
Segments segments = cache.getIfPresents(key);
if (segments != null) {
- return readFromSegments(segments, readFilter);
+ return readFromSegments(segments, readFilter, readVFilter);
} else {
if (fileSize == null) {
fileSize = fileSizeFunction.apply(key);
@@ -78,15 +79,16 @@ public class ObjectsCache<K, V> {
if (fileSize <= cache.maxElementSize()) {
segments = readSegments(key, fileSize, loadFilter);
cache.put(key, segments);
- return readFromSegments(segments, readFilter);
+ return readFromSegments(segments, readFilter, readVFilter);
} else {
return readFromIterator(
- reader.apply(key, fileSize), projectedSerializer,
readFilter);
+ reader.apply(key, fileSize), projectedSerializer,
readFilter, readVFilter);
}
}
}
- private List<V> readFromSegments(Segments segments, Filter<InternalRow>
readFilter)
+ private List<V> readFromSegments(
+ Segments segments, Filter<InternalRow> readFilter, Filter<V>
readVFilter)
throws IOException {
InternalRowSerializer formatSerializer = this.formatSerializer.get();
List<V> entries = new ArrayList<>();
@@ -98,7 +100,10 @@ public class ObjectsCache<K, V> {
try {
formatSerializer.mapFromPages(binaryRow, view);
if (readFilter.test(binaryRow)) {
- entries.add(projectedSerializer.fromRow(binaryRow));
+ V v = projectedSerializer.fromRow(binaryRow);
+ if (readVFilter.test(v)) {
+ entries.add(v);
+ }
}
} catch (EOFException e) {
return entries;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 3c261f410..b0bea8e66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -94,7 +94,8 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
}
public List<T> read(String fileName, @Nullable Long fileSize) {
- return read(fileName, fileSize, Filter.alwaysTrue(),
Filter.alwaysTrue());
+ return read(
+ fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(),
Filter.alwaysTrue());
}
public List<T> readWithIOException(String fileName) throws IOException {
@@ -103,7 +104,8 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
public List<T> readWithIOException(String fileName, @Nullable Long
fileSize)
throws IOException {
- return readWithIOException(fileName, fileSize, Filter.alwaysTrue(),
Filter.alwaysTrue());
+ return readWithIOException(
+ fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(),
Filter.alwaysTrue());
}
public boolean exists(String fileName) {
@@ -118,9 +120,10 @@ public class ObjectsFile<T> implements SimpleFileReader<T>
{
String fileName,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
- Filter<InternalRow> readFilter) {
+ Filter<InternalRow> readFilter,
+ Filter<T> readTFilter) {
try {
- return readWithIOException(fileName, fileSize, loadFilter,
readFilter);
+ return readWithIOException(fileName, fileSize, loadFilter,
readFilter, readTFilter);
} catch (IOException e) {
throw new RuntimeException("Failed to read " + fileName, e);
}
@@ -130,14 +133,16 @@ public class ObjectsFile<T> implements
SimpleFileReader<T> {
String fileName,
@Nullable Long fileSize,
Filter<InternalRow> loadFilter,
- Filter<InternalRow> readFilter)
+ Filter<InternalRow> readFilter,
+ Filter<T> readTFilter)
throws IOException {
Path path = pathFactory.toPath(fileName);
if (cache != null) {
- return cache.read(path, fileSize, loadFilter, readFilter);
+ return cache.read(path, fileSize, loadFilter, readFilter,
readTFilter);
}
- return readFromIterator(createIterator(path, fileSize), serializer,
readFilter);
+ return readFromIterator(
+ createIterator(path, fileSize), serializer, readFilter,
readTFilter);
}
public String writeWithoutRolling(Collection<T> records) {
@@ -184,13 +189,17 @@ public class ObjectsFile<T> implements
SimpleFileReader<T> {
public static <V> List<V> readFromIterator(
CloseableIterator<InternalRow> inputIterator,
ObjectSerializer<V> serializer,
- Filter<InternalRow> readFilter) {
+ Filter<InternalRow> readFilter,
+ Filter<V> readVFilter) {
try (CloseableIterator<InternalRow> iterator = inputIterator) {
List<V> result = new ArrayList<>();
while (iterator.hasNext()) {
InternalRow row = iterator.next();
if (readFilter.test(row)) {
- result.add(serializer.fromRow(row));
+ V v = serializer.fromRow(row);
+ if (readVFilter.test(v)) {
+ result.add(v);
+ }
}
}
return result;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 2b9d0e0cb..a0427d95c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -48,9 +48,7 @@ public class ScanMetricsTest {
ScanMetrics.SCAN_DURATION,
ScanMetrics.LAST_SCANNED_MANIFESTS,
ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
- ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
- ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS,
- ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+ ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES);
}
/** Tests that the metrics are updated properly. */
@@ -66,14 +64,6 @@ public class ScanMetricsTest {
(Histogram)
registeredGenericMetrics.get(ScanMetrics.SCAN_DURATION);
Gauge<Long> lastScannedManifests =
(Gauge<Long>)
registeredGenericMetrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS);
- Gauge<Long> lastSkippedByPartitionAndStats =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS);
- Gauge<Long> lastSkippedByWholeBucketFilesFilter =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
Gauge<Long> lastScanSkippedTableFiles =
(Gauge<Long>)
registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES);
@@ -85,8 +75,6 @@ public class ScanMetricsTest {
assertThat(scanDuration.getCount()).isEqualTo(0);
assertThat(scanDuration.getStatistics().size()).isEqualTo(0);
assertThat(lastScannedManifests.getValue()).isEqualTo(0);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0);
-
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0);
assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0);
@@ -104,9 +92,7 @@ public class ScanMetricsTest {
assertThat(scanDuration.getStatistics().getMax()).isEqualTo(200);
assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0);
assertThat(lastScannedManifests.getValue()).isEqualTo(20);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25);
-
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32);
- assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(57);
+ assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(25);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10);
// report again
@@ -123,19 +109,17 @@ public class ScanMetricsTest {
assertThat(scanDuration.getStatistics().getMax()).isEqualTo(500);
assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132,
offset(0.001));
assertThat(lastScannedManifests.getValue()).isEqualTo(22);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30);
-
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33);
- assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(63);
+ assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(30);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8);
}
private void reportOnce(ScanMetrics scanMetrics) {
- ScanStats scanStats = new ScanStats(200, 20, 25, 32, 10);
+ ScanStats scanStats = new ScanStats(200, 20, 25, 10);
scanMetrics.reportScan(scanStats);
}
private void reportAgain(ScanMetrics scanMetrics) {
- ScanStats scanStats = new ScanStats(500, 22, 30, 33, 8);
+ ScanStats scanStats = new ScanStats(500, 22, 30, 8);
scanMetrics.reportScan(scanStats);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 133913c48..51c8b328d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -121,17 +121,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link PrimaryKeyFileStoreTable}. */
public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
- protected static final RowType COMPATIBILITY_ROW_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.BINARY(1),
- DataTypes.VARBINARY(1)
- },
- new String[] {"pt", "a", "b", "c", "d"});
-
protected static final Function<InternalRow, String>
COMPATIBILITY_BATCH_ROW_TO_STRING =
rowData ->
rowData.getInt(0)
@@ -144,12 +133,6 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
+ "|"
+ new String(rowData.getBinary(4));
- protected static final Function<InternalRow, String>
COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
- rowData ->
- rowData.getRowKind().shortString()
- + " "
- + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
-
@Test
public void testMultipleWriters() throws Exception {
WriteSelector selector =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 8a4f0b061..9d3275e3a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -58,17 +58,23 @@ public class ObjectsCacheTest {
// test empty
map.put("k1", Collections.emptyList());
- List<String> values = cache.read("k1", null, Filter.alwaysTrue(),
Filter.alwaysTrue());
+ List<String> values =
+ cache.read(
+ "k1", null, Filter.alwaysTrue(), Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).isEmpty();
// test values
List<String> expect = Arrays.asList("v1", "v2", "v3");
map.put("k2", expect);
- values = cache.read("k2", null, Filter.alwaysTrue(),
Filter.alwaysTrue());
+ values =
+ cache.read(
+ "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
// test cache
- values = cache.read("k2", null, Filter.alwaysTrue(),
Filter.alwaysTrue());
+ values =
+ cache.read(
+ "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
// test filter
@@ -77,7 +83,8 @@ public class ObjectsCacheTest {
"k2",
null,
Filter.alwaysTrue(),
- r -> r.getString(0).toString().endsWith("2"));
+ r -> r.getString(0).toString().endsWith("2"),
+ Filter.alwaysTrue());
assertThat(values).containsExactly("v2");
// test load filter
@@ -88,6 +95,7 @@ public class ObjectsCacheTest {
"k3",
null,
r -> r.getString(0).toString().endsWith("2"),
+ Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).containsExactly("v2");
@@ -99,6 +107,7 @@ public class ObjectsCacheTest {
"k4",
null,
r -> r.getString(0).toString().endsWith("5"),
+ Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).isEmpty();
@@ -117,6 +126,7 @@ public class ObjectsCacheTest {
k,
null,
Filter.alwaysTrue(),
+ Filter.alwaysTrue(),
Filter.alwaysTrue()))
.containsExactly(k);
} catch (IOException e) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index b7eb1d625..559976921 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -77,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
- StreamTableScan scan = readBuilder.dropStats().newStreamScan();
+ StreamTableScan scan = readBuilder.newStreamScan();
if (metricGroup(context) != null) {
((StreamDataTableScan) scan)
.withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index a648bfba6..b3dcd4840 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -178,7 +178,7 @@ public class FlinkSourceBuilder {
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
- return readBuilder;
+ return readBuilder.dropStats();
}
private DataStream<RowData> buildStaticFileSource() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 9bfd36fdf..12b579589 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -172,12 +173,7 @@ public abstract class FlinkTableSource
protected void scanSplitsForInference() {
if (splitStatistics == null) {
if (table instanceof DataTable) {
- List<PartitionEntry> partitionEntries =
- table.newReadBuilder()
- .withFilter(predicate)
- .dropStats()
- .newScan()
- .listPartitionEntries();
+ List<PartitionEntry> partitionEntries =
newTableScan().listPartitionEntries();
long totalSize = 0;
long rowCount = 0;
for (PartitionEntry entry : partitionEntries) {
@@ -188,13 +184,7 @@ public abstract class FlinkTableSource
splitStatistics =
new SplitStatistics((int) (totalSize / splitTargetSize
+ 1), rowCount);
} else {
- List<Split> splits =
- table.newReadBuilder()
- .withFilter(predicate)
- .dropStats()
- .newScan()
- .plan()
- .splits();
+ List<Split> splits = newTableScan().plan().splits();
splitStatistics =
new SplitStatistics(
splits.size(),
splits.stream().mapToLong(Split::rowCount).sum());
@@ -202,6 +192,10 @@ public abstract class FlinkTableSource
}
}
+ private TableScan newTableScan() {
+ return
table.newReadBuilder().dropStats().withFilter(predicate).newScan();
+ }
+
/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index c388a6dcc..af425aab5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -87,7 +87,7 @@ public class StaticFileStoreSource extends FlinkSource {
private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext
context) {
FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
- TableScan scan = readBuilder.dropStats().newScan();
+ TableScan scan = readBuilder.newScan();
// register scan metrics
if (context.metricGroup() != null) {
((InnerTableScan) scan)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index f21922670..3805f6f8c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -106,7 +106,7 @@ public class MonitorFunction extends
RichSourceFunction<Split>
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
- this.scan = readBuilder.dropStats().newStreamScan();
+ this.scan = readBuilder.newStreamScan();
this.checkpointState =
context.getOperatorStateStore()
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 33cbc19e0..144afab8e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,8 @@ public class HiveSplitGenerator {
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
}
}
- scan.plan()
+ scan.dropStats()
+ .plan()
.splits()
.forEach(
split ->
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index 95c8f4b3a..f29c146b7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -62,7 +62,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
_readBuilder.withFilter(pushedPredicate)
}
pushDownLimit.foreach(_readBuilder.withLimit)
- _readBuilder
+ _readBuilder.dropStats()
}
final def metadataColumns: Seq[PaimonMetadataColumn] = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 19f73cb6c..9a88ca2e4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.manifest.PartitionEntry
import org.apache.paimon.schema.TableSchema
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
@@ -64,11 +65,9 @@ case class PaimonAnalyzeTableColumnCommand(
// compute stats
val totalSize = table
.newScan()
- .plan()
- .splits()
+ .listPartitionEntries()
.asScala
- .flatMap { case split: DataSplit => split.dataFiles().asScala }
- .map(_.fileSize())
+ .map(_.fileSizeInBytes())
.sum
val (mergedRecordCount, colStats) =
PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index b44a66fce..7e61d71ac 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -44,7 +44,8 @@ private[spark] trait StreamHelper {
var lastTriggerMillis: Long
- private lazy val streamScan: StreamDataTableScan = table.newStreamScan()
+ private lazy val streamScan: StreamDataTableScan =
+ table.newStreamScan().dropStats().asInstanceOf[StreamDataTableScan]
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(),
table.partitionKeys()))