This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 2cddc8f7 Revert "[FLINK-29345] Create reusing reader/writer config in orc format"
2cddc8f7 is described below

commit 2cddc8f7df36a2ecce77184a4a6060bfe1f0a916
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Sep 28 17:45:16 2022 +0800

    Revert "[FLINK-29345] Create reusing reader/writer config in orc format"

    This reverts commit 835632c6e4758ad7d11ccbdb3a8ebb8dfa6aa709.
---
 .../store/format/orc/OrcBulkWriterFactory.java     | 116 ---------------------
 .../table/store/format/orc/OrcFileFormat.java      |  35 +++----
 .../store/format/orc/OrcBulkWriterFactoryTest.java |  83 ---------------
 .../table/store/format/orc/OrcFileFormatTest.java  |   9 +-
 4 files changed, 18 insertions(+), 225 deletions(-)

diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
deleted file mode 100644
index b6670392..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.PhysicalWriterImpl;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Writer;
-import org.apache.orc.impl.WriterImpl;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Orc {@link BulkWriter.Factory}. The main code is copied from Flink {@code OrcBulkWriterFactory}.
- */
-public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
-
-    private final Vectorizer<T> vectorizer;
-    private final OrcFile.WriterOptions writerOptions;
-
-    /**
-     * Creates a new OrcBulkWriterFactory using the provided Vectorizer, ORC WriterOptions.
-     *
-     * @param vectorizer The vectorizer implementation to convert input record to a
-     *     VectorizerRowBatch.
-     * @param writerOptions ORC WriterOptions.
-     */
-    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
-        this.vectorizer = checkNotNull(vectorizer);
-        this.writerOptions = checkNotNull(writerOptions);
-    }
-
-    @Override
-    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
-        OrcFile.WriterOptions opts = getWriterOptions();
-        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
-
-        // The path of the Writer is not used to indicate the destination file
-        // in this case since we have used a dedicated physical writer to write
-        // to the give output stream directly. However, the path would be used as
-        // the key of writer in the ORC memory manager, thus we need to make it unique.
-        Path unusedPath = new Path(UUID.randomUUID().toString());
-        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
-    }
-
-    @VisibleForTesting
-    protected OrcFile.WriterOptions getWriterOptions() {
-        return writerOptions;
-    }
-
-    /** Orc {@link BulkWriter}. The main code is copied from Flink {@code OrcBulkWriter}. */
-    private static class OrcBulkWriter<T> implements BulkWriter<T> {
-
-        private final Writer writer;
-        private final Vectorizer<T> vectorizer;
-        private final VectorizedRowBatch rowBatch;
-
-        OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
-            this.vectorizer = checkNotNull(vectorizer);
-            this.writer = checkNotNull(writer);
-            this.rowBatch = vectorizer.getSchema().createRowBatch();
-
-            // Configure the vectorizer with the writer so that users can add
-            // metadata on the fly through the Vectorizer#vectorize(...) method.
-            this.vectorizer.setWriter(this.writer);
-        }
-
-        @Override
-        public void addElement(T element) throws IOException {
-            vectorizer.vectorize(element, rowBatch);
-            if (rowBatch.size == rowBatch.getMaxSize()) {
-                writer.addRowBatch(rowBatch);
-                rowBatch.reset();
-            }
-        }
-
-        @Override
-        public void flush() throws IOException {
-            if (rowBatch.size != 0) {
-                writer.addRowBatch(rowBatch);
-                rowBatch.reset();
-            }
-        }
-
-        @Override
-        public void finish() throws IOException {
-            flush();
-            writer.close();
-        }
-    }
-}
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 5d07840a..399a0db2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -27,8 +27,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -40,7 +39,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
@@ -54,24 +52,16 @@ import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENT
 /** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
 public class OrcFileFormat extends FileFormat {
 
-    private final Properties orcProperties;
-    private final org.apache.hadoop.conf.Configuration readerConf;
-    private final org.apache.hadoop.conf.Configuration writerConf;
+    private final Configuration formatOptions;
 
     public OrcFileFormat(Configuration formatOptions) {
         super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
-        this.orcProperties = getOrcProperties(formatOptions);
-        this.readerConf = new org.apache.hadoop.conf.Configuration();
-        this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
-
-        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-        this.writerConf = new ThreadLocalClassLoaderConfiguration();
-        conf.forEach(entry -> writerConf.set(entry.getKey(), entry.getValue()));
+        this.formatOptions = formatOptions;
     }
 
     @VisibleForTesting
-    Properties orcProperties() {
-        return orcProperties;
+    Configuration formatOptions() {
+        return formatOptions;
     }
 
     @Override
@@ -93,8 +83,12 @@ public class OrcFileFormat extends FileFormat {
             }
         }
 
+        Properties properties = getOrcProperties(formatOptions);
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
         return OrcInputFormatFactory.create(
-                readerConf,
+                conf,
                 (RowType) refineLogicalType(type),
                 Projection.of(projection).toTopLevelIndexes(),
                 orcPredicates);
@@ -106,12 +100,11 @@ public class OrcFileFormat extends FileFormat {
         TypeDescription typeDescription =
                 OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
 
-        Vectorizer<RowData> vectorizer =
-                new RowDataVectorizer(typeDescription.toString(), orcTypes);
-        OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
-        writerOptions.setSchema(vectorizer.getSchema());
 
-        return new OrcBulkWriterFactory<>(vectorizer, writerOptions);
+        return new OrcBulkWriterFactory<>(
+                new RowDataVectorizer(typeDescription.toString(), orcTypes),
+                getOrcProperties(formatOptions),
+                new org.apache.hadoop.conf.Configuration());
     }
 
     private static Properties getOrcProperties(ReadableConfig options) {
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
deleted file mode 100644
index 374bc1c6..00000000
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.local.LocalDataOutputStream;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.MemoryManager;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-
-/** Test for {@link OrcBulkWriterFactory}. */
-public class OrcBulkWriterFactoryTest {
-    @TempDir File tempDir;
-
-    @Test
-    public void testNotOverrideInMemoryManager() throws IOException {
-        OrcFileFormat orcFileFormat = new OrcFileFormat(new Configuration());
-        OrcBulkWriterFactory<RowData> factory =
-                (OrcBulkWriterFactory<RowData>)
-                        orcFileFormat.createWriterFactory(
-                                RowType.of(new VarCharType(), new IntType()));
-
-        TestMemoryManager memoryManager = new TestMemoryManager();
-        factory.getWriterOptions().memory(memoryManager);
-
-        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
-        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
-
-        List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
-        assertEquals(2, addedWriterPath.size());
-        assertNotEquals(addedWriterPath.get(1), addedWriterPath.get(0));
-    }
-
-    private static class TestMemoryManager implements MemoryManager {
-        private final List<Path> addedWriterPath = new ArrayList<>();
-
-        @Override
-        public void addWriter(Path path, long requestedAllocation, Callback callback) {
-            addedWriterPath.add(path);
-        }
-
-        public List<Path> getAddedWriterPath() {
-            return addedWriterPath;
-        }
-
-        @Override
-        public void removeWriter(Path path) {}
-
-        @Override
-        public void addedRow(int rows) {}
-    }
-}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
index ba1dbacb..b99d337f 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
 
 import org.junit.jupiter.api.Test;
 
-import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link OrcFileFormatFactory}. */
@@ -33,8 +32,8 @@ public class OrcFileFormatTest {
         Configuration options = new Configuration();
         options.setString("haha", "1");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("lz4");
+        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
+        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("lz4");
     }
 
     @Test
@@ -43,7 +42,7 @@ public class OrcFileFormatTest {
         options.setString("haha", "1");
         options.setString("compress", "zlib");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("zlib");
+        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
+        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("zlib");
     }
 }