This is an automated email from the ASF dual-hosted git repository.

nkollar pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/encryption by this push:
     new 3f2d0e7  PARQUET-1228: Format Structures encryption (#613)
3f2d0e7 is described below

commit 3f2d0e7f5c05907ee37cf549e6ed4bf0e067d491
Author: ggershinsky <ggershin...@users.noreply.github.com>
AuthorDate: Tue Aug 27 12:07:10 2019 +0200

    PARQUET-1228: Format Structures encryption (#613)
---
 .travis.yml                                        |   1 +
 dev/travis-before_install-encryption.sh            |  29 +++
 .../org/apache/parquet/format/BlockCipher.java     |  69 +++++++
 .../main/java/org/apache/parquet/format/Util.java  | 222 +++++++++++++++++----
 pom.xml                                            |   2 +-
 5 files changed, 278 insertions(+), 45 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3fe18f6..fae25f8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,5 @@
 language: java
+jdk: openjdk8
 before_install:
   - bash dev/travis-before_install.sh
 
diff --git a/dev/travis-before_install-encryption.sh 
b/dev/travis-before_install-encryption.sh
new file mode 100755
index 0000000..0e3a3f6
--- /dev/null
+++ b/dev/travis-before_install-encryption.sh
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+################################################################################
+# This is a branch-specific script that gets invoked at the end of
+# travis-before_install.sh. It is run for the encryption branch only.
+################################################################################
+
+cd ..
+git clone https://github.com/apache/parquet-format.git
+cd parquet-format
+mvn install -DskipTests --batch-mode
+cd $TRAVIS_BUILD_DIR
+
+
diff --git 
a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
 
b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
new file mode 100755
index 0000000..48c0bf2
--- /dev/null
+++ 
b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.format;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface BlockCipher{
+
+
+  public interface Encryptor{
+    /**
+     * Encrypts the plaintext.
+     *
+     * @param plaintext - starts at offset 0 of the input, and fills up the entire byte array.
+     * @param AAD - Additional Authenticated Data for the encryption (ignored in case of CTR cipher)
+     * @return lengthAndCiphertext The first 4 bytes of the returned value are the ciphertext length (little endian int).
+     * The ciphertext starts at offset 4  and fills up the rest of the returned byte array.
+     * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
+     * Parquet Modular Encryption specification.
+     * @throws IOException thrown upon any crypto problem encountered during encryption
+     */
+    public byte[] encrypt(byte[] plaintext, byte[] AAD) throws IOException;
+  }
+
+
+  public interface Decryptor{  
+    /**
+     * Decrypts the ciphertext.
+     *
+     * @param lengthAndCiphertext - The first 4 bytes of the input are the ciphertext length (little endian int).
+     * The ciphertext starts at offset 4  and fills up the rest of the input byte array.
+     * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
+     * Parquet Modular Encryption specification.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
+     * @throws IOException thrown upon any crypto problem encountered during decryption
+     */
+    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws 
IOException;
+
+    /**
+     * Convenience decryption method that reads the length and ciphertext from the input stream.
+     *
+     * @param from Input stream with length and ciphertext.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext -  starts at offset 0 of the output, and fills up the entire byte array.
+     * @throws IOException thrown upon any crypto or IO problem encountered during decryption
+     */
+    public byte[] decrypt(InputStream from, byte[] AAD) throws IOException;
+  }
+}
+
diff --git 
a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java 
b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d09d007..9242290 100644
--- 
a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ 
b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -20,6 +20,8 @@
 package org.apache.parquet.format;
 
 import static org.apache.parquet.format.FileMetaData._Fields.CREATED_BY;
+import static 
org.apache.parquet.format.FileMetaData._Fields.ENCRYPTION_ALGORITHM;
+import static 
org.apache.parquet.format.FileMetaData._Fields.FOOTER_SIGNING_KEY_METADATA;
 import static 
org.apache.parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA;
 import static org.apache.parquet.format.FileMetaData._Fields.NUM_ROWS;
 import static org.apache.parquet.format.FileMetaData._Fields.ROW_GROUPS;
@@ -30,9 +32,11 @@ import static 
org.apache.parquet.format.event.Consumers.listElementsOf;
 import static org.apache.parquet.format.event.Consumers.listOf;
 import static org.apache.parquet.format.event.Consumers.struct;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 import org.apache.thrift.TBase;
@@ -40,7 +44,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
-
+import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.parquet.format.event.Consumers.Consumer;
 import org.apache.parquet.format.event.Consumers.DelegatingFieldConsumer;
 import org.apache.parquet.format.event.EventBasedThriftReader;
@@ -54,37 +58,91 @@ import 
org.apache.parquet.format.event.TypedConsumer.StringConsumer;
  */
 public class Util {
 
+  private final static int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+
   public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream 
to) throws IOException {
-    write(columnIndex, to);
+    writeColumnIndex(columnIndex, to, null, null);
+  }
+
+  public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream 
to, 
+      BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+    write(columnIndex, to, encryptor, AAD);
   }
 
   public static ColumnIndex readColumnIndex(InputStream from) throws 
IOException {
-    return read(from, new ColumnIndex());
+    return readColumnIndex(from, null, null);
+  }
+
+  public static ColumnIndex readColumnIndex(InputStream from, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(from, new ColumnIndex(), decryptor, AAD);
   }
 
   public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream 
to) throws IOException {
-    write(offsetIndex, to);
+    writeOffsetIndex(offsetIndex, to, null, null);
+  }
+
+  public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream 
to, 
+      BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+    write(offsetIndex, to, encryptor, AAD);
   }
 
   public static OffsetIndex readOffsetIndex(InputStream from) throws 
