This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fa028e6 [BEAM-10074] | implement hashing functions new 706a06c Merge pull request #11817 from darshanj/BEAM-10074 fa028e6 is described below commit fa028e68e47c2a47858783a5a5f7adc15569c654 Author: darshan jani <darshanjani...@gmail.com> AuthorDate: Tue May 26 22:36:35 2020 +0800 [BEAM-10074] | implement hashing functions --- .../sql/impl/udf/BuiltinHashFunctions.java | 139 +++++++++++++++++++++ .../beam/sdk/extensions/sql/BeamSqlDslBase.java | 6 + .../udf/BeamSalUhfSpecialTypeAndValueTest.java | 69 ++++++++++ .../sql/impl/udf/BeamSqlUdfExpressionTest.java | 41 ++++++ 4 files changed, 255 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinHashFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinHashFunctions.java new file mode 100644 index 0000000..c3fc82b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinHashFunctions.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.udf; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.function.Strict; + +/** Hash Functions. */ +@AutoService(BeamBuiltinFunctionProvider.class) +public class BuiltinHashFunctions extends BeamBuiltinFunctionProvider { + + /** + * MD5(X) + * + * <p>Calculates the MD5 digest and returns the value as a 16 element {@code byte[]}. + */ + @UDF( + funcName = "MD5", + parameterArray = {Schema.TypeName.STRING}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] md5String(String str) { + return org.apache.commons.codec.digest.DigestUtils.md5(str); + } + + /** + * MD5(X) + * + * <p>Calculates the MD5 digest and returns the value as a 16 element {@code byte[]}. + */ + @UDF( + funcName = "MD5", + parameterArray = {Schema.TypeName.BYTES}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] md5Bytes(byte[] bytes) { + return org.apache.commons.codec.digest.DigestUtils.md5(bytes); + } + + /** + * SHA1(X) + * + * <p>Calculates the SHA-1 digest and returns the value as a {@code byte[]}. + */ + @UDF( + funcName = "SHA1", + parameterArray = {Schema.TypeName.STRING}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha1String(String str) { + return org.apache.commons.codec.digest.DigestUtils.sha1(str); + } + + /** + * SHA1(X) + * + * <p>Calculates the SHA-1 digest and returns the value as a {@code byte[]}. + */ + @UDF( + funcName = "SHA1", + parameterArray = {Schema.TypeName.BYTES}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha1Bytes(byte[] bytes) { + return org.apache.commons.codec.digest.DigestUtils.sha1(bytes); + } + + /** + * SHA256(X) + * + * <p>Calculates the SHA-256 digest and returns the value as a 32 element {@code byte[]}.
+ */ + @UDF( + funcName = "SHA256", + parameterArray = {Schema.TypeName.STRING}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha256String(String str) { + return org.apache.commons.codec.digest.DigestUtils.sha256(str); + } + + /** + * SHA256(X) + * + * <p>Calculates the SHA-256 digest and returns the value as a 32 element {@code byte[]}. + */ + @UDF( + funcName = "SHA256", + parameterArray = {Schema.TypeName.BYTES}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha256Bytes(byte[] bytes) { + return org.apache.commons.codec.digest.DigestUtils.sha256(bytes); + } + + /** + * SHA512(X) + * + * <p>Calculates the SHA-512 digest and returns the value as a 64 element {@code byte[]}. + */ + @UDF( + funcName = "SHA512", + parameterArray = {Schema.TypeName.STRING}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha512String(String str) { + return org.apache.commons.codec.digest.DigestUtils.sha512(str); + } + + /** + * SHA512(X) + * + * <p>Calculates the SHA-512 digest and returns the value as a 64 element {@code byte[]}.
+ */ + @UDF( + funcName = "SHA512", + parameterArray = {Schema.TypeName.BYTES}, + returnType = Schema.TypeName.BYTES) + @Strict + public byte[] sha512Bytes(byte[] bytes) { + return org.apache.commons.codec.digest.DigestUtils.sha512(bytes); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java index ad26d4a..4298c07 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -213,6 +213,12 @@ public class BeamSqlDslBase { "TO_HEX", "abcABC".getBytes(UTF_8), "TO_HEX", + "abcABCжщфЖЩФ".getBytes(UTF_8), + "HashingFn", + "foobar".getBytes(UTF_8), + "HashingFn", + " ".getBytes(UTF_8), + "HashingFn", "abcABCжщфЖЩФ".getBytes(UTF_8)) .getRows(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java index 1370c62..db87eb7 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSalUhfSpecialTypeAndValueTest.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.commons.codec.digest.DigestUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -167,4 +168,72 @@ public class BeamSalUhfSpecialTypeAndValueTest extends BeamSqlDslBase { resultRow8); pipeline.run().waitUntilFinish(); } + + @Test + public 
void testMd5() throws Exception { + Schema resultType = Schema.builder().addByteArrayField("field").build(); + Row resultRow1 = + Row.withSchema(resultType).addValues(DigestUtils.md5("foobar".getBytes(UTF_8))).build(); + Row resultRow2 = + Row.withSchema(resultType).addValues(DigestUtils.md5(" ".getBytes(UTF_8))).build(); + Row resultRow3 = + Row.withSchema(resultType) + .addValues(DigestUtils.md5("abcABCжщфЖЩФ".getBytes(UTF_8))) + .build(); + String sql = "SELECT MD5(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'"; + PCollection<Row> result = boundedInputBytes.apply("testUdf", SqlTransform.query(sql)); + PAssert.that(result).containsInAnyOrder(resultRow1, resultRow2, resultRow3); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testSHA1() throws Exception { + Schema resultType = Schema.builder().addByteArrayField("field").build(); + Row resultRow1 = + Row.withSchema(resultType).addValues(DigestUtils.sha1("foobar".getBytes(UTF_8))).build(); + Row resultRow2 = + Row.withSchema(resultType).addValues(DigestUtils.sha1(" ".getBytes(UTF_8))).build(); + Row resultRow3 = + Row.withSchema(resultType) + .addValues(DigestUtils.sha1("abcABCжщфЖЩФ".getBytes(UTF_8))) + .build(); + String sql = "SELECT SHA1(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'"; + PCollection<Row> result = boundedInputBytes.apply("testUdf", SqlTransform.query(sql)); + PAssert.that(result).containsInAnyOrder(resultRow1, resultRow2, resultRow3); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testSHA256() throws Exception { + Schema resultType = Schema.builder().addByteArrayField("field").build(); + Row resultRow1 = + Row.withSchema(resultType).addValues(DigestUtils.sha256("foobar".getBytes(UTF_8))).build(); + Row resultRow2 = + Row.withSchema(resultType).addValues(DigestUtils.sha256(" ".getBytes(UTF_8))).build(); + Row resultRow3 = + Row.withSchema(resultType) + .addValues(DigestUtils.sha256("abcABCжщфЖЩФ".getBytes(UTF_8))) + .build(); + String sql = "SELECT 
SHA256(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'"; + PCollection<Row> result = boundedInputBytes.apply("testUdf", SqlTransform.query(sql)); + PAssert.that(result).containsInAnyOrder(resultRow1, resultRow2, resultRow3); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testSHA512() throws Exception { + Schema resultType = Schema.builder().addByteArrayField("field").build(); + Row resultRow1 = + Row.withSchema(resultType).addValues(DigestUtils.sha512("foobar".getBytes(UTF_8))).build(); + Row resultRow2 = + Row.withSchema(resultType).addValues(DigestUtils.sha512(" ".getBytes(UTF_8))).build(); + Row resultRow3 = + Row.withSchema(resultType) + .addValues(DigestUtils.sha512("abcABCжщфЖЩФ".getBytes(UTF_8))) + .build(); + String sql = "SELECT SHA512(f_bytes) FROM PCOLLECTION WHERE f_func = 'HashingFn'"; + PCollection<Row> result = boundedInputBytes.apply("testUdf", SqlTransform.query(sql)); + PAssert.that(result).containsInAnyOrder(resultRow1, resultRow2, resultRow3); + pipeline.run().waitUntilFinish(); + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java index 82ffccf..495f329 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamSqlUdfExpressionTest.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import org.apache.beam.sdk.extensions.sql.integrationtest.BeamSqlBuiltinFunctionsIntegrationTestBase; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.commons.codec.digest.DigestUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -199,4 +200,44 @@ public class BeamSqlUdfExpressionTest extends 
BeamSqlBuiltinFunctionsIntegration checker.buildRunAndCheck(); } + + @Test + public void testMd5() throws Exception { + ExpressionChecker checker = + new ExpressionChecker() + .addExpr("MD5('foobar')", DigestUtils.md5("foobar")) + .addExpr("MD5('中文')", DigestUtils.md5("中文")) + .addExprWithNullExpectedValue("MD5(CAST(NULL AS VARCHAR(0)))", TypeName.BYTES); + checker.buildRunAndCheck(); + } + + @Test + public void testSHA1() throws Exception { + ExpressionChecker checker = + new ExpressionChecker() + .addExpr("SHA1('foobar')", DigestUtils.sha1("foobar")) + .addExpr("SHA1('中文')", DigestUtils.sha1("中文")) + .addExprWithNullExpectedValue("SHA1(CAST(NULL AS VARCHAR(0)))", TypeName.BYTES); + checker.buildRunAndCheck(); + } + + @Test + public void testSHA256() throws Exception { + ExpressionChecker checker = + new ExpressionChecker() + .addExpr("SHA256('foobar')", DigestUtils.sha256("foobar")) + .addExpr("SHA256('中文')", DigestUtils.sha256("中文")) + .addExprWithNullExpectedValue("SHA256(CAST(NULL AS VARCHAR(0)))", TypeName.BYTES); + checker.buildRunAndCheck(); + } + + @Test + public void testSHA512() throws Exception { + ExpressionChecker checker = + new ExpressionChecker() + .addExpr("SHA512('foobar')", DigestUtils.sha512("foobar")) + .addExpr("SHA512('中文')", DigestUtils.sha512("中文")) + .addExprWithNullExpectedValue("SHA512(CAST(NULL AS VARCHAR(0)))", TypeName.BYTES); + checker.buildRunAndCheck(); + } }