This is an automated email from the ASF dual-hosted git repository. nkollar pushed a commit to branch encryption in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/encryption by this push: new 3f2d0e7 PARQUET-1228: Format Structures encryption (#613) 3f2d0e7 is described below commit 3f2d0e7f5c05907ee37cf549e6ed4bf0e067d491 Author: ggershinsky <ggershin...@users.noreply.github.com> AuthorDate: Tue Aug 27 12:07:10 2019 +0200 PARQUET-1228: Format Structures encryption (#613) --- .travis.yml | 1 + dev/travis-before_install-encryption.sh | 29 +++ .../org/apache/parquet/format/BlockCipher.java | 69 +++++++ .../main/java/org/apache/parquet/format/Util.java | 222 +++++++++++++++++---- pom.xml | 2 +- 5 files changed, 278 insertions(+), 45 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3fe18f6..fae25f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: java +jdk: openjdk8 before_install: - bash dev/travis-before_install.sh diff --git a/dev/travis-before_install-encryption.sh b/dev/travis-before_install-encryption.sh new file mode 100755 index 0000000..0e3a3f6 --- /dev/null +++ b/dev/travis-before_install-encryption.sh @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +################################################################################ +# This is a branch-specific script that gets invoked at the end of +# travis-before_install.sh. It is run for the encryption branch only. +################################################################################ + +cd .. +git clone https://github.com/apache/parquet-format.git +cd parquet-format +mvn install -DskipTests --batch-mode +cd $TRAVIS_BUILD_DIR + + diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java new file mode 100755 index 0000000..48c0bf2 --- /dev/null +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.format; + +import java.io.IOException; +import java.io.InputStream; + +public interface BlockCipher{ + + + public interface Encryptor{ + /** + * Encrypts the plaintext. + * + * @param plaintext - starts at offset 0 of the input, and fills up the entire byte array.
+ * @param AAD - Additional Authenticated Data for the encryption (ignored in case of CTR cipher) + * @return lengthAndCiphertext The first 4 bytes of the returned value are the ciphertext length (little endian int). + * The ciphertext starts at offset 4 and fills up the rest of the returned byte array. + * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the + * Parquet Modular Encryption specification. + * @throws IOException thrown upon any crypto problem encountered during encryption + */ + public byte[] encrypt(byte[] plaintext, byte[] AAD) throws IOException; + } + + + public interface Decryptor{ + /** + * Decrypts the ciphertext. + * + * @param lengthAndCiphertext - The first 4 bytes of the input are the ciphertext length (little endian int). + * The ciphertext starts at offset 4 and fills up the rest of the input byte array. + * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the + * Parquet Modular Encryption specification. + * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher) + * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array. + * @throws IOException thrown upon any crypto problem encountered during decryption + */ + public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException; + + /** + * Convenience decryption method that reads the length and ciphertext from the input stream. + * + * @param from Input stream with length and ciphertext. + * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher) + * @return plaintext - starts at offset 0 of the output, and fills up the entire byte array. 
+ * @throws IOException thrown upon any crypto or IO problem encountered during decryption + */ + public byte[] decrypt(InputStream from, byte[] AAD) throws IOException; + } +} + diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java index d09d007..9242290 100644 --- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java @@ -20,6 +20,8 @@ package org.apache.parquet.format; import static org.apache.parquet.format.FileMetaData._Fields.CREATED_BY; +import static org.apache.parquet.format.FileMetaData._Fields.ENCRYPTION_ALGORITHM; +import static org.apache.parquet.format.FileMetaData._Fields.FOOTER_SIGNING_KEY_METADATA; import static org.apache.parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA; import static org.apache.parquet.format.FileMetaData._Fields.NUM_ROWS; import static org.apache.parquet.format.FileMetaData._Fields.ROW_GROUPS; @@ -30,9 +32,11 @@ import static org.apache.parquet.format.event.Consumers.listElementsOf; import static org.apache.parquet.format.event.Consumers.listOf; import static org.apache.parquet.format.event.Consumers.struct; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.thrift.TBase; @@ -40,7 +44,7 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TIOStreamTransport; - +import org.apache.thrift.transport.TMemoryBuffer; import org.apache.parquet.format.event.Consumers.Consumer; import org.apache.parquet.format.event.Consumers.DelegatingFieldConsumer; import org.apache.parquet.format.event.EventBasedThriftReader; @@ -54,37 +58,91 @@ import 
org.apache.parquet.format.event.TypedConsumer.StringConsumer; */ public class Util { + private final static int INIT_MEM_ALLOC_ENCR_BUFFER = 100; + public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException { - write(columnIndex, to); + writeColumnIndex(columnIndex, to, null, null); + } + + public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to, + BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + write(columnIndex, to, encryptor, AAD); } public static ColumnIndex readColumnIndex(InputStream from) throws IOException { - return read(from, new ColumnIndex()); + return readColumnIndex(from, null, null); + } + + public static ColumnIndex readColumnIndex(InputStream from, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(from, new ColumnIndex(), decryptor, AAD); } public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream to) throws IOException { - write(offsetIndex, to); + writeOffsetIndex(offsetIndex, to, null, null); + } + + public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream to, + BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + write(offsetIndex, to, encryptor, AAD); } public static OffsetIndex readOffsetIndex(InputStream from) throws IOException { - return read(from, new OffsetIndex()); + return readOffsetIndex(from, null, null); + } + + public static OffsetIndex readOffsetIndex(InputStream from, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(from, new OffsetIndex(), decryptor, AAD); } public static void writePageHeader(PageHeader pageHeader, OutputStream to) throws IOException { - write(pageHeader, to); + writePageHeader(pageHeader, to, null, null); + } + + public static void writePageHeader(PageHeader pageHeader, OutputStream to, + BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + write(pageHeader, to, encryptor, AAD); } public static PageHeader 
readPageHeader(InputStream from) throws IOException { - return read(from, new PageHeader()); + return readPageHeader(from, null, null); + } + + public static PageHeader readPageHeader(InputStream from, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(from, new PageHeader(), decryptor, AAD); + } + + public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, + OutputStream to) throws IOException { + writeFileMetaData(fileMetadata, to, null, null); } - public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, OutputStream to) throws IOException { - write(fileMetadata, to); + public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, + OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + write(fileMetadata, to, encryptor, AAD); } public static FileMetaData readFileMetaData(InputStream from) throws IOException { - return read(from, new FileMetaData()); + return readFileMetaData(from, null, null); + } + + public static FileMetaData readFileMetaData(InputStream from, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(from, new FileMetaData(), decryptor, AAD); + } + + public static void writeColumnMetaData(ColumnMetaData columnMetaData, OutputStream to, + BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + write(columnMetaData, to, encryptor, AAD); } + + public static ColumnMetaData readColumnMetaData(InputStream from, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(from, new ColumnMetaData(), decryptor, AAD); + } + /** * reads the meta data from the stream * @param from the stream to read the metadata from @@ -93,15 +151,28 @@ public class Util { * @throws IOException if any I/O error occurs during the reading */ public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException { + return 
readFileMetaData(from, skipRowGroups, (BlockCipher.Decryptor) null, (byte[]) null); + } + + public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { FileMetaData md = new FileMetaData(); if (skipRowGroups) { - readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups); + readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD); } else { - read(from, md); + read(from, md, decryptor, AAD); } return md; } + public static void writeFileCryptoMetaData(org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException { + write(cryptoMetadata, to, null, null); + } + + public static FileCryptoMetaData readFileCryptoMetaData(InputStream from) throws IOException { + return read(from, new FileCryptoMetaData(), null, null); + } + /** * To read metadata in a streaming fashion. * @@ -113,6 +184,8 @@ public class Util { abstract public void addRowGroup(RowGroup rowGroup); abstract public void addKeyValueMetaData(KeyValue kv); abstract public void setCreatedBy(String createdBy); + abstract public void setEncryptionAlgorithm(EncryptionAlgorithm encryptionAlgorithm); + abstract public void setFooterSigningKeyMetadata(byte[] footerSigningKeyMetadata); } /** @@ -155,41 +228,73 @@ public class Util { public void addKeyValueMetaData(KeyValue kv) { md.addToKey_value_metadata(kv); } + + @Override + public void setEncryptionAlgorithm(EncryptionAlgorithm encryptionAlgorithm) { + md.setEncryption_algorithm(encryptionAlgorithm); + } + + @Override + public void setFooterSigningKeyMetadata(byte[] footerSigningKeyMetadata) { + md.setFooter_signing_key_metadata(footerSigningKeyMetadata); + } } public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException { - readFileMetaData(from, consumer, false); + readFileMetaData(from, consumer, null, null); + } + + public static void 
readFileMetaData(InputStream from, FileMetaDataConsumer consumer, + BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + readFileMetaData(from, consumer, false, decryptor, AAD); } public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException { + readFileMetaData(from, consumer, skipRowGroups, null, null); + } + + public static void readFileMetaData(final InputStream input, final FileMetaDataConsumer consumer, + boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { try { DelegatingFieldConsumer eventConsumer = fieldConsumer() - .onField(VERSION, new I32Consumer() { - @Override - public void consume(int value) { - consumer.setVersion(value); - } - }).onField(SCHEMA, listOf(SchemaElement.class, new Consumer<List<SchemaElement>>() { - @Override - public void consume(List<SchemaElement> schema) { - consumer.setSchema(schema); - } - })).onField(NUM_ROWS, new I64Consumer() { - @Override - public void consume(long value) { - consumer.setNumRows(value); - } - }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new Consumer<KeyValue>() { - @Override - public void consume(KeyValue kv) { - consumer.addKeyValueMetaData(kv); - } - }))).onField(CREATED_BY, new StringConsumer() { - @Override - public void consume(String value) { - consumer.setCreatedBy(value); - } - }); + .onField(VERSION, new I32Consumer() { + @Override + public void consume(int value) { + consumer.setVersion(value); + } + }).onField(SCHEMA, listOf(SchemaElement.class, new Consumer<List<SchemaElement>>() { + @Override + public void consume(List<SchemaElement> schema) { + consumer.setSchema(schema); + } + })).onField(NUM_ROWS, new I64Consumer() { + @Override + public void consume(long value) { + consumer.setNumRows(value); + } + }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new Consumer<KeyValue>() { + @Override + public void consume(KeyValue kv) { + 
consumer.addKeyValueMetaData(kv); + } + }))).onField(CREATED_BY, new StringConsumer() { + @Override + public void consume(String value) { + consumer.setCreatedBy(value); + } + }).onField(ENCRYPTION_ALGORITHM, struct(EncryptionAlgorithm.class, new Consumer<EncryptionAlgorithm>() { + @Override + public void consume(EncryptionAlgorithm encryptionAlgorithm) { + consumer.setEncryptionAlgorithm(encryptionAlgorithm); + } + })).onField(FOOTER_SIGNING_KEY_METADATA, new StringConsumer() { + @Override + public void consume(String value) { + byte[] keyMetadata = value.getBytes(StandardCharsets.UTF_8); + consumer.setFooterSigningKeyMetadata(keyMetadata); + } + }); + if (!skipRowGroups) { eventConsumer = eventConsumer.onField(ROW_GROUPS, listElementsOf(struct(RowGroup.class, new Consumer<RowGroup>() { @Override @@ -198,8 +303,16 @@ public class Util { } }))); } - new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer); + final InputStream from; + if (null == decryptor) { + from = input; + } + else { + byte[] plainText = decryptor.decrypt(input, AAD); + from = new ByteArrayInputStream(plainText); + } + new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer); } catch (TException e) { throw new IOException("can not read FileMetaData: " + e.getMessage(), e); } @@ -217,7 +330,16 @@ public class Util { return new InterningProtocol(new TCompactProtocol(t)); } - private static <T extends TBase<?,?>> T read(InputStream from, T tbase) throws IOException { + + private static <T extends TBase<?,?>> T read(final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + final InputStream from; + if (null == decryptor) { + from = input; + } else { + byte[] plainText = decryptor.decrypt(input, AAD); + from = new ByteArrayInputStream(plainText); + } + try { tbase.read(protocol(from)); return tbase; @@ -226,11 +348,23 @@ public class Util { } } - private static void write(TBase<?, ?> tbase, OutputStream to) throws IOException { - try 
{ - tbase.write(protocol(to)); + private static void write(TBase<?, ?> tbase, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { + if (null == encryptor) { + try { + tbase.write(protocol(to)); + return; + } catch (TException e) { + throw new IOException("can not write " + tbase, e); + } + } + // Serialize and encrypt the structure + try (TMemoryBuffer thriftMemoryBuffer = new TMemoryBuffer(INIT_MEM_ALLOC_ENCR_BUFFER)) { + tbase.write(new InterningProtocol(new TCompactProtocol(thriftMemoryBuffer))); + byte[] encryptedBuffer = encryptor.encrypt(thriftMemoryBuffer.getArray(), AAD); + to.write(encryptedBuffer); } catch (TException e) { throw new IOException("can not write " + tbase, e); } } } + diff --git a/pom.xml b/pom.xml index 1808dba..7bfef73 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,7 @@ <hadoop1.version>1.2.1</hadoop1.version> <cascading.version>2.7.1</cascading.version> <cascading3.version>3.1.2</cascading3.version> - <parquet.format.version>2.6.0</parquet.format.version> + <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version> <previous.version>1.7.0</previous.version> <thrift.executable>thrift</thrift.executable> <format.thrift.executable>thrift</format.thrift.executable>