This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 166cc62  [SPARK-34990][SQL][TESTS] Add ParquetEncryptionSuite
166cc62 is described below

commit 166cc6204c96665e7b568cfcc8ba243e79dbf837
Author: Maya Anderson <ma...@il.ibm.com>
AuthorDate: Sat Apr 24 14:28:00 2021 -0700

    [SPARK-34990][SQL][TESTS] Add ParquetEncryptionSuite
    
    ### What changes were proposed in this pull request?
    
    A simple test that writes and reads an encrypted parquet and verifies that 
it's encrypted by checking its magic string (in encrypted footer mode).
    
    ### Why are the changes needed?
    
    To provide a test coverage for Parquet encryption.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - [x] [SBT / Hadoop 3.2 / Java8 (the 
default)](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137785/testReport)
    - [ ] ~SBT / Hadoop 3.2 / Java11 by adding [test-java11] to the PR title.~ 
(Jenkins Java11 build is broken due to missing JDK11 installation)
    - [x] [SBT / Hadoop 2.7 / Java8 by adding [test-hadoop2.7] to the PR 
title.](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137836/testReport)
    - [x] Maven / Hadoop 3.2 / Java8 by adding [test-maven] to the PR title.
    - [x] Maven / Hadoop 2.7 / Java8 by adding [test-maven][test-hadoop2.7] to 
the PR title.
    
    Closes #32146 from andersonm-ibm/pme_testing.
    
    Authored-by: Maya Anderson <ma...@il.ibm.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 sql/hive/pom.xml                                   |  7 ++
 .../spark/sql/hive/ParquetEncryptionSuite.scala    | 96 ++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 4108d0f..729d3f4 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,13 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${parquet.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
 <!--
     <dependency>
       <groupId>com.google.guava</groupId>
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
new file mode 100644
index 0000000..184ccad
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.io.RandomAccessFile
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+/**
+ * A test suite that tests parquet modular encryption usage.
+ */
+class ParquetEncryptionSuite extends QueryTest with TestHiveSingleton {
+  import spark.implicits._
+
+  private val encoder = Base64.getEncoder
+  private val footerKey =
+    encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+  private val key1 = 
encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
+  private val key2 = 
encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
+
+  test("SPARK-34990: Write and read an encrypted parquet") {
+    withTempDir { dir =>
+      withSQLConf(
+        "parquet.crypto.factory.class" ->
+          "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory",
+        "parquet.encryption.kms.client.class" ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        "parquet.encryption.key.list" ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
+          .option("parquet.encryption.footer.key", "footerKey")
+          .parquet(parquetDir)
+
+        verifyParquetEncrypted(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+        checkAnswer(readDataset, inputDF)
+      }
+    }
+  }
+
+  /**
+   * Verify that the directory contains an encrypted parquet in
+   * encrypted footer mode by means of checking for all the parquet part files
+   * in the parquet directory that their magic string is PARE, as defined in 
the spec:
+   * 
https://github.com/apache/parquet-format/blob/master/Encryption.md#54-encrypted-footer-mode
+   */
+  private def verifyParquetEncrypted(parquetDir: String): Unit = {
+    val parquetPartitionFiles = getListOfParquetFiles(new File(parquetDir))
+    assert(parquetPartitionFiles.size >= 1)
+    parquetPartitionFiles.foreach { parquetFile =>
+      val magicString = "PARE"
+      val magicStringLength = magicString.length()
+      val byteArray = new Array[Byte](magicStringLength)
+      val randomAccessFile = new RandomAccessFile(parquetFile, "r")
+      try {
+        randomAccessFile.read(byteArray, 0, magicStringLength)
+      } finally {
+        randomAccessFile.close()
+      }
+      val stringRead = new String(byteArray, StandardCharsets.UTF_8)
+      assert(magicString == stringRead)
+    }
+  }
+
+  private def getListOfParquetFiles(dir: File): List[File] = {
+    dir.listFiles.filter(_.isFile).toList.filter { file =>
+      file.getName.endsWith("parquet")
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to