gszadovszky commented on a change in pull request #615:
URL: https://github.com/apache/parquet-mr/pull/615#discussion_r439346745



##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class PropertiesDrivenCryptoFactory implements 
EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
+  public static final String COLUMN_KEYS_PROPERTY_NAME = 
"encryption.column.keys";
+  public static final String FOOTER_KEY_PROPERTY_NAME = 
"encryption.footer.key";
+  public static final String ENCRYPTION_ALGORITHM_PROPERTY_NAME = 
"encryption.algorithm";
+  public static final String PLAINTEXT_FOOTER_PROPERTY_NAME = 
"encryption.plaintext.footer";
+  
+  public static final int DEK_LENGTH = 16;
+
+  private static final SecureRandom random = new SecureRandom();

Review comment:
       As per the java naming conventions:
   ```suggestion
     private static final SecureRandom RANDOM = new SecureRandom();
   ```

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.parquet.crypto.keytools.samples.InMemoryKMS;
+import org.apache.parquet.crypto.mocks.RemoteKmsClientMock;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Base64;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files 
in different
+ * encryption and decryption configurations, set using a properties-driven 
interface.
+ *
+ * The write sample produces a number of Parquet files, each encrypted with a 
different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * <encryption-configuration-name>.parquet.encrypted or
+ * NO_ENCRYPTION.parquet for plaintext file.
+ *
+ * The read sample creates a set of decryption configurations and then uses 
each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ *
+ * A detailed description of the Parquet Modular Encryption specification can 
be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with eight columns in the following
+ * encryption configurations:
+ *
+ *  - ENCRYPT_COLUMNS_AND_FOOTER:   Encrypt two columns and the footer, with 
different
+ *                                  keys.
+ *  - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER:   Encrypt two columns, with different 
keys.
+ *                                  Do not encrypt footer (to enable legacy 
readers)
+ *                                  - plaintext footer mode.
+ *  - ENCRYPT_COLUMNS_AND_FOOTER_CTR:   Encrypt two columns and the footer, 
with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) 
algorithm.
+ *  - NO_ENCRYPTION:   Do not encrypt anything
+ *
+ *
+ *
+ * The read sample uses each of the following decryption configurations to 
read every
+ * encrypted file in the input directory:
+ *
+ *  - DECRYPT_WITH_KEY_RETRIEVER:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key.
+ *  - NO_DECRYPTION:   Do not decrypt anything.
+ */
+@RunWith(Parameterized.class)
+public class TestPropertiesDrivenEncryption {
+  @Parameterized.Parameters(name = "Run {index}: 
isKeyMaterialExternalStorage={0} isDoubleWrapping={1} isWrapLocally={2}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> list = new ArrayList<>(8);
+    boolean[] flagValues = { false, true };
+    for (boolean keyMaterialInternalStorage : flagValues) {
+      for (boolean doubleWrapping : flagValues) {
+        for (boolean wrapLocally : flagValues) {
+          Object[] vector = {keyMaterialInternalStorage, doubleWrapping, 
wrapLocally};
+          list.add(vector);
+        }
+      }
+    }
+    return list;
+  }
+
+  @Parameterized.Parameter // first data value (0) is default
+  public boolean isKeyMaterialInternalStorage;
+
+  @Parameterized.Parameter(value = 1)
+  public boolean isDoubleWrapping;
+
+  @Parameterized.Parameter(value = 2)
+  public boolean isWrapLocally;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPropertiesDrivenEncryption.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  private static final Base64.Encoder encoder = Base64.getEncoder();
+  private static final byte[] FOOTER_MASTER_KEY = 
"0123456789012345".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY1 = 
"1234567890123450".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY2 = 
"1234567890123451".getBytes();
+  private static final String FOOTER_MASTER_KEY_ID = "kf";
+  private static final String COLUMN_MASTER_KEY1_ID = "kc1";
+  private static final String COLUMN_MASTER_KEY2_ID = "kc2";
+
+  private static final String KEY_LIST = String.format("%s: %s, %s: %s, %s: 
%s",
+    COLUMN_MASTER_KEY1_ID, encoder.encodeToString(COLUMN_MASTER_KEY1),
+    COLUMN_MASTER_KEY2_ID, encoder.encodeToString(COLUMN_MASTER_KEY2),
+    FOOTER_MASTER_KEY_ID, encoder.encodeToString(FOOTER_MASTER_KEY));
+  private static final String COLUMN_KEY_MAPPING =
+    COLUMN_MASTER_KEY1_ID + ": double_field; " +
+    COLUMN_MASTER_KEY2_ID + ": float_field";
+  private static final boolean plaintextFilesAllowed = true;
+
+  final WriteSupport<Group> writeSupport = new GroupWriteSupport();
+
+  public enum EncryptionConfiguration {
+    ENCRYPT_COLUMNS_AND_FOOTER("ENCRYPT_COLUMNS_AND_FOOTER"),
+    ENCRYPT_COLUMNS_PLAINTEXT_FOOTER("ENCRYPT_COLUMNS_PLAINTEXT_FOOTER"),
+    ENCRYPT_COLUMNS_AND_FOOTER_CTR("ENCRYPT_COLUMNS_AND_FOOTER_CTR"),
+    NO_ENCRYPTION("NO_ENCRYPTION");
+
+    private final String configurationName;
+
+    EncryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+
+  public enum DecryptionConfiguration {
+    DECRYPT_WITH_KEY_RETRIEVER("DECRYPT_WITH_KEY_RETRIEVER"),
+    NO_DECRYPTION("NO_DECRYPTION");
+
+    private final String configurationName;
+
+    DecryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+  @Test
+  public void testWriteReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+    LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", 
rootPath.toString());
+    LOG.info(String.format("Run: isKeyMaterialExternalStorage=%s 
isDoubleWrapping=%s isWrapLocally=%s",
+      isKeyMaterialInternalStorage, isDoubleWrapping, isWrapLocally));
+    // This map will hold various encryption configurations.
+    Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap = 
getHadoopConfigurationForEncryption();
+    testWriteEncryptedParquetFiles(rootPath, encryptionPropertiesMap);
+    // This map will hold various decryption configurations.
+    Map<DecryptionConfiguration, Configuration> decryptionPropertiesMap = 
getHadoopConfigurationForDecryption();
+    testReadEncryptedParquetFiles(rootPath, decryptionPropertiesMap);
+  }
+
+
+  private void testWriteEncryptedParquetFiles(Path root, 
Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap) throws 
IOException {
+    MessageType schema = parseMessageType(
+      "message test { "
+        + "required boolean boolean_field; "
+        + "required int32 int32_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "} ");
+
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    for (Map.Entry<EncryptionConfiguration, Configuration> 
encryptionConfigurationEntry : encryptionPropertiesMap.entrySet()) {
+      KeyToolkit.removeCacheEntriesForAllTokens();
+      EncryptionConfiguration encryptionConfiguration = 
encryptionConfigurationEntry.getKey();
+      Configuration conf = encryptionConfigurationEntry.getValue();
+
+      String suffix = (EncryptionConfiguration.NO_ENCRYPTION == 
encryptionConfiguration) ? ".parquet" : ".parquet.encrypted";
+      Path file = new Path(root, encryptionConfiguration.toString() + suffix);
+      LOG.info("\nWrite " + file.toString());
+
+      FileEncryptionProperties fileEncryptionProperties = null;
+      if (null == conf) {
+        conf = new Configuration();
+      } else {
+        EncryptionPropertiesFactory cryptoFactory = 
EncryptionPropertiesFactory.loadFactory(conf);
+        fileEncryptionProperties = 
cryptoFactory.getFileEncryptionProperties(conf, file, null);
+      }
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withConf(conf)
+        .withWriteMode(OVERWRITE)
+        .withType(schema)
+        .withEncryption(fileEncryptionProperties)
+        .build();
+      for (int i = 0; i < 100; i++) {
+        boolean expect = false;
+        if ((i % 2) == 0)
+          expect = true;
+        float float_val = (float) i * 1.1f;
+        double double_val = (i * 1.1111111);
+
+        writer.write(
+          f.newGroup()
+            .append("boolean_field", expect)
+            .append("int32_field", i)
+            .append("float_field", float_val)
+            .append("double_field", double_val));
+
+      }
+      writer.close();

Review comment:
       You may use _try-with-resources_.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/samples/VaultClient.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto.keytools.samples;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+import org.apache.parquet.crypto.KeyAccessDeniedException;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.keytools.KmsClient;
+import org.apache.parquet.crypto.keytools.RemoteKmsClient;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class VaultClient extends RemoteKmsClient {
+  private static final Logger LOG = LoggerFactory.getLogger(VaultClient.class);
+  private static final MediaType JSON_MEDIA_TYPE = 
MediaType.get("application/json; charset=utf-8");
+  private static final String DEFAULT_TRANSIT_ENGINE = "/v1/transit/";
+  private static final String transitWrapEndpoint = "encrypt/";
+  private static final String transitUnwrapEndpoint = "decrypt/";
+  private static final String tokenHeader="X-Vault-Token";
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  private String transitEngine = DEFAULT_TRANSIT_ENGINE;
+  private OkHttpClient httpClient = new OkHttpClient();
+
+
+  @Override
+  protected void initializeInternal() {
+    if (kmsToken.equals(KmsClient.DEFAULT_ACCESS_TOKEN)) {
+      throw new ParquetCryptoRuntimeException("Token not provided");
+    }
+    
+    if (DEFAULT_KMS_INSTANCE_ID != kmsInstanceID) {
+      transitEngine = "/v1/" + kmsInstanceID;
+      if (!transitEngine.endsWith("/")) {
+        transitEngine += "/";
+      }
+    }
+  }
+
+  @Override
+  public String wrapKeyInServer(byte[] dataKey, String masterKeyIdentifier) {
+    Map<String, String> writeKeyMap = new HashMap<String, String>(1);
+    final String dataKeyStr = Base64.getEncoder().encodeToString(dataKey);
+    writeKeyMap.put("plaintext", dataKeyStr);
+    String response = getContentFromTransitEngine(transitEngine + 
transitWrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String ciphertext = parseReturn(response, "ciphertext");
+    return ciphertext;
+  }
+
+  @Override
+  public byte[] unwrapKeyInServer(String wrappedKey, String 
masterKeyIdentifier) {
+    Map<String, String> writeKeyMap = new HashMap<String, String>(1);
+    writeKeyMap.put("ciphertext", wrappedKey);
+    String response = getContentFromTransitEngine(transitEngine + 
transitUnwrapEndpoint, buildPayload(writeKeyMap), masterKeyIdentifier);
+    String plaintext = parseReturn(response, "plaintext");
+    final byte[] key = Base64.getDecoder().decode(plaintext);
+    return key;
+  }
+
+  @Override
+  protected byte[] getMasterKeyFromServer(String masterKeyIdentifier) {
+    // Vault supports in-server wrapping and unwrapping. No need to fetch 
master keys.
+    throw new UnsupportedOperationException("Use server wrap/unwrap, instead 
of fetching master keys (local wrap)");
+  }
+
+  private String buildPayload(Map<String, String> paramMap) {
+    String jsonValue;
+    try {
+      jsonValue = objectMapper.writeValueAsString(paramMap);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Failed to build payload", e);
+    }
+    return jsonValue;
+  }
+
+  private String getContentFromTransitEngine(String endPoint, String jPayload, 
String masterKeyIdentifier) {
+    LOG.info("masterKeyIdentifier: " + masterKeyIdentifier);
+    String masterKeyID = masterKeyIdentifier;
+
+    final RequestBody requestBody = RequestBody.create(JSON_MEDIA_TYPE, 
jPayload);
+    Request request = new Request.Builder()
+        .url(this.kmsURL + endPoint + masterKeyID)
+        .header(tokenHeader,  kmsToken)
+        .post(requestBody).build();
+
+    return executeAndGetResponse(endPoint, request);
+  }
+
+  private String executeAndGetResponse(String endPoint, Request request) {
+    Response response = null;
+    try {
+      response = httpClient.newCall(request).execute();
+      final String responseBody = response.body().string();
+      if (response.isSuccessful()) {
+        return responseBody;
+      } else {
+        if ((401 == response.code()) || (403 == response.code())) {
+          throw new KeyAccessDeniedException(responseBody);
+        }
+        throw new IOException("Vault call [" + endPoint + "] didn't succeed: " 
+ responseBody);
+      }
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException("Vault call [" + 
request.url().toString() + endPoint + "] didn't succeed", e);
+    } finally {
+      if (null != response) {
+        response.close();

Review comment:
       You may use _try-with-resources_ instead.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/PropertiesDrivenCryptoFactory.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto.keytools;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.crypto.keytools.KeyToolkit.stringIsEmpty;
+
+public class PropertiesDrivenCryptoFactory implements 
EncryptionPropertiesFactory, DecryptionPropertiesFactory {
+
+  public static final String COLUMN_KEYS_PROPERTY_NAME = 
"encryption.column.keys";
+  public static final String FOOTER_KEY_PROPERTY_NAME = 
"encryption.footer.key";
+  public static final String ENCRYPTION_ALGORITHM_PROPERTY_NAME = 
"encryption.algorithm";
+  public static final String PLAINTEXT_FOOTER_PROPERTY_NAME = 
"encryption.plaintext.footer";
+  
+  public static final int DEK_LENGTH = 16;
+
+  private static final SecureRandom random = new SecureRandom();
+
+  @Override
+  public FileEncryptionProperties getFileEncryptionProperties(Configuration 
fileHadoopConfig, Path tempFilePath,
+      WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
+
+    String footerKeyId = 
fileHadoopConfig.getTrimmed(FOOTER_KEY_PROPERTY_NAME); 
+    String columnKeysStr = 
fileHadoopConfig.getTrimmed(COLUMN_KEYS_PROPERTY_NAME);
+
+    // File shouldn't be encrypted
+    if (stringIsEmpty(footerKeyId) && stringIsEmpty(columnKeysStr)) {
+      return null; 
+    }
+
+    if (stringIsEmpty(footerKeyId)) {
+      throw new ParquetCryptoRuntimeException("Undefined footer key");
+    }
+
+    FileKeyMaterialStore keyMaterialStore = null;
+    boolean keyMaterialInternalStorage = 
fileHadoopConfig.getBoolean(KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME, 
true);
+    if (!keyMaterialInternalStorage) {
+      try {
+        keyMaterialStore = new 
HadoopFSKeyMaterialStore(tempFilePath.getFileSystem(fileHadoopConfig));
+        keyMaterialStore.initialize(tempFilePath, fileHadoopConfig, false);
+      } catch (IOException e) {
+        throw new ParquetCryptoRuntimeException("Failed to get key material 
store", e);
+      }
+    }
+
+    FileKeyWrapper keyWrapper = new FileKeyWrapper(fileHadoopConfig, 
keyMaterialStore);
+
+    String algo = 
fileHadoopConfig.getTrimmed(ENCRYPTION_ALGORITHM_PROPERTY_NAME);
+    ParquetCipher cipher;
+    if (stringIsEmpty(algo)) {
+      cipher = ParquetCipher.AES_GCM_V1;
+    } else {
+      if (algo.equalsIgnoreCase("AES_GCM_V1")) {
+        cipher = ParquetCipher.AES_GCM_V1;
+      } else if (algo.equalsIgnoreCase("AES_GCM_CTR_V1")) {
+        cipher = ParquetCipher.AES_GCM_CTR_V1;
+      }
+      else {
+        throw new ParquetCryptoRuntimeException("Wrong encryption algorithm: " 
+ algo);
+      }
+    }
+
+    byte[] footerKeyBytes = new byte[DEK_LENGTH];
+    random.nextBytes(footerKeyBytes);
+    byte[] footerKeyMetadata = 
keyWrapper.getEncryptionKeyMetadata(footerKeyBytes, footerKeyId, true);
+
+    Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = 
getColumnEncryptionProperties(columnKeysStr, keyWrapper);
+
+    String plaintextFooterStr = 
fileHadoopConfig.getTrimmed(PLAINTEXT_FOOTER_PROPERTY_NAME);
+    boolean plaintextFooter = Boolean.parseBoolean(plaintextFooterStr);

Review comment:
       I think `Configuration.getBoolean` can return a `boolean` directly, so the intermediate `String` and `Boolean.parseBoolean` call are unnecessary.

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.parquet.crypto.keytools.samples.InMemoryKMS;
+import org.apache.parquet.crypto.mocks.RemoteKmsClientMock;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Base64;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files 
in different
+ * encryption and decryption configurations, set using a properties-driven 
interface.
+ *
+ * The write sample produces a number of Parquet files, each encrypted with a 
different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * <encryption-configuration-name>.parquet.encrypted or
+ * NO_ENCRYPTION.parquet for plaintext file.
+ *
+ * The read sample creates a set of decryption configurations and then uses 
each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ *
+ * A detailed description of the Parquet Modular Encryption specification can 
be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with eight columns in the following
+ * encryption configurations:
+ *
+ *  - ENCRYPT_COLUMNS_AND_FOOTER:   Encrypt two columns and the footer, with 
different
+ *                                  keys.
+ *  - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER:   Encrypt two columns, with different 
keys.
+ *                                  Do not encrypt footer (to enable legacy 
readers)
+ *                                  - plaintext footer mode.
+ *  - ENCRYPT_COLUMNS_AND_FOOTER_CTR:   Encrypt two columns and the footer, 
with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) 
algorithm.
+ *  - NO_ENCRYPTION:   Do not encrypt anything
+ *
+ *
+ *
+ * The read sample uses each of the following decryption configurations to 
read every
+ * encrypted file in the input directory:
+ *
+ *  - DECRYPT_WITH_KEY_RETRIEVER:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key.
+ *  - NO_DECRYPTION:   Do not decrypt anything.
+ */
+@RunWith(Parameterized.class)
+public class TestPropertiesDrivenEncryption {
+  @Parameterized.Parameters(name = "Run {index}: 
isKeyMaterialExternalStorage={0} isDoubleWrapping={1} isWrapLocally={2}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> list = new ArrayList<>(8);
+    boolean[] flagValues = { false, true };
+    for (boolean keyMaterialInternalStorage : flagValues) {
+      for (boolean doubleWrapping : flagValues) {
+        for (boolean wrapLocally : flagValues) {
+          Object[] vector = {keyMaterialInternalStorage, doubleWrapping, 
wrapLocally};
+          list.add(vector);
+        }
+      }
+    }
+    return list;
+  }
+
+  @Parameterized.Parameter // first data value (0) is default
+  public boolean isKeyMaterialInternalStorage;
+
+  @Parameterized.Parameter(value = 1)
+  public boolean isDoubleWrapping;
+
+  @Parameterized.Parameter(value = 2)
+  public boolean isWrapLocally;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPropertiesDrivenEncryption.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  private static final Base64.Encoder encoder = Base64.getEncoder();
+  private static final byte[] FOOTER_MASTER_KEY = 
"0123456789012345".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY1 = 
"1234567890123450".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY2 = 
"1234567890123451".getBytes();
+  private static final String FOOTER_MASTER_KEY_ID = "kf";
+  private static final String COLUMN_MASTER_KEY1_ID = "kc1";
+  private static final String COLUMN_MASTER_KEY2_ID = "kc2";
+
+  private static final String KEY_LIST = String.format("%s: %s, %s: %s, %s: 
%s",
+    COLUMN_MASTER_KEY1_ID, encoder.encodeToString(COLUMN_MASTER_KEY1),
+    COLUMN_MASTER_KEY2_ID, encoder.encodeToString(COLUMN_MASTER_KEY2),
+    FOOTER_MASTER_KEY_ID, encoder.encodeToString(FOOTER_MASTER_KEY));
+  private static final String COLUMN_KEY_MAPPING =
+    COLUMN_MASTER_KEY1_ID + ": double_field; " +
+    COLUMN_MASTER_KEY2_ID + ": float_field";
+  private static final boolean plaintextFilesAllowed = true;
+
+  final WriteSupport<Group> writeSupport = new GroupWriteSupport();
+
+  public enum EncryptionConfiguration {
+    ENCRYPT_COLUMNS_AND_FOOTER("ENCRYPT_COLUMNS_AND_FOOTER"),
+    ENCRYPT_COLUMNS_PLAINTEXT_FOOTER("ENCRYPT_COLUMNS_PLAINTEXT_FOOTER"),
+    ENCRYPT_COLUMNS_AND_FOOTER_CTR("ENCRYPT_COLUMNS_AND_FOOTER_CTR"),
+    NO_ENCRYPTION("NO_ENCRYPTION");
+
+    private final String configurationName;
+
+    EncryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+
+  public enum DecryptionConfiguration {
+    DECRYPT_WITH_KEY_RETRIEVER("DECRYPT_WITH_KEY_RETRIEVER"),
+    NO_DECRYPTION("NO_DECRYPTION");
+
+    private final String configurationName;
+
+    DecryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+  @Test
+  public void testWriteReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+    LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", 
rootPath.toString());
+    LOG.info(String.format("Run: isKeyMaterialExternalStorage=%s 
isDoubleWrapping=%s isWrapLocally=%s",
+      isKeyMaterialInternalStorage, isDoubleWrapping, isWrapLocally));
+    // This map will hold various encryption configurations.
+    Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap = 
getHadoopConfigurationForEncryption();
+    testWriteEncryptedParquetFiles(rootPath, encryptionPropertiesMap);
+    // This map will hold various decryption configurations.
+    Map<DecryptionConfiguration, Configuration> decryptionPropertiesMap = 
getHadoopConfigurationForDecryption();
+    testReadEncryptedParquetFiles(rootPath, decryptionPropertiesMap);
+  }
+
+
+  private void testWriteEncryptedParquetFiles(Path root, 
Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap) throws 
IOException {
+    MessageType schema = parseMessageType(
+      "message test { "
+        + "required boolean boolean_field; "
+        + "required int32 int32_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "} ");
+
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    for (Map.Entry<EncryptionConfiguration, Configuration> 
encryptionConfigurationEntry : encryptionPropertiesMap.entrySet()) {
+      KeyToolkit.removeCacheEntriesForAllTokens();
+      EncryptionConfiguration encryptionConfiguration = 
encryptionConfigurationEntry.getKey();
+      Configuration conf = encryptionConfigurationEntry.getValue();
+
+      String suffix = (EncryptionConfiguration.NO_ENCRYPTION == 
encryptionConfiguration) ? ".parquet" : ".parquet.encrypted";
+      Path file = new Path(root, encryptionConfiguration.toString() + suffix);
+      LOG.info("\nWrite " + file.toString());
+
+      FileEncryptionProperties fileEncryptionProperties = null;
+      if (null == conf) {
+        conf = new Configuration();
+      } else {
+        EncryptionPropertiesFactory cryptoFactory = 
EncryptionPropertiesFactory.loadFactory(conf);
+        fileEncryptionProperties = 
cryptoFactory.getFileEncryptionProperties(conf, file, null);
+      }
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withConf(conf)
+        .withWriteMode(OVERWRITE)
+        .withType(schema)
+        .withEncryption(fileEncryptionProperties)
+        .build();
+      for (int i = 0; i < 100; i++) {
+        boolean expect = false;
+        if ((i % 2) == 0)
+          expect = true;
+        float float_val = (float) i * 1.1f;
+        double double_val = (i * 1.1111111);
+
+        writer.write(
+          f.newGroup()
+            .append("boolean_field", expect)
+            .append("int32_field", i)
+            .append("float_field", float_val)
+            .append("double_field", double_val));
+
+      }
+      writer.close();
+    }
+  }
+
+  private void testReadEncryptedParquetFiles(Path root, 
Map<DecryptionConfiguration, Configuration> decryptionPropertiesMap) throws 
IOException {
+    for (Map.Entry<DecryptionConfiguration, Configuration> 
decryptionConfigurationEntry : decryptionPropertiesMap.entrySet()) {
+      DecryptionConfiguration decryptionConfiguration = 
decryptionConfigurationEntry.getKey();
+      LOG.info("\n\n");
+      LOG.info("==> Decryption configuration {}\n", decryptionConfiguration);
+
+      File folder = new File(root.toString());
+      File[] listOfFiles = folder.listFiles();
+
+      for (int fileNum = 0; fileNum < listOfFiles.length; fileNum++) {
+        KeyToolkit.removeCacheEntriesForAllTokens();
+        Path file = new Path(listOfFiles[fileNum].getAbsolutePath());
+        if (!file.getName().endsWith(".parquet.encrypted") && 
!file.getName().endsWith(".parquet")) { // Skip non-parquet files
+          continue;
+        }
+        EncryptionConfiguration encryptionConfiguration = 
getEncryptionConfigurationFromFilename(file.getName());

Review comment:
       I think it would be cleaner and less error-prone if we iterated over the
encryption configurations and expected the related file to be present, instead
of skipping all the files that do not match. By the way, there should be no
files in that temporary directory that were not created by this test.

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.parquet.crypto.keytools.samples.InMemoryKMS;
+import org.apache.parquet.crypto.mocks.RemoteKmsClientMock;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Base64;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files 
in different
+ * encryption and decryption configurations, set using a properties-driven 
interface.
+ *
+ * The write sample produces a number of Parquet files, each encrypted with a different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * <encryption-configuration-name>.parquet.encrypted or
+ * NO_ENCRYPTION.parquet for plaintext file.
+ *
+ * The read sample creates a set of decryption configurations and then uses 
each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ *
+ * A detailed description of the Parquet Modular Encryption specification can 
be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with four columns in the following
+ * encryption configurations:
+ *
+ *  - ENCRYPT_COLUMNS_AND_FOOTER:   Encrypt two columns and the footer, with 
different
+ *                                  keys.
+ *  - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER:   Encrypt two columns, with different 
keys.
+ *                                  Do not encrypt footer (to enable legacy 
readers)
+ *                                  - plaintext footer mode.
+ *  - ENCRYPT_COLUMNS_AND_FOOTER_CTR:   Encrypt two columns and the footer, 
with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) 
algorithm.
+ *  - NO_ENCRYPTION:   Do not encrypt anything
+ *
+ *
+ *
+ * The read sample uses each of the following decryption configurations to read every
+ * encrypted file in the input directory:
+ *
+ *  - DECRYPT_WITH_KEY_RETRIEVER:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key.
+ *  - NO_DECRYPTION:   Do not decrypt anything.
+ */
+@RunWith(Parameterized.class)
+public class TestPropertiesDrivenEncryption {

Review comment:
       This test is similar to the initial version of `TestEncryptionOptions`. 
You may want to run through the comments and the related changes in #782.

##########
File path: 
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java
##########
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.keytools.KeyToolkit;
+import org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory;
+import org.apache.parquet.crypto.keytools.samples.InMemoryKMS;
+import org.apache.parquet.crypto.mocks.RemoteKmsClientMock;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Base64;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files 
in different
+ * encryption and decryption configurations, set using a properties-driven 
interface.
+ *
+ * The write sample produces a number of Parquet files, each encrypted with a different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * <encryption-configuration-name>.parquet.encrypted or
+ * NO_ENCRYPTION.parquet for plaintext file.
+ *
+ * The read sample creates a set of decryption configurations and then uses 
each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ *
+ * A detailed description of the Parquet Modular Encryption specification can 
be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with four columns in the following
+ * encryption configurations:
+ *
+ *  - ENCRYPT_COLUMNS_AND_FOOTER:   Encrypt two columns and the footer, with 
different
+ *                                  keys.
+ *  - ENCRYPT_COLUMNS_PLAINTEXT_FOOTER:   Encrypt two columns, with different 
keys.
+ *                                  Do not encrypt footer (to enable legacy 
readers)
+ *                                  - plaintext footer mode.
+ *  - ENCRYPT_COLUMNS_AND_FOOTER_CTR:   Encrypt two columns and the footer, 
with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) 
algorithm.
+ *  - NO_ENCRYPTION:   Do not encrypt anything
+ *
+ *
+ *
+ * The read sample uses each of the following decryption configurations to read every
+ * encrypted file in the input directory:
+ *
+ *  - DECRYPT_WITH_KEY_RETRIEVER:   Decrypt using key retriever that holds the 
keys of
+ *                                  two encrypted columns and the footer key.
+ *  - NO_DECRYPTION:   Do not decrypt anything.
+ */
+@RunWith(Parameterized.class)
+public class TestPropertiesDrivenEncryption {
+  @Parameterized.Parameters(name = "Run {index}: 
isKeyMaterialExternalStorage={0} isDoubleWrapping={1} isWrapLocally={2}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> list = new ArrayList<>(8);
+    boolean[] flagValues = { false, true };
+    for (boolean keyMaterialInternalStorage : flagValues) {
+      for (boolean doubleWrapping : flagValues) {
+        for (boolean wrapLocally : flagValues) {
+          Object[] vector = {keyMaterialInternalStorage, doubleWrapping, 
wrapLocally};
+          list.add(vector);
+        }
+      }
+    }
+    return list;
+  }
+
+  @Parameterized.Parameter // first data value (0) is default
+  public boolean isKeyMaterialInternalStorage;
+
+  @Parameterized.Parameter(value = 1)
+  public boolean isDoubleWrapping;
+
+  @Parameterized.Parameter(value = 2)
+  public boolean isWrapLocally;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPropertiesDrivenEncryption.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  private static final Base64.Encoder encoder = Base64.getEncoder();
+  private static final byte[] FOOTER_MASTER_KEY = 
"0123456789012345".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY1 = 
"1234567890123450".getBytes();
+  private static final byte[] COLUMN_MASTER_KEY2 = 
"1234567890123451".getBytes();
+  private static final String FOOTER_MASTER_KEY_ID = "kf";
+  private static final String COLUMN_MASTER_KEY1_ID = "kc1";
+  private static final String COLUMN_MASTER_KEY2_ID = "kc2";
+
+  private static final String KEY_LIST = String.format("%s: %s, %s: %s, %s: 
%s",
+    COLUMN_MASTER_KEY1_ID, encoder.encodeToString(COLUMN_MASTER_KEY1),
+    COLUMN_MASTER_KEY2_ID, encoder.encodeToString(COLUMN_MASTER_KEY2),
+    FOOTER_MASTER_KEY_ID, encoder.encodeToString(FOOTER_MASTER_KEY));
+  private static final String COLUMN_KEY_MAPPING =
+    COLUMN_MASTER_KEY1_ID + ": double_field; " +
+    COLUMN_MASTER_KEY2_ID + ": float_field";
+  private static final boolean plaintextFilesAllowed = true;
+
+  final WriteSupport<Group> writeSupport = new GroupWriteSupport();
+
+  public enum EncryptionConfiguration {
+    ENCRYPT_COLUMNS_AND_FOOTER("ENCRYPT_COLUMNS_AND_FOOTER"),
+    ENCRYPT_COLUMNS_PLAINTEXT_FOOTER("ENCRYPT_COLUMNS_PLAINTEXT_FOOTER"),
+    ENCRYPT_COLUMNS_AND_FOOTER_CTR("ENCRYPT_COLUMNS_AND_FOOTER_CTR"),
+    NO_ENCRYPTION("NO_ENCRYPTION");
+
+    private final String configurationName;
+
+    EncryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+
+  public enum DecryptionConfiguration {
+    DECRYPT_WITH_KEY_RETRIEVER("DECRYPT_WITH_KEY_RETRIEVER"),
+    NO_DECRYPTION("NO_DECRYPTION");
+
+    private final String configurationName;
+
+    DecryptionConfiguration(String configurationName) {
+      this.configurationName = configurationName;
+    }
+
+    @Override
+    public String toString() {
+      return configurationName;
+    }
+  }
+
+  @Test
+  public void testWriteReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+    LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", 
rootPath.toString());
+    LOG.info(String.format("Run: isKeyMaterialExternalStorage=%s 
isDoubleWrapping=%s isWrapLocally=%s",
+      isKeyMaterialInternalStorage, isDoubleWrapping, isWrapLocally));
+    // This map will hold various encryption configurations.
+    Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap = 
getHadoopConfigurationForEncryption();
+    testWriteEncryptedParquetFiles(rootPath, encryptionPropertiesMap);
+    // This map will hold various decryption configurations.
+    Map<DecryptionConfiguration, Configuration> decryptionPropertiesMap = 
getHadoopConfigurationForDecryption();
+    testReadEncryptedParquetFiles(rootPath, decryptionPropertiesMap);
+  }
+
+
+  private void testWriteEncryptedParquetFiles(Path root, 
Map<EncryptionConfiguration, Configuration> encryptionPropertiesMap) throws 
IOException {
+    MessageType schema = parseMessageType(
+      "message test { "
+        + "required boolean boolean_field; "
+        + "required int32 int32_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "} ");
+
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    for (Map.Entry<EncryptionConfiguration, Configuration> 
encryptionConfigurationEntry : encryptionPropertiesMap.entrySet()) {
+      KeyToolkit.removeCacheEntriesForAllTokens();
+      EncryptionConfiguration encryptionConfiguration = 
encryptionConfigurationEntry.getKey();
+      Configuration conf = encryptionConfigurationEntry.getValue();
+
+      String suffix = (EncryptionConfiguration.NO_ENCRYPTION == 
encryptionConfiguration) ? ".parquet" : ".parquet.encrypted";
+      Path file = new Path(root, encryptionConfiguration.toString() + suffix);
+      LOG.info("\nWrite " + file.toString());
+
+      FileEncryptionProperties fileEncryptionProperties = null;
+      if (null == conf) {
+        conf = new Configuration();
+      } else {
+        EncryptionPropertiesFactory cryptoFactory = 
EncryptionPropertiesFactory.loadFactory(conf);
+        fileEncryptionProperties = 
cryptoFactory.getFileEncryptionProperties(conf, file, null);
+      }
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withConf(conf)
+        .withWriteMode(OVERWRITE)
+        .withType(schema)
+        .withEncryption(fileEncryptionProperties)
+        .build();
+      for (int i = 0; i < 100; i++) {
+        boolean expect = false;
+        if ((i % 2) == 0)
+          expect = true;
+        float float_val = (float) i * 1.1f;
+        double double_val = (i * 1.1111111);
+
+        writer.write(
+          f.newGroup()
+            .append("boolean_field", expect)
+            .append("int32_field", i)
+            .append("float_field", float_val)
+            .append("double_field", double_val));
+
+      }
+      writer.close();
+    }
+  }
+
+  private void testReadEncryptedParquetFiles(Path root, 
Map<DecryptionConfiguration, Configuration> decryptionPropertiesMap) throws 
IOException {
+    for (Map.Entry<DecryptionConfiguration, Configuration> 
decryptionConfigurationEntry : decryptionPropertiesMap.entrySet()) {
+      DecryptionConfiguration decryptionConfiguration = 
decryptionConfigurationEntry.getKey();
+      LOG.info("\n\n");
+      LOG.info("==> Decryption configuration {}\n", decryptionConfiguration);
+
+      File folder = new File(root.toString());
+      File[] listOfFiles = folder.listFiles();
+
+      for (int fileNum = 0; fileNum < listOfFiles.length; fileNum++) {
+        KeyToolkit.removeCacheEntriesForAllTokens();
+        Path file = new Path(listOfFiles[fileNum].getAbsolutePath());
+        if (!file.getName().endsWith(".parquet.encrypted") && 
!file.getName().endsWith(".parquet")) { // Skip non-parquet files
+          continue;
+        }
+        EncryptionConfiguration encryptionConfiguration = 
getEncryptionConfigurationFromFilename(file.getName());
+        if (null == encryptionConfiguration) {
+          continue;
+        }
+        LOG.info("--> Read file {} {}", file.toString(), 
encryptionConfiguration);
+
+        FileDecryptionProperties fileDecryptionProperties = null;
+        Configuration hadoopConfig = decryptionConfigurationEntry.getValue();
+        if (null == hadoopConfig) {
+          hadoopConfig = new Configuration();
+        } else {
+          DecryptionPropertiesFactory cryptoFactory = 
DecryptionPropertiesFactory.loadFactory(hadoopConfig);
+          fileDecryptionProperties = 
cryptoFactory.getFileDecryptionProperties(hadoopConfig, file);
+        }
+
+        // Read only the non-encrypted columns
+        if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) 
&&
+          (encryptionConfiguration == 
EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+          hadoopConfig.set("parquet.read.schema", Types.buildMessage()
+            .required(BOOLEAN).named("boolean_field")
+            .required(INT32).named("int32_field")
+            .named("FormatTestObject").toString());
+        }
+        ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), file)
+          .withConf(hadoopConfig)
+          .withDecryption(fileDecryptionProperties)
+          .build();
+        try {
+          for (int i = 0; i < 500; i++) {
+            Group group = null;
+            group = reader.read();
+            boolean expect = false;
+            if ((i % 2) == 0)
+              expect = true;
+            boolean bool_res = group.getBoolean("boolean_field", 0);
+            if (bool_res != expect)
+              addErrorToErrorCollectorAndLog("Wrong bool", 
encryptionConfiguration, decryptionConfiguration);
+            int int_res = group.getInteger("int32_field", 0);
+            if (int_res != i)
+              addErrorToErrorCollectorAndLog("Wrong int", 
encryptionConfiguration, decryptionConfiguration);
+            if (decryptionConfiguration != 
DecryptionConfiguration.NO_DECRYPTION) {
+              float float_res = group.getFloat("float_field", 0);
+              float tmp1 = (float) i * 1.1f;
+              if (float_res != tmp1)
+                addErrorToErrorCollectorAndLog("Wrong float", 
encryptionConfiguration, decryptionConfiguration);
+
+              double double_res = group.getDouble("double_field", 0);
+              double tmp = (i * 1.1111111);
+              if (double_res != tmp)
+                addErrorToErrorCollectorAndLog("Wrong double", 
encryptionConfiguration, decryptionConfiguration);
+            }
+          }
+        } catch (Exception e) {
+          String errorMessage = e.getMessage();
+          checkResult(file.getName(), decryptionConfiguration, (null == 
errorMessage ? "" : errorMessage));
+        }

Review comment:
       You may use _try-with-resources_.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to