This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d4340d16708 [FLINK-30491][hive] Hive table partition supports lazy deserialization during runtime
d4340d16708 is described below

commit d4340d16708010394d7c57063b5dece8362d41d0
Author: fengli <ldliu...@163.com>
AuthorDate: Sat Dec 24 16:46:03 2022 +0800

    [FLINK-30491][hive] Hive table partition supports lazy deserialization during runtime
    
    This closes #21556
---
 .../apache/flink/connectors/hive/HiveSource.java   |  8 +--
 .../flink/connectors/hive/HiveSourceBuilder.java   | 13 +++-
 .../hive/HiveSourceDynamicFileEnumerator.java      | 10 ++--
 .../connectors/hive/HiveSourceFileEnumerator.java  | 13 ++--
 .../hive/HiveTablePartitionSerializer.java         | 68 +++++++++++++++++++++
 .../connectors/hive/util/HivePartitionUtils.java   | 30 ++++++++++
 .../hive/util/HivePartitionUtilsTest.java          | 70 ++++++++++++++++++++++
 7 files changed, 197 insertions(+), 15 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index d340ed616ea..5b768b0320a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -64,7 +64,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
 
     private final String hiveVersion;
     private final List<String> dynamicFilterPartitionKeys;
-    private final List<HiveTablePartition> partitions;
+    private final List<byte[]> partitionBytes;
     private final ContinuousPartitionFetcher<Partition, ?> fetcher;
     private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext;
     private final ObjectPath tablePath;
@@ -80,7 +80,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
             List<String> partitionKeys,
             String hiveVersion,
             @Nullable List<String> dynamicFilterPartitionKeys,
-            List<HiveTablePartition> partitions,
+            List<byte[]> partitionBytes,
             @Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
             @Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext) {
         super(
@@ -94,7 +94,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
         this.partitionKeys = partitionKeys;
         this.hiveVersion = hiveVersion;
         this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
-        this.partitions = partitions;
+        this.partitionBytes = partitionBytes;
         this.fetcher = fetcher;
         this.fetcherContext = fetcherContext;
     }
@@ -181,7 +181,7 @@ public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
                 new HiveSourceDynamicFileEnumerator.Provider(
                         tablePath.getFullName(),
                         dynamicFilterPartitionKeys,
-                        partitions,
+                        partitionBytes,
                         hiveVersion,
                         jobConfWrapper),
                 getAssignerFactory());
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index bb6ee5a6c1e..86de17520a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -234,11 +234,18 @@ public class HiveSourceBuilder {
                 continuousSourceSettings == null || partitionKeys.isEmpty()
                         ? DEFAULT_SPLIT_ASSIGNER
                         : SimpleSplitAssigner::new;
+        List<byte[]> hiveTablePartitionBytes = Collections.emptyList();
+        if (partitions != null) {
+            // Serialize the HiveTablePartition list manually at compile time to avoid
+            // deserializing it on the TaskManager at runtime; the TaskManager does
+            // not need the HiveTablePartition list.
+            hiveTablePartitionBytes = HivePartitionUtils.serializeHiveTablePartition(partitions);
+        }
+
         return new HiveSource<>(
                 new Path[1],
                 new HiveSourceFileEnumerator.Provider(
-                        partitions != null ? partitions : Collections.emptyList(),
-                        new JobConfWrapper(jobConf)),
+                        hiveTablePartitionBytes, new JobConfWrapper(jobConf)),
                 splitAssigner,
                 bulkFormat,
                 continuousSourceSettings,
@@ -247,7 +254,7 @@ public class HiveSourceBuilder {
                 partitionKeys,
                 hiveVersion,
                 dynamicFilterPartitionKeys,
-                partitions,
+                hiveTablePartitionBytes,
                 fetcher,
                 fetcherContext);
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
index 530e0f61e52..4d84fd5e048 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
@@ -189,19 +189,21 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
 
         private final String table;
         private final List<String> dynamicFilterPartitionKeys;
-        private final List<HiveTablePartition> partitions;
+        // The serialized HiveTablePartition list; it is serialized manually at compile
+        // time to avoid deserializing it on the TaskManager at runtime.
+        private final List<byte[]> partitionBytes;
         private final String hiveVersion;
         private final JobConfWrapper jobConfWrapper;
 
         public Provider(
                 String table,
                 List<String> dynamicFilterPartitionKeys,
-                List<HiveTablePartition> partitions,
+                List<byte[]> partitionBytes,
                 String hiveVersion,
                 JobConfWrapper jobConfWrapper) {
             this.table = checkNotNull(table);
             this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
-            this.partitions = checkNotNull(partitions);
+            this.partitionBytes = checkNotNull(partitionBytes);
             this.hiveVersion = checkNotNull(hiveVersion);
             this.jobConfWrapper = checkNotNull(jobConfWrapper);
         }
@@ -211,7 +213,7 @@ public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
             return new HiveSourceDynamicFileEnumerator(
                     table,
                     dynamicFilterPartitionKeys,
-                    partitions,
+                    HivePartitionUtils.deserializeHiveTablePartition(partitionBytes),
                     hiveVersion,
                     jobConfWrapper.conf());
         }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
index 2373e747148..c30cd1c8e1b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceFileEnumerator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Preconditions;
 
@@ -224,17 +225,21 @@ public class HiveSourceFileEnumerator implements FileEnumerator {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<HiveTablePartition> partitions;
+        // The serialized HiveTablePartition list; it is serialized manually at compile
+        // time to avoid deserializing it on the TaskManager at runtime.
+        private final List<byte[]> partitionBytes;
         private final JobConfWrapper jobConfWrapper;
 
-        public Provider(List<HiveTablePartition> partitions, JobConfWrapper jobConfWrapper) {
-            this.partitions = partitions;
+        public Provider(List<byte[]> partitionBytes, JobConfWrapper jobConfWrapper) {
+            this.partitionBytes = partitionBytes;
+            this.partitionBytes = partitionBytes;
             this.jobConfWrapper = jobConfWrapper;
         }
 
         @Override
         public FileEnumerator create() {
-            return new HiveSourceFileEnumerator(partitions, jobConfWrapper.conf());
+            return new HiveSourceFileEnumerator(
+                    HivePartitionUtils.deserializeHiveTablePartition(partitionBytes),
+                    jobConfWrapper.conf());
         }
     }
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java
new file mode 100644
index 00000000000..57875bda400
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartitionSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** SerDe for {@link HiveTablePartition}. */
+public class HiveTablePartitionSerializer implements SimpleVersionedSerializer<HiveTablePartition> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    public static final HiveTablePartitionSerializer INSTANCE = new HiveTablePartitionSerializer();
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(HiveTablePartition hiveTablePartition) throws IOException {
+        checkArgument(
+                hiveTablePartition.getClass() == HiveTablePartition.class,
+                "Cannot serialize subclasses of HiveTablePartition");
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+            outputStream.writeObject(hiveTablePartition);
+        }
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    @Override
+    public HiveTablePartition deserialize(int version, byte[] serialized) throws IOException {
+        if (version == CURRENT_VERSION) {
+            try (ObjectInputStream inputStream =
+                    new ObjectInputStream(new ByteArrayInputStream(serialized))) {
+                return (HiveTablePartition) inputStream.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize HiveTablePartition", e);
+            }
+        } else {
+            throw new IOException("Unknown version: " + version);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
index 6d4d83453d3..443f4fea1c8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive.util;
 
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTablePartitionSerializer;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -290,4 +291,33 @@ public class HivePartitionUtils {
             }
         }
     }
+
+    public static List<byte[]> serializeHiveTablePartition(
+            List<HiveTablePartition> hiveTablePartitions) {
+        List<byte[]> partitionBytes = new ArrayList<>(hiveTablePartitions.size());
+        try {
+            for (HiveTablePartition hiveTablePartition : hiveTablePartitions) {
+                partitionBytes.add(
+                        HiveTablePartitionSerializer.INSTANCE.serialize(hiveTablePartition));
+            }
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        return partitionBytes;
+    }
+
+    public static List<HiveTablePartition> deserializeHiveTablePartition(
+            List<byte[]> partitionBytes) {
+        List<HiveTablePartition> hiveTablePartitions = new ArrayList<>(partitionBytes.size());
+        try {
+            for (byte[] bytes : partitionBytes) {
+                hiveTablePartitions.add(
+                        HiveTablePartitionSerializer.INSTANCE.deserialize(
+                                HiveTablePartitionSerializer.INSTANCE.getVersion(), bytes));
+            }
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+        return hiveTablePartitions;
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java
new file mode 100644
index 00000000000..a57dcb7be2c
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HivePartitionUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.flink.connectors.hive.HiveTablePartition;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HivePartitionUtils}. */
+public class HivePartitionUtilsTest {
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testHiveTablePartitionSerDe() throws Exception {
+        String baseFilePath =
+                Objects.requireNonNull(this.getClass().getResource("/orc/test.orc")).getPath();
+        File wareHouse = temporaryFolder.newFolder("testHiveTablePartitionSerDe");
+        int partitionNum = 10;
+        List<HiveTablePartition> expectedHiveTablePartitions = new ArrayList<>();
+        for (int i = 0; i < partitionNum; i++) {
+            // create partition directory
+            Path partitionPath = Paths.get(wareHouse.getPath(), "p_" + i);
+            Files.createDirectory(partitionPath);
+            // copy file to the partition directory
+            Files.copy(Paths.get(baseFilePath), Paths.get(partitionPath.toString(), "t.orc"));
+            StorageDescriptor sd = new StorageDescriptor();
+            sd.setLocation(partitionPath.toString());
+            expectedHiveTablePartitions.add(new HiveTablePartition(sd, new Properties()));
+        }
+
+        List<byte[]> hiveTablePartitionBytes =
+                HivePartitionUtils.serializeHiveTablePartition(expectedHiveTablePartitions);
+
+        List<HiveTablePartition> actualHiveTablePartitions =
+                HivePartitionUtils.deserializeHiveTablePartition(hiveTablePartitionBytes);
+
+        assertThat(actualHiveTablePartitions).isEqualTo(expectedHiveTablePartitions);
+    }
+}
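
For reference, a minimal usage sketch of the round trip this commit introduces (not part of the patch; the class name "PartitionSerDeSketch" and the partition location below are illustrative placeholders): HiveSourceBuilder serializes the HiveTablePartition list to bytes when the source is built, and the enumerator providers call deserializeHiveTablePartition lazily inside create().

import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Sketch only; mirrors what HivePartitionUtilsTest verifies above. */
public class PartitionSerDeSketch {
    public static void main(String[] args) {
        // Build a partition the same way the test above does.
        StorageDescriptor sd = new StorageDescriptor();
        sd.setLocation("/tmp/warehouse/p_0"); // placeholder location
        List<HiveTablePartition> partitions =
                Collections.singletonList(new HiveTablePartition(sd, new Properties()));

        // Client side (HiveSourceBuilder): serialize once at compile time.
        List<byte[]> partitionBytes =
                HivePartitionUtils.serializeHiveTablePartition(partitions);

        // Enumerator side (Provider#create): deserialize lazily at runtime, so
        // per the comments above, the TaskManager never deserializes the list.
        List<HiveTablePartition> restored =
                HivePartitionUtils.deserializeHiveTablePartition(partitionBytes);
        System.out.println(restored.size() + " partition(s) restored");
    }
}

The design point is that the enumerator providers now carry only a List<byte[]> through job submission, so HiveTablePartition objects are Java-deserialized only where they are actually needed.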
