tsreaper commented on code in PR #550: URL: https://github.com/apache/flink-table-store/pull/550#discussion_r1119644649
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,55 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.file.catalog.CatalogLock; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. Review Comment: ```suggestion * Create and commit snapshots of a {@link Table}. Snapshots are produced from {@link CommitMessage}s, which themselves are generated by {@link TableWrite}. Also see {@link TableWrite#prepareCommit}. ``` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,55 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.file.catalog.CatalogLock; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. + * + * <ol> + * <li>Before calling {@link TableCommit#commit}, if user cannot determine if this commit is done + * before, user should first call {@link TableCommit#filterCommitted}. + * <li>Before committing, it will first check for conflicts by checking if all files to be removed + * currently exists, and if modified files have overlapping key ranges with existing files. + * <li>After that it use the external {@link CatalogLock} (if provided) or the atomic rename of + * the file system to ensure atomicity. + * <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts. + * <li>If atomic rename fails it tries again after reading the latest snapshot from step 2. + * </ol> + * + * <p>According to the options, the expiration of snapshots and partitions will be completed in + * commit. Review Comment: Why are `TableCommit` and `TableExpire` combined? When calling commit from Java code, I guess users do not expect an old snapshot to be deleted. If users really want to expire snapshots, they should call additional methods explicitly.
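To make the suggested separation concrete, here is a rough sketch of what committing from Java could look like if snapshot expiration were an explicit call rather than a side effect of `commit`. Only `Table`, `TableWrite`, `TableCommit` and `CommitMessage` appear in this PR; the builder methods `newWriteBuilder()`, `newWrite()`, `newCommit()`, the `prepareCommit()` return type, the `commit(identifier, messages)` signature and the `expireSnapshots()` method are assumptions for illustration.

```java
// Hypothetical usage sketch -- names marked "assumed" are not confirmed by this PR.
Table table = catalog.getTable(...);                       // as in the ReadBuilder example
WriteBuilder writeBuilder = table.newWriteBuilder();       // assumed builder entry point

try (TableWrite write = writeBuilder.newWrite();           // assumed factory method
        TableCommit commit = writeBuilder.newCommit()) {   // assumed factory method
    write.write(row);                                      // write InternalRows
    List<CommitMessage> messages = write.prepareCommit();  // assumed return type
    commit.commit(commitIdentifier, messages);             // assumed signature

    // Expiration stays an explicit, separate call instead of happening inside commit():
    // commit.expireSnapshots();                           // hypothetical method
}
```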
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java: ########## @@ -18,39 +18,48 @@ package org.apache.flink.table.store.table.sink; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.data.BinaryRow; import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.disk.IOManager; -import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.table.Table; import java.util.List; /** - * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to - * provide {@link InternalRow} writing. + * Write of {@link Table} to provide {@link InternalRow} writing. + * + * @since 0.4.0 */ +@Experimental public interface TableWrite extends AutoCloseable { - TableWrite withOverwrite(boolean overwrite); - + /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is opened. */ TableWrite withIOManager(IOManager ioManager); - SinkRecord write(InternalRow rowData) throws Exception; + /** Compute partition by an input row. */ + BinaryRow getPartition(InternalRow row); - /** Log record need to preserve original pk (which includes partition fields). */ - SinkRecord toLogRecord(SinkRecord record); + /** Compute bucket by an input row. */ + int getBucket(InternalRow row); - void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception; + /** Write a row to the writer. */ + void write(InternalRow row) throws Exception; /** - * Notify that some new files are created at given snapshot in given bucket. + * Compact a bucket of a partition. By default, it will determine whether to perform the + * compaction according to the 'num-sorted-run.compaction-trigger' option. If fullCompaction is + * true, it will force a full compaction, which is expensive. * - * <p>Most probably, these files are created by another job. Currently this method is only used - * by the dedicated compact job to see files created by writer jobs. + * <p>NOTE: In Java API, full compaction is not automatically executed + * ('changelog-producer.compaction-interval' is ignored). If you open 'changelog-producer' of + * 'full-compaction', please execute this method regularly to produce changelog. Review Comment: Move `changelog-producer.compaction-interval` to Flink's option class. ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,55 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.file.catalog.CatalogLock; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. 
+ * + * <ol> + * <li>Before calling {@link TableCommit#commit}, if user cannot determine if this commit is done + * before, user should first call {@link TableCommit#filterCommitted}. + * <li>Before committing, it will first check for conflicts by checking if all files to be removed + * currently exists, and if modified files have overlapping key ranges with existing files. + * <li>After that it use the external {@link CatalogLock} (if provided) or the atomic rename of + * the file system to ensure atomicity. + * <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts. + * <li>If atomic rename fails it tries again after reading the latest snapshot from step 2. + * </ol> + * + * <p>According to the options, the expiration of snapshots and partitions will be completed in + * commit. + * + * @since 0.4.0 */ -public class TableCommit implements AutoCloseable { - - private final FileStoreCommit commit; - @Nullable private final FileStoreExpire expire; - @Nullable private final PartitionExpire partitionExpire; - - @Nullable private List<Map<String, String>> overwritePartitions = null; - @Nullable private Lock lock; - - public TableCommit( - FileStoreCommit commit, - @Nullable FileStoreExpire expire, - @Nullable PartitionExpire partitionExpire) { - this.commit = commit; - this.expire = expire; - this.partitionExpire = partitionExpire; - } - - public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) { - if (overwritePartition != null) { - this.overwritePartitions = Collections.singletonList(overwritePartition); - } - return this; - } - - public TableCommit withOverwritePartitions( - @Nullable List<Map<String, String>> overwritePartitions) { - this.overwritePartitions = overwritePartitions; - return this; - } - - public TableCommit withLock(Lock lock) { - commit.withLock(lock); - - if (expire != null) { - expire.withLock(lock); - } - - if (partitionExpire != null) { - partitionExpire.withLock(lock); - } - - this.lock = lock; - return this; - } - - public TableCommit withCreateEmptyCommit(boolean createEmptyCommit) { - commit.withCreateEmptyCommit(createEmptyCommit); - return this; - } - - public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables) { - return commit.filterCommitted(committables); - } - - public void commit(long identifier, List<FileCommittable> fileCommittables) { - ManifestCommittable committable = new ManifestCommittable(identifier); - for (FileCommittable fileCommittable : fileCommittables) { - committable.addFileCommittable(fileCommittable); - } - commit(Collections.singletonList(committable)); - } - - public void commit(List<ManifestCommittable> committables) { - if (overwritePartitions == null) { - for (ManifestCommittable committable : committables) { - commit.commit(committable, new HashMap<>()); - } - } else { - ManifestCommittable committable; - if (committables.size() > 1) { - throw new RuntimeException( - "Multiple committables appear in overwrite mode, this may be a bug, please report it: " - + committables); - } else if (committables.size() == 1) { - committable = committables.get(0); - } else { - // create an empty committable - // identifier is Long.MAX_VALUE, come from batch job - // TODO maybe it can be produced by CommitterOperator - committable = new ManifestCommittable(Long.MAX_VALUE); - } - commit.overwrite(overwritePartitions, committable, new HashMap<>()); - } - - if (expire != null) { - expire.expire(); - } - - if (partitionExpire != null) { - 
partitionExpire.expire(); - } - } - - @Override - public void close() throws Exception { - if (lock != null) { - lock.close(); - } - } +@Experimental +public interface TableCommit extends AutoCloseable { + + /** + * Default ignore empty commit, if this is set to false, when there is no new data, an empty + * commit will also be created. + * + * <p>NOTE: It is recommended to set 'ignoreEmptyCommit' to false in streaming write, in order + * to better remove duplicate commits (See {@link #filterCommitted}). + */ + TableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit); + + /** Filter committed commits. This method is used for failover cases. */ + Set<Long> filterCommitted(Set<Long> commitIdentifiers); + + /** @deprecated lock should pass from table. */ + @Deprecated + TableCommit withLock(Lock lock); Review Comment: No need for `@Deprecated`. Just move this method to `InnerTableCommit`. ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.table.source; + +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.data.InternalRow; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.file.predicate.PredicateBuilder; +import org.apache.flink.table.store.types.RowType; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +/** + * An interface for building the {@link TableScan} and {@link TableRead}. + * + * <p>Example of distributed reading: + * + * <pre>{@code + * // 1. Create a ReadBuilder (Serializable) + * Table table = catalog.getTable(...); + * ReadBuilder builder = table.newReadBuilder() + * .withFilter(...) + * .withProjection(...); + * + * // 2. Plan splits in 'Coordinator' (or named 'Driver'): + * List<Split> splits = builder.newScan().plan().splits(); + * + * // 3. Distribute these splits to different tasks + * + * // 4. Read a split in task + * TableRead read = builder.newRead(); + * RecordReader<InternalRow> reader = read.createReader(split); + * reader.forEachRemaining(...); + * + * }</pre> + * + * <p>NOTE: {@link InternalRow} cannot be saved in memory. It may be reused internally, so you need + * to convert it into your own data structure or copy it. + * + * @since 0.4.0 + */ +@Experimental +public interface ReadBuilder extends Serializable { + + /** A name to identify the table. */ + String tableName(); + + /** Returns read row type, projected by {@link #withProjection}. */ + RowType readType(); + + /** + * Push filters, the relationship between predicates is 'AND'. 
It will filter the data as much + * as possible, but it is not guaranteed that it is a complete filter. + */ + default ReadBuilder withFilter(List<Predicate> predicates) { + if (predicates == null || predicates.isEmpty()) { + return this; + } + return withFilter(PredicateBuilder.and(predicates)); + } + + /** + * Push filters, will filter the data as much as possible, but it is not guaranteed that it is a + * complete filter. + */ + ReadBuilder withFilter(Predicate predicate); + + /** Push top-level projection. */ Review Comment: ```suggestion /** * Apply projection to the reader. * * <p>NOTE: Nested row projection is currently not supported. */ ``` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java: ########## @@ -18,39 +18,48 @@ package org.apache.flink.table.store.table.sink; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.data.BinaryRow; import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.disk.IOManager; -import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.table.Table; import java.util.List; /** - * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to - * provide {@link InternalRow} writing. + * Write of {@link Table} to provide {@link InternalRow} writing. + * + * @since 0.4.0 */ +@Experimental public interface TableWrite extends AutoCloseable { - TableWrite withOverwrite(boolean overwrite); - + /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is opened. */ Review Comment: ```suggestion /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */ ``` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java: ########## @@ -18,39 +18,48 @@ package org.apache.flink.table.store.table.sink; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.data.BinaryRow; import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.disk.IOManager; -import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.table.Table; import java.util.List; /** - * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to - * provide {@link InternalRow} writing. + * Write of {@link Table} to provide {@link InternalRow} writing. + * + * @since 0.4.0 */ +@Experimental public interface TableWrite extends AutoCloseable { - TableWrite withOverwrite(boolean overwrite); - + /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is opened. */ TableWrite withIOManager(IOManager ioManager); - SinkRecord write(InternalRow rowData) throws Exception; + /** Compute partition by an input row. */ Review Comment: ```suggestion /** Calculate which partition {@code row} belongs to. 
*/ ``` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java: ########## @@ -18,39 +18,48 @@ package org.apache.flink.table.store.table.sink; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.data.BinaryRow; import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.disk.IOManager; -import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.table.Table; import java.util.List; /** - * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to - * provide {@link InternalRow} writing. + * Write of {@link Table} to provide {@link InternalRow} writing. + * + * @since 0.4.0 */ +@Experimental public interface TableWrite extends AutoCloseable { - TableWrite withOverwrite(boolean overwrite); - + /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is opened. */ TableWrite withIOManager(IOManager ioManager); - SinkRecord write(InternalRow rowData) throws Exception; + /** Compute partition by an input row. */ + BinaryRow getPartition(InternalRow row); - /** Log record need to preserve original pk (which includes partition fields). */ - SinkRecord toLogRecord(SinkRecord record); + /** Compute bucket by an input row. */ Review Comment: ```suggestion /** Calculate which bucket {@code row} belongs to. */ ``` ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,55 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.file.catalog.CatalogLock; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. + * + * <ol> + * <li>Before calling {@link TableCommit#commit}, if user cannot determine if this commit is done + * before, user should first call {@link TableCommit#filterCommitted}. + * <li>Before committing, it will first check for conflicts by checking if all files to be removed + * currently exists, and if modified files have overlapping key ranges with existing files. + * <li>After that it use the external {@link CatalogLock} (if provided) or the atomic rename of + * the file system to ensure atomicity. + * <li>If commit fails due to conflicts or exception it tries its best to clean up and aborts. + * <li>If atomic rename fails it tries again after reading the latest snapshot from step 2. + * </ol> Review Comment: These are implementation details of `FileStoreCommitImpl`. Users don't have to understand them.
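To illustrate the failover case that `filterCommitted` is meant for, here is a rough sketch of how a restarted writer might drop already-committed work before retrying. Only `filterCommitted(Set<Long>)` and `ignoreEmptyCommit(boolean)` are shown in this PR; the recovery helper, the `newCommit()` builder method, the assumption that `filterCommitted` returns the identifiers still to be committed, and the `commit(identifier, messages)` signature are illustrative assumptions.

```java
// Hypothetical failover sketch: re-submit only the commits that never reached a snapshot.
// 'pendingMessages' maps each commit identifier to the CommitMessages recovered from state.
Map<Long, List<CommitMessage>> pendingMessages = recoverFromState();  // assumed helper

TableCommit commit =
        writeBuilder.newCommit()               // assumed builder method
                .ignoreEmptyCommit(false);     // recommended for streaming so dedup keeps working

// Assumed to return the identifiers that still need to be committed.
Set<Long> remaining = commit.filterCommitted(pendingMessages.keySet());

for (long identifier : remaining) {
    commit.commit(identifier, pendingMessages.get(identifier));       // assumed signature
}
```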
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteBuilder.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.table.sink; + +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.data.InternalRow; +import org.apache.flink.table.store.types.RowType; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * An interface for building the {@link TableWrite} and {@link TableCommit}. + * + * @since 0.4.0 + */ +@Experimental +public interface WriteBuilder extends Serializable { Review Comment: There is a detailed example in `ReadBuilder`. Should we also add an example to `WriteBuilder`? ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.table.source; + +import org.apache.flink.table.store.annotation.Experimental; +import org.apache.flink.table.store.data.InternalRow; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.store.file.predicate.PredicateBuilder; +import org.apache.flink.table.store.types.RowType; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +/** + * An interface for building the {@link TableScan} and {@link TableRead}. + * + * <p>Example of distributed reading: + * + * <pre>{@code + * // 1. Create a ReadBuilder (Serializable) + * Table table = catalog.getTable(...); + * ReadBuilder builder = table.newReadBuilder() + * .withFilter(...) + * .withProjection(...); + * + * // 2. Plan splits in 'Coordinator' (or named 'Driver'): + * List<Split> splits = builder.newScan().plan().splits(); + * + * // 3. Distribute these splits to different tasks + * + * // 4.
Read a split in task + * TableRead read = builder.newRead(); + * RecordReader<InternalRow> reader = read.createReader(split); + * reader.forEachRemaining(...); + * + * }</pre> + * + * <p>NOTE: {@link InternalRow} cannot be saved in memory. It may be reused internally, so you need + * to convert it into your own data structure or copy it. + * + * @since 0.4.0 + */ +@Experimental +public interface ReadBuilder extends Serializable { + + /** A name to identify the table. */ + String tableName(); + + /** Returns read row type, projected by {@link #withProjection}. */ + RowType readType(); + + /** + * Push filters, the relationship between predicates is 'AND'. It will filter the data as much + * as possible, but it is not guaranteed that it is a complete filter. + */ Review Comment: ```suggestion /** * Apply filters to the readers to decrease the number of produced records. * * <p>This interface filters records as much as possible; however, some produced records may not satisfy all predicates. Users need to recheck all records. */ ```
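Since the pushed-down filter is best-effort, a consumer still has to re-evaluate the predicate on the rows it reads back. Below is a rough sketch of that recheck, extending the `ReadBuilder` example quoted above; `filter`, `projection`, `satisfiesFilter` and `consume` are placeholders for this illustration, not part of the PR.

```java
// Sketch: push a filter down, then re-check every row, because push-down is best-effort.
ReadBuilder builder =
        table.newReadBuilder()
                .withFilter(filter)           // Predicate built by the caller
                .withProjection(projection);  // top-level field indexes (nested projection unsupported)

List<Split> splits = builder.newScan().plan().splits();
TableRead read = builder.newRead();

for (Split split : splits) {
    read.createReader(split)
            .forEachRemaining(
                    row -> {
                        // Some rows may still fail the predicate after push-down: recheck here.
                        if (satisfiesFilter(row)) {
                            // InternalRow may be reused internally: convert or copy it before buffering.
                            consume(row);
                        }
                    });
}
```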