IOException {
-    return read(from, new OffsetIndex());
+    return readOffsetIndex(from, null, null);
+  }
+
+  public static OffsetIndex readOffsetIndex(InputStream from, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(from, new OffsetIndex(), decryptor, AAD);
   }
 
   public static void writePageHeader(PageHeader pageHeader, OutputStream to) 
throws IOException {
-    write(pageHeader, to);
+    writePageHeader(pageHeader, to, null, null);
+  }
+
+  public static void writePageHeader(PageHeader pageHeader, OutputStream to, 
+      BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+    write(pageHeader, to, encryptor, AAD);
   }
 
   public static PageHeader readPageHeader(InputStream from) throws IOException 
{
-    return read(from, new PageHeader());
+    return readPageHeader(from, null, null);
+  }
+
+  public static PageHeader readPageHeader(InputStream from, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(from, new PageHeader(), decryptor, AAD);
+  }
+
+  public static void writeFileMetaData(org.apache.parquet.format.FileMetaData 
fileMetadata, 
+      OutputStream to) throws IOException {
+    writeFileMetaData(fileMetadata, to, null, null);
   }
 
-  public static void writeFileMetaData(org.apache.parquet.format.FileMetaData 
fileMetadata, OutputStream to) throws IOException {
-    write(fileMetadata, to);
+  public static void writeFileMetaData(org.apache.parquet.format.FileMetaData 
fileMetadata, 
+      OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD) throws 
IOException {
+    write(fileMetadata, to, encryptor, AAD);
   }
 
   public static FileMetaData readFileMetaData(InputStream from) throws 
IOException {
-    return read(from, new FileMetaData());
+    return readFileMetaData(from, null, null);
+  }
+
+  public static FileMetaData readFileMetaData(InputStream from, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(from, new FileMetaData(), decryptor, AAD);
+  }
+
+  public static void writeColumnMetaData(ColumnMetaData columnMetaData, 
OutputStream to, 
+      BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+    write(columnMetaData, to, encryptor, AAD);
   }
+
+  public static ColumnMetaData readColumnMetaData(InputStream from, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(from, new ColumnMetaData(), decryptor, AAD);
+  }
+
   /**
    * reads the meta data from the stream
    * @param from the stream to read the metadata from
@@ -93,15 +151,28 @@ public class Util {
    * @throws IOException if any I/O error occurs during the reading
    */
   public static FileMetaData readFileMetaData(InputStream from, boolean 
skipRowGroups) throws IOException {
+    return readFileMetaData(from, skipRowGroups, (BlockCipher.Decryptor) null, 
(byte[]) null);
+  }
+
+  public static FileMetaData readFileMetaData(InputStream from, boolean 
skipRowGroups, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
     FileMetaData md = new FileMetaData();
     if (skipRowGroups) {
-      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), 
skipRowGroups);
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), 
skipRowGroups, decryptor, AAD);
     } else {
-      read(from, md);
+      read(from, md, decryptor, AAD);
     }
     return md;
   }
 
