This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e11702b [FLINK-29293] Set level to KeyValue when reading from data 
files (#310)
8e11702b is described below

commit 8e11702be1fda0ad5692eaf78fce2ff914c36a62
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Mon Oct 10 10:06:25 2022 +0800

    [FLINK-29293] Set level to KeyValue when reading from data files (#310)
    
    This closes #310.
---
 .../apache/flink/table/store/file/KeyValue.java    | 29 ++++++++++++++--------
 .../file/io/KeyValueDataFileRecordReader.java      | 21 +++++++++-------
 .../store/file/io/KeyValueFileReaderFactory.java   |  5 ++--
 .../store/file/mergetree/MergeTreeReader.java      |  2 +-
 .../file/operation/KeyValueFileStoreRead.java      |  2 +-
 .../store/file/io/KeyValueFileReadWriteTest.java   |  6 +++--
 6 files changed, 40 insertions(+), 25 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 5f124fe4..fcea7467 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -39,11 +39,15 @@ import java.util.stream.IntStream;
 public class KeyValue {
 
     public static final long UNKNOWN_SEQUENCE = -1;
+    public static final int UNKNOWN_LEVEL = -1;
 
     private RowData key;
+    // determined after written into memory table or read from file
     private long sequenceNumber;
     private RowKind valueKind;
     private RowData value;
+    // determined after read from file
+    private int level;
 
     public KeyValue replace(RowData key, RowKind valueKind, RowData value) {
         return replace(key, UNKNOWN_SEQUENCE, valueKind, value);
@@ -54,6 +58,7 @@ public class KeyValue {
         this.sequenceNumber = sequenceNumber;
         this.valueKind = valueKind;
         this.value = value;
+        this.level = UNKNOWN_LEVEL;
         return this;
     }
 
@@ -78,6 +83,15 @@ public class KeyValue {
         return value;
     }
 
+    public int level() {
+        return level;
+    }
+
+    public KeyValue setLevel(int level) {
+        this.level = level;
+        return this;
+    }
+
     public static RowType schema(RowType keyType, RowType valueType) {
         List<RowType.RowField> fields = new ArrayList<>(keyType.getFields());
         fields.add(new RowType.RowField("_SEQUENCE_NUMBER", new 
BigIntType(false)));
@@ -120,22 +134,17 @@ public class KeyValue {
                         keySerializer.copy(key),
                         sequenceNumber,
                         valueKind,
-                        valueSerializer.copy(value));
+                        valueSerializer.copy(value))
+                .setLevel(level);
     }
 
     @VisibleForTesting
     public String toString(RowType keyType, RowType valueType) {
         String keyString = rowDataToString(key, keyType);
         String valueString = rowDataToString(value, valueType);
-        return "{kind: "
-                + valueKind.name()
-                + ", seq: "
-                + sequenceNumber
-                + ", key: ("
-                + keyString
-                + "), value: ("
-                + valueString
-                + ")}";
+        return String.format(
+                "{kind: %s, seq: %d, key: (%s), value: (%s), level: %d}",
+                valueKind.name(), sequenceNumber, keyString, valueString, 
level);
     }
 
     public static String rowDataToString(RowData row, RowType type) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
index cd03c3de..5be4b733 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java
@@ -38,22 +38,25 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
 
     private final BulkFormat.Reader<RowData> reader;
     private final KeyValueSerializer serializer;
+    private final int level;
 
     public KeyValueDataFileRecordReader(
             BulkFormat<RowData, FileSourceSplit> readerFactory,
             Path path,
             RowType keyType,
-            RowType valueType)
+            RowType valueType,
+            int level)
             throws IOException {
         this.reader = FileUtils.createFormatReader(readerFactory, path);
         this.serializer = new KeyValueSerializer(keyType, valueType);
+        this.level = level;
     }
 
     @Nullable
     @Override
     public RecordIterator<KeyValue> readBatch() throws IOException {
         BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
-        return iterator == null ? null : new 
KeyValueDataFileRecordIterator(iterator, serializer);
+        return iterator == null ? null : new 
KeyValueDataFileRecordIterator(iterator);
     }
 
     @Override
@@ -61,16 +64,12 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
         reader.close();
     }
 
-    private static class KeyValueDataFileRecordIterator
-            implements RecordReader.RecordIterator<KeyValue> {
+    private class KeyValueDataFileRecordIterator implements 
RecordReader.RecordIterator<KeyValue> {
 
         private final BulkFormat.RecordIterator<RowData> iterator;
-        private final KeyValueSerializer serializer;
 
-        private KeyValueDataFileRecordIterator(
-                BulkFormat.RecordIterator<RowData> iterator, 
KeyValueSerializer serializer) {
+        private 
KeyValueDataFileRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
             this.iterator = iterator;
-            this.serializer = serializer;
         }
 
         @Override
@@ -78,7 +77,11 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
             RecordAndPosition<RowData> result = iterator.next();
 
             // TODO schema evolution
-            return result == null ? null : 
serializer.fromRow(result.getRecord());
+            if (result == null) {
+                return null;
+            } else {
+                return serializer.fromRow(result.getRecord()).setLevel(level);
+            }
         }
 
         @Override
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
index 5b414c23..a1064a64 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileReaderFactory.java
@@ -64,9 +64,10 @@ public class KeyValueFileReaderFactory {
         this.pathFactory = pathFactory;
     }
 
-    public RecordReader<KeyValue> createRecordReader(String fileName) throws 
IOException {
+    public RecordReader<KeyValue> createRecordReader(String fileName, int 
level)
+            throws IOException {
         return new KeyValueDataFileRecordReader(
-                readerFactory, pathFactory.toPath(fileName), keyType, 
valueType);
+                readerFactory, pathFactory.toPath(fileName), keyType, 
valueType, level);
     }
 
     public static Builder builder(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index 67e65b06..444d10ce 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -126,7 +126,7 @@ public class MergeTreeReader implements 
RecordReader<KeyValue> {
             SortedRun run, KeyValueFileReaderFactory readerFactory) throws 
IOException {
         List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (DataFileMeta file : run.files()) {
-            readers.add(() -> 
readerFactory.createRecordReader(file.fileName()));
+            readers.add(() -> 
readerFactory.createRecordReader(file.fileName(), file.level()));
         }
         return ConcatRecordReader.create(readers);
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index d07d69c2..0565b1a5 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -136,7 +136,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                 suppliers.add(
                         () ->
                                 readerFactory.createRecordReader(
-                                        
changelogFile(file).orElse(file.fileName())));
+                                        
changelogFile(file).orElse(file.fileName()), file.level()));
             }
             return ConcatRecordReader.create(suppliers);
         } else {
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
index 6f87316e..5633c8b1 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/KeyValueFileReadWriteTest.java
@@ -68,7 +68,7 @@ public class KeyValueFileReadWriteTest {
     public void testReadNonExistentFile() {
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), "avro", null, null);
-        assertThatThrownBy(() -> 
readerFactory.createRecordReader("dummy_file"))
+        assertThatThrownBy(() -> 
readerFactory.createRecordReader("dummy_file", 0))
                 .hasMessageContaining(
                         "you can configure 'snapshot.time-retained' option 
with a larger value.");
     }
@@ -289,7 +289,8 @@ public class KeyValueFileReadWriteTest {
         for (DataFileMeta meta : actualMetas) {
             // check the contents of data file
             CloseableIterator<KeyValue> actualKvsIterator =
-                    new 
RecordReaderIterator<>(readerFactory.createRecordReader(meta.fileName()));
+                    new RecordReaderIterator<>(
+                            readerFactory.createRecordReader(meta.fileName(), 
meta.level()));
             while (actualKvsIterator.hasNext()) {
                 assertThat(expectedIterator.hasNext()).isTrue();
                 KeyValue actualKv = actualKvsIterator.next();
@@ -300,6 +301,7 @@ public class KeyValueFileReadWriteTest {
                                         keySerializer,
                                         projectedValueSerializer))
                         .isTrue();
+                assertThat(actualKv.level()).isEqualTo(meta.level());
             }
             actualKvsIterator.close();
 

Reply via email to