claudevdm commented on code in PR #36217: URL: https://github.com/apache/beam/pull/36217#discussion_r2402519329
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.Arrays; +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.Secret; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link + * org.apache.beam.sdk.transforms.GroupByKey}. + * + * <p>This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in + * the output. This is useful when the keys contain sensitive data that should not be stored at rest + * by the runner. 
+ * + * <p>The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + * + * <p>Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this + * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) + * If using this transform in streaming mode, this transform may not properly handle update + * compatibility checks around coders. This means that an improper update could lead to invalid + * coders, causing pipeline failure or data corruption. If you need to update, make sure that the + * input type passed into this transform does not change. + */ +public class GroupByEncryptedKey<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + /** + * Creates a {@link GroupByEncryptedKey} transform. + * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param <K> The type of the keys in the input PCollection. + * @param <V> The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. 
+ */ + public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + Coder<KV<K, V>> inputCoder = input.getCoder(); + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder"); + } + KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder; + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException( + "the keyCoder of a GroupByEncryptedKey must be deterministic", e); + } + + Coder<V> valueCoder = inputKvCoder.getValueCoder(); + + PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>> grouped = + input + .apply( + "EncryptMessage", + ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .apply(GroupByKey.create()); + + return grouped + .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); + } + + /** + * A {@link PTransform} that encrypts the key and value of an element. + * + * <p>The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a KV pair of the encrypted key and value. 
+ */ + @SuppressWarnings("initialization.fields.uninitialized") + private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<byte[], byte[]>>> { + private final Secret hmacKey; + private final Coder<K> keyCoder; + private final Coder<V> valueCoder; + private transient Mac mac; + private transient Cipher cipher; + + EncryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) { + this.hmacKey = hmacKey; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Setup + public void setup() { + try { + this.mac = Mac.getInstance("HmacSHA256"); + this.mac.init(new SecretKeySpec(this.hmacKey.getSecretBytes(), "HmacSHA256")); Review Comment: Could we call getSecretBytes() once and reuse the same bytes for both the AES and MAC keys? ########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.transforms; + +import java.util.Arrays; +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.Secret; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link + * org.apache.beam.sdk.transforms.GroupByKey}. + * + * <p>This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in + * the output. This is useful when the keys contain sensitive data that should not be stored at rest + * by the runner. + * + * <p>The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + * + * <p>Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this + * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) + * If using this transform in streaming mode, this transform may not properly handle update + * compatibility checks around coders. This means that an improper update could lead to invalid + * coders, causing pipeline failure or data corruption. If you need to update, make sure that the + * input type passed into this transform does not change. + */ +public class GroupByEncryptedKey<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + /** + * Creates a {@link GroupByEncryptedKey} transform. 
+ * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param <K> The type of the keys in the input PCollection. + * @param <V> The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ + public static <K, V> GroupByEncryptedKey<K, V> create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + Coder<KV<K, V>> inputCoder = input.getCoder(); + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder"); + } + KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder; + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException( + "the keyCoder of a GroupByEncryptedKey must be deterministic", e); + } + + Coder<V> valueCoder = inputKvCoder.getValueCoder(); + + PCollection<KV<byte[], Iterable<KV<byte[], byte[]>>>> grouped = + input + .apply( + "EncryptMessage", + ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .apply(GroupByKey.create()); + + return grouped + .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); + } + + /** + * A {@link PTransform} that encrypts the key and value of an element. + * + * <p>The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a KV pair of the encrypted key and value. 
+ */ + @SuppressWarnings("initialization.fields.uninitialized") + private static class EncryptMessage<K, V> extends DoFn<KV<K, V>, KV<byte[], KV<byte[], byte[]>>> { + private final Secret hmacKey; + private final Coder<K> keyCoder; + private final Coder<V> valueCoder; + private transient Mac mac; + private transient Cipher cipher; + + EncryptMessage(Secret hmacKey, Coder<K> keyCoder, Coder<V> valueCoder) { + this.hmacKey = hmacKey; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Setup + public void setup() { + try { + this.mac = Mac.getInstance("HmacSHA256"); + this.mac.init(new SecretKeySpec(this.hmacKey.getSecretBytes(), "HmacSHA256")); + this.cipher = Cipher.getInstance("AES"); + this.cipher.init( + Cipher.ENCRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); Review Comment: I looked this up, and it seems plain "AES" is not recommended and is not secure by default [1][2]. With the default SunJCE provider, Cipher.getInstance("AES") resolves to AES/ECB/PKCS5Padding. ECB mode encrypts identical plaintext blocks to identical ciphertext blocks, so it leaks patterns in the data. The recommended choice appears to be AES/GCM/NoPadding with a fresh random IV for each encryption. [1] https://stackoverflow.com/questions/71728508/what-is-the-correct-way-to-use-cipher-getinstanceaes [2] https://crypto.stackexchange.com/questions/225/should-i-use-ecb-or-cbc-encryption-mode-for-my-block-cipher -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
