This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 2cddc8f7 Revert "[FLINK-29345] Create reusing reader/writer config in orc format"
2cddc8f7 is described below

commit 2cddc8f7df36a2ecce77184a4a6060bfe1f0a916
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Sep 28 17:45:16 2022 +0800

    Revert "[FLINK-29345] Create reusing reader/writer config in orc format"
    
    This reverts commit 835632c6e4758ad7d11ccbdb3a8ebb8dfa6aa709.
---
 .../store/format/orc/OrcBulkWriterFactory.java     | 116 ---------------------
 .../table/store/format/orc/OrcFileFormat.java      |  35 +++----
 .../store/format/orc/OrcBulkWriterFactoryTest.java |  83 ---------------
 .../table/store/format/orc/OrcFileFormatTest.java  |   9 +-
 4 files changed, 18 insertions(+), 225 deletions(-)
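
In effect, this revert deletes the table store's private copy of OrcBulkWriterFactory and goes back to constructing Flink's bundled org.apache.flink.orc.writer.OrcBulkWriterFactory on each call. A minimal sketch of the post-revert write path, assembled from the hunks below (not the verbatim method: how orcTypes is derived is assumed, and refineLogicalType / getOrcProperties live elsewhere in OrcFileFormat, with imports as in the diffed file):

    // Sketch of OrcFileFormat#createWriterFactory after the revert.
    public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
        LogicalType[] orcTypes = type.getChildren().toArray(new LogicalType[0]);
        TypeDescription typeDescription =
                OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
        // Flink's factory receives raw ORC properties plus a fresh Hadoop
        // Configuration, instead of a pre-built OrcFile.WriterOptions.
        return new OrcBulkWriterFactory<>(
                new RowDataVectorizer(typeDescription.toString(), orcTypes),
                getOrcProperties(formatOptions),
                new org.apache.hadoop.conf.Configuration());
    }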

diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
deleted file mode 100644
index b6670392..00000000
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.PhysicalWriterImpl;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Writer;
-import org.apache.orc.impl.WriterImpl;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Orc {@link BulkWriter.Factory}. The main code is copied from Flink {@code OrcBulkWriterFactory}.
- */
-public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
-
-    private final Vectorizer<T> vectorizer;
-    private final OrcFile.WriterOptions writerOptions;
-
-    /**
-     * Creates a new OrcBulkWriterFactory using the provided Vectorizer, ORC WriterOptions.
-     *
-     * @param vectorizer The vectorizer implementation to convert input record to a
-     *     VectorizedRowBatch.
-     * @param writerOptions ORC WriterOptions.
-     */
-    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
-        this.vectorizer = checkNotNull(vectorizer);
-        this.writerOptions = checkNotNull(writerOptions);
-    }
-
-    @Override
-    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
-        OrcFile.WriterOptions opts = getWriterOptions();
-        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
-
-        // The path of the Writer is not used to indicate the destination file
-        // in this case since we have used a dedicated physical writer to write
-        // to the given output stream directly. However, the path would be used as
-        // the key of writer in the ORC memory manager, thus we need to make it unique.
-        Path unusedPath = new Path(UUID.randomUUID().toString());
-        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
-    }
-
-    @VisibleForTesting
-    protected OrcFile.WriterOptions getWriterOptions() {
-        return writerOptions;
-    }
-
-    /** Orc {@link BulkWriter}. The main code is copied from Flink {@code OrcBulkWriter}. */
-    private static class OrcBulkWriter<T> implements BulkWriter<T> {
-
-        private final Writer writer;
-        private final Vectorizer<T> vectorizer;
-        private final VectorizedRowBatch rowBatch;
-
-        OrcBulkWriter(Vectorizer<T> vectorizer, Writer writer) {
-            this.vectorizer = checkNotNull(vectorizer);
-            this.writer = checkNotNull(writer);
-            this.rowBatch = vectorizer.getSchema().createRowBatch();
-
-            // Configure the vectorizer with the writer so that users can add
-            // metadata on the fly through the Vectorizer#vectorize(...) method.
-            this.vectorizer.setWriter(this.writer);
-        }
-
-        @Override
-        public void addElement(T element) throws IOException {
-            vectorizer.vectorize(element, rowBatch);
-            if (rowBatch.size == rowBatch.getMaxSize()) {
-                writer.addRowBatch(rowBatch);
-                rowBatch.reset();
-            }
-        }
-
-        @Override
-        public void flush() throws IOException {
-            if (rowBatch.size != 0) {
-                writer.addRowBatch(rowBatch);
-                rowBatch.reset();
-            }
-        }
-
-        @Override
-        public void finish() throws IOException {
-            flush();
-            writer.close();
-        }
-    }
-}
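
A note on the UUID path in create() above: it is load-bearing. ORC's default memory manager keys its per-writer bookkeeping by Path, so two writers registered under the same path would clobber each other's memory accounting, hence the comment about making the key unique. A hypothetical illustration of the invariant (not from the patch):

    // Each writer gets a distinct, throwaway Path purely as a map key;
    // the path never names a real destination file here.
    Path first = new Path(UUID.randomUUID().toString());
    Path second = new Path(UUID.randomUUID().toString());
    assert !first.equals(second); // distinct keys -> independent accounting

The deleted OrcBulkWriterFactoryTest further down exercised exactly this, by installing a recording MemoryManager and checking that two create() calls registered two distinct paths.
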
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 5d07840a..399a0db2 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -27,8 +27,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.OrcSplitReaderUtil;
 import org.apache.flink.orc.vector.RowDataVectorizer;
-import org.apache.flink.orc.vector.Vectorizer;
-import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -40,7 +39,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 
 import java.util.ArrayList;
@@ -54,24 +52,16 @@ import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENT
 /** Orc {@link FileFormat}. The main code is copied from Flink {@code OrcFileFormatFactory}. */
 public class OrcFileFormat extends FileFormat {
 
-    private final Properties orcProperties;
-    private final org.apache.hadoop.conf.Configuration readerConf;
-    private final org.apache.hadoop.conf.Configuration writerConf;
+    private final Configuration formatOptions;
 
     public OrcFileFormat(Configuration formatOptions) {
         super(org.apache.flink.orc.OrcFileFormatFactory.IDENTIFIER);
-        this.orcProperties = getOrcProperties(formatOptions);
-        this.readerConf = new org.apache.hadoop.conf.Configuration();
-        this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString()));
-
-        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-        this.writerConf = new ThreadLocalClassLoaderConfiguration();
-        conf.forEach(entry -> writerConf.set(entry.getKey(), entry.getValue()));
+        this.formatOptions = formatOptions;
     }
 
     @VisibleForTesting
-    Properties orcProperties() {
-        return orcProperties;
+    Configuration formatOptions() {
+        return formatOptions;
     }
 
     @Override
@@ -93,8 +83,12 @@ public class OrcFileFormat extends FileFormat {
             }
         }
 
+        Properties properties = getOrcProperties(formatOptions);
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        properties.forEach((k, v) -> conf.set(k.toString(), v.toString()));
+
         return OrcInputFormatFactory.create(
-                readerConf,
+                conf,
                 (RowType) refineLogicalType(type),
                 Projection.of(projection).toTopLevelIndexes(),
                 orcPredicates);
@@ -106,12 +100,11 @@ public class OrcFileFormat extends FileFormat {
 
         TypeDescription typeDescription =
                 OrcSplitReaderUtil.logicalTypeToOrcType(refineLogicalType(type));
-        Vectorizer<RowData> vectorizer =
-                new RowDataVectorizer(typeDescription.toString(), orcTypes);
-        OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(orcProperties, writerConf);
-        writerOptions.setSchema(vectorizer.getSchema());
 
-        return new OrcBulkWriterFactory<>(vectorizer, writerOptions);
+        return new OrcBulkWriterFactory<>(
+                new RowDataVectorizer(typeDescription.toString(), orcTypes),
+                getOrcProperties(formatOptions),
+                new org.apache.hadoop.conf.Configuration());
     }
 
     private static Properties getOrcProperties(ReadableConfig options) {
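
The getOrcProperties helper on the last context line above is untouched by the revert, and its body sits outside these hunks. Judging from the old tests further down (which read keys back as IDENTIFIER + ".haha"), it copies the format options into a Properties object under the "orc." identifier prefix. A hedged sketch of that shape, assumed rather than quoted:

    private static Properties getOrcProperties(ReadableConfig options) {
        Properties orcProperties = new Properties();
        Properties properties = new Properties();
        ((Configuration) options).addAllToProperties(properties);
        // e.g. "compress" becomes "orc.compress" for the ORC reader/writer.
        properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v));
        return orcProperties;
    }
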
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
deleted file mode 100644
index 374bc1c6..00000000
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcBulkWriterFactoryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.local.LocalDataOutputStream;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.MemoryManager;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-
-/** Test for {@link OrcBulkWriterFactory}. */
-public class OrcBulkWriterFactoryTest {
-    @TempDir File tempDir;
-
-    @Test
-    public void testNotOverrideInMemoryManager() throws IOException {
-        OrcFileFormat orcFileFormat = new OrcFileFormat(new Configuration());
-        OrcBulkWriterFactory<RowData> factory =
-                (OrcBulkWriterFactory<RowData>)
-                        orcFileFormat.createWriterFactory(
-                                RowType.of(new VarCharType(), new IntType()));
-
-        TestMemoryManager memoryManager = new TestMemoryManager();
-        factory.getWriterOptions().memory(memoryManager);
-
-        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
-        factory.create(new LocalDataOutputStream(new File(tempDir, UUID.randomUUID().toString())));
-
-        List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
-        assertEquals(2, addedWriterPath.size());
-        assertNotEquals(addedWriterPath.get(1), addedWriterPath.get(0));
-    }
-
-    private static class TestMemoryManager implements MemoryManager {
-        private final List<Path> addedWriterPath = new ArrayList<>();
-
-        @Override
-        public void addWriter(Path path, long requestedAllocation, Callback callback) {
-            addedWriterPath.add(path);
-        }
-
-        public List<Path> getAddedWriterPath() {
-            return addedWriterPath;
-        }
-
-        @Override
-        public void removeWriter(Path path) {}
-
-        @Override
-        public void addedRow(int rows) {}
-    }
-}
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
index ba1dbacb..b99d337f 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFileFormatTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
 
 import org.junit.jupiter.api.Test;
 
-import static org.apache.flink.table.store.format.orc.OrcFileFormatFactory.IDENTIFIER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link OrcFileFormatFactory}. */
@@ -33,8 +32,8 @@ public class OrcFileFormatTest {
         Configuration options = new Configuration();
         options.setString("haha", "1");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("lz4");
+        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
+        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("lz4");
     }
 
     @Test
@@ -43,7 +42,7 @@ public class OrcFileFormatTest {
         options.setString("haha", "1");
         options.setString("compress", "zlib");
         OrcFileFormat orc = new OrcFileFormatFactory().create(options);
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", "")).isEqualTo("1");
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", "")).isEqualTo("zlib");
+        assertThat(orc.formatOptions().getString("haha", "")).isEqualTo("1");
+        assertThat(orc.formatOptions().getString("compress", "")).isEqualTo("zlib");
     }
 }
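
One detail the updated tests rely on: even when no compression is configured, formatOptions() reports compress=lz4, so OrcFileFormatFactory must inject that default before constructing the format. An assumed sketch of that logic (the factory body is not part of this diff):

    // Hypothetical shape of OrcFileFormatFactory#create, consistent with the
    // tests above: supply a default compression unless one was set.
    public OrcFileFormat create(Configuration formatOptions) {
        if (!formatOptions.containsKey("compress")) {
            formatOptions.setString("compress", "lz4");
        }
        return new OrcFileFormat(formatOptions);
    }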
