This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch revert-12079-master in repository https://gitbox.apache.org/repos/asf/beam.git
commit efe93afbc10084740b5066b93befd8481dc8dad0 Author: Rui Wang <amaliu...@users.noreply.github.com> AuthorDate: Mon Jun 29 13:52:19 2020 -0700 Revert "[BEAM-9890] Support BIT_AND aggregation function in Beam SQL" --- .../impl/transform/BeamBuiltinAggregations.java | 35 ---------------------- .../extensions/sql/BeamSqlDslAggregationTest.java | 32 -------------------- .../sql/zetasql/SqlStdOperatorMappingTable.java | 3 +- .../sql/zetasql/ZetaSQLDialectSpecTest.java | 17 ----------- 4 files changed, 1 insertion(+), 86 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java index ab3786b..347fdc12 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java @@ -58,7 +58,6 @@ public class BeamBuiltinAggregations { .put("$SUM0", BeamBuiltinAggregations::createSum) .put("AVG", BeamBuiltinAggregations::createAvg) .put("BIT_OR", BeamBuiltinAggregations::createBitOr) - .put("BIT_AND", BeamBuiltinAggregations::createBitAnd) .put("VAR_POP", t -> VarianceFn.newPopulation(t.getTypeName())) .put("VAR_SAMP", t -> VarianceFn.newSample(t.getTypeName())) .put("COVAR_POP", t -> CovarianceFn.newPopulation(t.getTypeName())) @@ -186,14 +185,6 @@ public class BeamBuiltinAggregations { String.format("[%s] is not supported in BIT_OR", fieldType)); } - static CombineFn createBitAnd(Schema.FieldType fieldType) { - if (fieldType.getTypeName() == TypeName.INT64) { - return new BitAnd(); - } - throw new UnsupportedOperationException( - String.format("[%s] is not supported in BIT_AND", fieldType)); - } - static class CustMax<T extends Comparable<T>> extends Combine.BinaryCombineFn<T> { @Override public T apply(T left, T right) { @@ -392,30 +383,4 @@ public class BeamBuiltinAggregations { return accum; } } - - static class BitAnd<T extends Number> extends CombineFn<T, Long, Long> { - @Override - public Long createAccumulator() { - return -1L; - } - - @Override - public Long addInput(Long accum, T input) { - return accum & input.longValue(); - } - - @Override - public Long mergeAccumulators(Iterable<Long> accums) { - Long merged = createAccumulator(); - for (long accum : accums) { - merged = merged & accum; - } - return merged; - } - - @Override - public Long extractOutput(Long accum) { - return accum; - } - } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 9c365b2..40b3b63 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -314,39 +314,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<Row> inputRows = pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA)); PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); - - PAssert.that(result).containsInAnyOrder(rowResult); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testBitAndFunction() throws Exception { - pipeline.enableAbandonedNodeEnforcement(false); - - Schema schemaInTableA = - Schema.builder().addInt64Field("f_long").addInt32Field("f_int2").build(); - - Schema resultType = Schema.builder().addInt64Field("finalAnswer").build(); - - List<Row> rowsInTableA = - TestUtils.RowsBuilder.of(schemaInTableA) - .addRows( - 0xF001L, 0, - 0x00A1L, 0) - .getRows(); - - String sql = "SELECT bit_and(f_long) as bitand " + "FROM PCOLLECTION GROUP BY f_int2"; - - Row rowResult = Row.withSchema(resultType).addValues(1L).build(); - - PCollection<Row> inputRows = - pipeline.apply("longVals", Create.of(rowsInTableA).withRowSchema(schemaInTableA)); - PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); - PAssert.that(result).containsInAnyOrder(rowResult); - - pipeline.run().waitUntilFinish(); } private static class CheckerBigDecimalDivide diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java index 1e1d507..0dae1d8 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java @@ -35,7 +35,6 @@ public class SqlStdOperatorMappingTable { FunctionSignatureId.FN_ANY_VALUE, FunctionSignatureId.FN_STRING_AGG_STRING, FunctionSignatureId.FN_BIT_OR_INT64, - FunctionSignatureId.FN_BIT_AND_INT64, FunctionSignatureId.FN_OR, FunctionSignatureId.FN_NOT, FunctionSignatureId.FN_MULTIPLY_DOUBLE, @@ -239,7 +238,7 @@ public class SqlStdOperatorMappingTable { // .put("array_agg", ) // .put("array_concat_agg") .put("string_agg", SqlOperators.STRING_AGG_STRING_FN) // NULL values not supported - .put("bit_and", SqlStdOperatorTable.BIT_AND) + // .put("bit_and") // .put("bit_xor") // .put("logical_and") // .put("logical_or") diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java index 0d787c6..e7f442d 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java @@ -4892,23 +4892,6 @@ public class ZetaSQLDialectSpecTest extends ZetaSQLTestBase { } @Test - public void testZetaSQLBitAnd() { - String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col"; - - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); - - final Schema schema = Schema.builder().addInt64Field("field1").build(); - PAssert.that(stream) - .containsInAnyOrder( - Row.withSchema(schema).addValue(1L).build(), - Row.withSchema(schema).addValue(0L).build()); - - pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); - } - - @Test public void testSimpleTableName() { String sql = "SELECT Key FROM KeyValue";