+  public static void 
writeFileCryptoMetaData(org.apache.parquet.format.FileCryptoMetaData 
cryptoMetadata, OutputStream to) throws IOException { 
+    write(cryptoMetadata, to, null, null);
+  }
+
+  public static FileCryptoMetaData readFileCryptoMetaData(InputStream from) 
throws IOException {
+    return read(from, new FileCryptoMetaData(), null, null);
+  }
+
   /**
    * To read metadata in a streaming fashion.
    *
@@ -113,6 +184,8 @@ public class Util {
     abstract public void addRowGroup(RowGroup rowGroup);
     abstract public void addKeyValueMetaData(KeyValue kv);
     abstract public void setCreatedBy(String createdBy);
+    abstract public void setEncryptionAlgorithm(EncryptionAlgorithm 
encryptionAlgorithm);
+    abstract public void setFooterSigningKeyMetadata(byte[] 
footerSigningKeyMetadata);
   }
 
   /**
@@ -155,41 +228,73 @@ public class Util {
     public void addKeyValueMetaData(KeyValue kv) {
       md.addToKey_value_metadata(kv);
     }
+
+    @Override
+    public void setEncryptionAlgorithm(EncryptionAlgorithm 
encryptionAlgorithm) {
+      md.setEncryption_algorithm(encryptionAlgorithm);
+    }
+
+    @Override
+    public void setFooterSigningKeyMetadata(byte[] footerSigningKeyMetadata) {
+      md.setFooter_signing_key_metadata(footerSigningKeyMetadata);
+    }
   }
 
   public static void readFileMetaData(InputStream from, FileMetaDataConsumer 
consumer) throws IOException {
-    readFileMetaData(from, consumer, false);
+    readFileMetaData(from, consumer, null, null);
+  }
+
+  public static void readFileMetaData(InputStream from, FileMetaDataConsumer 
consumer, 
+      BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    readFileMetaData(from, consumer, false, decryptor, AAD);
   }
 
   public static void readFileMetaData(InputStream from, final 
FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
+    readFileMetaData(from, consumer, skipRowGroups, null, null);
+  }
+
+  public static void readFileMetaData(final InputStream input, final 
FileMetaDataConsumer consumer, 
+      boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD) 
throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
-      .onField(VERSION, new I32Consumer() {
-        @Override
-        public void consume(int value) {
-          consumer.setVersion(value);
-        }
-      }).onField(SCHEMA, listOf(SchemaElement.class, new 
Consumer<List<SchemaElement>>() {
-        @Override
-        public void consume(List<SchemaElement> schema) {
-          consumer.setSchema(schema);
-        }
-      })).onField(NUM_ROWS, new I64Consumer() {
-        @Override
-        public void consume(long value) {
-          consumer.setNumRows(value);
-        }
-      }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new 
Consumer<KeyValue>() {
-        @Override
-        public void consume(KeyValue kv) {
-          consumer.addKeyValueMetaData(kv);
-        }
-      }))).onField(CREATED_BY, new StringConsumer() {
-        @Override
-        public void consume(String value) {
-          consumer.setCreatedBy(value);
-        }
-      });
+          .onField(VERSION, new I32Consumer() {
+            @Override
+            public void consume(int value) {
+              consumer.setVersion(value);
+            }
+          }).onField(SCHEMA, listOf(SchemaElement.class, new 
Consumer<List<SchemaElement>>() {
+            @Override
+            public void consume(List<SchemaElement> schema) {
+              consumer.setSchema(schema);
+            }
+          })).onField(NUM_ROWS, new I64Consumer() {
+            @Override
+            public void consume(long value) {
+              consumer.setNumRows(value);
+            }
+          }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, 
new Consumer<KeyValue>() {
+            @Override
+            public void consume(KeyValue kv) {
+              consumer.addKeyValueMetaData(kv);
+            }
+          }))).onField(CREATED_BY, new StringConsumer() {
+            @Override
+            public void consume(String value) {
+              consumer.setCreatedBy(value);
+            }
+          }).onField(ENCRYPTION_ALGORITHM, struct(EncryptionAlgorithm.class, 
new Consumer<EncryptionAlgorithm>() {
+            @Override
+            public void consume(EncryptionAlgorithm encryptionAlgorithm) {
+              consumer.setEncryptionAlgorithm(encryptionAlgorithm);
+            }
+          })).onField(FOOTER_SIGNING_KEY_METADATA, new StringConsumer() {
+            @Override
+            public void consume(String value) {
+              byte[] keyMetadata = value.getBytes(StandardCharsets.UTF_8);
+              consumer.setFooterSigningKeyMetadata(keyMetadata);
+            }
+          });
+
       if (!skipRowGroups) {
         eventConsumer = eventConsumer.onField(ROW_GROUPS, 
listElementsOf(struct(RowGroup.class, new Consumer<RowGroup>() {
           @Override
@@ -198,8 +303,16 @@ public class Util {
           }
         })));
       }
-      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
 
+      final InputStream from;
+      if (null == decryptor) {
+        from = input;
+      }
+      else {
+        byte[] plainText =  decryptor.decrypt(input, AAD);
+        from = new ByteArrayInputStream(plainText);
+      }
+      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
@@ -217,7 +330,16 @@ public class Util {
     return new InterningProtocol(new TCompactProtocol(t));
   }
 
-  private static <T extends TBase<?,?>> T read(InputStream from, T tbase) 
throws IOException {
+
+  private static <T extends TBase<?,?>> T read(final InputStream input, T 
tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    final InputStream from;
+    if (null == decryptor) {
+      from = input;
+    } else {
+      byte[] plainText = decryptor.decrypt(input, AAD);
+      from = new ByteArrayInputStream(plainText);
+    }
+
     try {
       tbase.read(protocol(from));
       return tbase;
@@ -226,11 +348,23 @@ public class Util {
     }
   }
 
-  private static void write(TBase<?, ?> tbase, OutputStream to) throws 
IOException {
-    try {
-      tbase.write(protocol(to));
+  private static void write(TBase<?, ?> tbase, OutputStream to, 
BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+    if (null == encryptor) { 
+      try {
+        tbase.write(protocol(to));
+        return;
+      } catch (TException e) {
+        throw new IOException("can not write " + tbase, e);
+      }
+    }
+    // Serialize and encrypt the structure
+    try (TMemoryBuffer thriftMemoryBuffer = new 
TMemoryBuffer(INIT_MEM_ALLOC_ENCR_BUFFER)) {
+      tbase.write(new InterningProtocol(new 
TCompactProtocol(thriftMemoryBuffer)));
+      byte[] encryptedBuffer = 
encryptor.encrypt(thriftMemoryBuffer.getArray(), AAD);
+      to.write(encryptedBuffer);
     } catch (TException e) {
       throw new IOException("can not write " + tbase, e);
     }
   }
 }
+
diff --git a/pom.xml b/pom.xml
index 1808dba..7bfef73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
     <hadoop1.version>1.2.1</hadoop1.version>
     <cascading.version>2.7.1</cascading.version>
     <cascading3.version>3.1.2</cascading3.version>
-    <parquet.format.version>2.6.0</parquet.format.version>
+    <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
     <format.thrift.executable>thrift</format.thrift.executable>

Reply via email to