ibzib commented on a change in pull request #13817:
URL: https://github.com/apache/beam/pull/13817#discussion_r579344800
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -495,4 +504,81 @@ public Long extractOutput(Long accumulator) {
return accumulator;
}
}
+ /**
+ * Logical_And function implementation
+ *
+ * <p>Returns the logical AND of all non-NULL expressions. Returns NULL if
there are zero input
+ * rows or expression evaluates to NULL for all rows.
+ */
+ public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum,
Boolean> {
+ static class Accum {
+ /** Initially, input is empty. */
+ boolean isEmpty = true;
+ /** true if any null value is seen in the input, null values are to be
ignored. */
+ boolean isNull = false;
+ /** logical_and operation result. */
+ boolean logicalAnd = true;
+ }
+
+ @Override
+ public Accum createAccumulator() {
+ return new Accum();
+ }
+
+ @Override
+ public Accum addInput(Accum accum, Boolean input) {
+ /** when accum is empty and it sees null, it remains null */
+ if (accum.isEmpty && input == null) {
+ accum.isNull = true;
Review comment:
We should set `isEmpty` to false here as well. While it doesn't make a
difference to the result in this case, it is useful for debugging to
distinguish between emptiness and nullness.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -495,4 +504,81 @@ public Long extractOutput(Long accumulator) {
return accumulator;
}
}
+ /**
+ * Logical_And function implementation
+ *
+ * <p>Returns the logical AND of all non-NULL expressions. Returns NULL if
there are zero input
+ * rows or expression evaluates to NULL for all rows.
+ */
+ public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum,
Boolean> {
+ static class Accum {
+ /** Initially, input is empty. */
+ boolean isEmpty = true;
+ /** true if any null value is seen in the input, null values are to be
ignored. */
+ boolean isNull = false;
+ /** logical_and operation result. */
+ boolean logicalAnd = true;
+ }
+
+ @Override
+ public Accum createAccumulator() {
+ return new Accum();
+ }
+
+ @Override
+ public Accum addInput(Accum accum, Boolean input) {
+ /** when accum is empty and it sees null, it remains null */
+ if (accum.isEmpty && input == null) {
+ accum.isNull = true;
+ return accum;
+ }
+ /** when accum is null and it sees null, it remains null */
+ if (accum.isNull && input == null) {
+ accum.isNull = true;
+ return accum;
+ }
+ /** when accum is neither null and nor empty, it remains unchanged */
+ if (!accum.isNull && !accum.isEmpty && input == null) {
+ return accum;
+ }
+ /** when accum sees non-null value, accum becomes non-empty, non-null */
+ accum.isEmpty = false;
+ accum.isNull = false;
+ accum.logicalAnd = (accum.logicalAnd && input);
+ return accum;
+ }
+
+ @Override
+ public Accum mergeAccumulators(Iterable<Accum> accums) {
+ LogicalAnd.Accum merged = createAccumulator();
+
+ /** merged accum has isNull=true when all accums have isNull=true */
+ if (StreamSupport.stream(accums.spliterator(), false).allMatch(a ->
a.isNull)) {
Review comment:
Here you iterate over `accums` twice: once to check `isNull`, and again to
check everything else. A single iteration does the same work and is more
efficient.
Handling `accum.isNull` inside your for loop would look very similar to the
logic in `addInput`:
```
if (accum.isNull) {
  if (merged.isEmpty) {
    merged.isEmpty = false;
    merged.isNull = true;
  }
  continue;
}
```
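For context, a rough sketch of how the whole single-pass merge might look. This is a standalone illustration, not the PR's code: the class `LogicalAndMergeSketch`, the static `merge` method, and the three-argument `Accum` constructor are hypothetical names, and the "skip empty accumulators" branch is an assumption about the intended semantics (an accumulator that saw no input should not change the result).

```java
/** Standalone sketch; in the PR this logic would live in LogicalAnd.mergeAccumulators. */
final class LogicalAndMergeSketch {

  /** Mirrors the three fields of the Accum class in the diff. */
  static final class Accum {
    boolean isEmpty = true;
    boolean isNull = false;
    boolean logicalAnd = true;

    Accum() {}

    // Hypothetical convenience constructor, only here to build examples.
    Accum(boolean isEmpty, boolean isNull, boolean logicalAnd) {
      this.isEmpty = isEmpty;
      this.isNull = isNull;
      this.logicalAnd = logicalAnd;
    }
  }

  /** Merges all accumulators in one pass over the iterable. */
  static Accum merge(Iterable<Accum> accums) {
    Accum merged = new Accum();
    for (Accum accum : accums) {
      if (accum.isNull) {
        // This accumulator saw only nulls. It matters only if nothing
        // has been merged yet; otherwise nulls are ignored.
        if (merged.isEmpty) {
          merged.isEmpty = false;
          merged.isNull = true;
        }
        continue;
      }
      if (accum.isEmpty) {
        // Assumption: an accumulator with no input contributes nothing.
        continue;
      }
      // At least one non-null value was seen: fold it into the result.
      merged.isEmpty = false;
      merged.isNull = false;
      merged.logicalAnd = merged.logicalAnd && accum.logicalAnd;
    }
    return merged;
  }
}
```

Checking `isNull` before `isEmpty` keeps the sketch correct whether or not `addInput` clears `isEmpty` when it sees a null.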
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -495,4 +504,81 @@ public Long extractOutput(Long accumulator) {
return accumulator;
}
}
+ /**
+ * Logical_And function implementation
+ *
+ * <p>Returns the logical AND of all non-NULL expressions. Returns NULL if
there are zero input
+ * rows or expression evaluates to NULL for all rows.
+ */
+ public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum,
Boolean> {
+ static class Accum {
+ /** Initially, input is empty. */
+ boolean isEmpty = true;
+ /** true if any null value is seen in the input, null values are to be
ignored. */
+ boolean isNull = false;
+ /** logical_and operation result. */
+ boolean logicalAnd = true;
+ }
+
+ @Override
+ public Accum createAccumulator() {
+ return new Accum();
+ }
+
+ @Override
+ public Accum addInput(Accum accum, Boolean input) {
+ /** when accum is empty and it sees null, it remains null */
+ if (accum.isEmpty && input == null) {
+ accum.isNull = true;
+ return accum;
+ }
+ /** when accum is null and it sees null, it remains null */
+ if (accum.isNull && input == null) {
+ accum.isNull = true;
Review comment:
`accum.isNull` is already true in this case, so there is no need to assign
it to true again here.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -495,4 +504,81 @@ public Long extractOutput(Long accumulator) {
return accumulator;
}
}
+ /**
+ * Logical_And function implementation
+ *
+ * <p>Returns the logical AND of all non-NULL expressions. Returns NULL if
there are zero input
+ * rows or expression evaluates to NULL for all rows.
+ */
+ public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum,
Boolean> {
+ static class Accum {
+ /** Initially, input is empty. */
+ boolean isEmpty = true;
+ /** true if any null value is seen in the input, null values are to be
ignored. */
+ boolean isNull = false;
+ /** logical_and operation result. */
+ boolean logicalAnd = true;
+ }
+
+ @Override
+ public Accum createAccumulator() {
+ return new Accum();
+ }
+
+ @Override
+ public Accum addInput(Accum accum, Boolean input) {
+ /** when accum is empty and it sees null, it remains null */
+ if (accum.isEmpty && input == null) {
+ accum.isNull = true;
+ return accum;
+ }
+ /** when accum is null and it sees null, it remains null */
+ if (accum.isNull && input == null) {
+ accum.isNull = true;
+ return accum;
+ }
+ /** when accum is neither null and nor empty, it remains unchanged */
+ if (!accum.isNull && !accum.isEmpty && input == null) {
Review comment:
If you've already checked `accum.isEmpty && input == null` and
`accum.isNull && input == null`, `!accum.isNull && !accum.isEmpty && input ==
null` can be simplified to `input == null`.
You can handle all three cases with a lot less code:
```
if (input == null) {
  if (accum.isEmpty) {
    accum.isEmpty = false;
    accum.isNull = true;
  }
  return accum;
}
```
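To show how that fits into the rest of the method, here is a minimal standalone sketch. The non-null branch is copied from the diff; the class `LogicalAndAddInputSketch`, the `aggregate` helper, and the body of `extractOutput` are hypothetical (the diff context above does not show `extractOutput`), so treat the NULL-returning rule as an assumption based on the Javadoc.

```java
/** Standalone sketch; in the PR this logic would live in LogicalAnd.addInput. */
final class LogicalAndAddInputSketch {

  /** Mirrors the three fields of the Accum class in the diff. */
  static final class Accum {
    boolean isEmpty = true;
    boolean isNull = false;
    boolean logicalAnd = true;
  }

  static Accum addInput(Accum accum, Boolean input) {
    if (input == null) {
      // A null only changes state when it is the first thing seen.
      if (accum.isEmpty) {
        accum.isEmpty = false;
        accum.isNull = true;
      }
      return accum;
    }
    // Non-null input: same three assignments as in the diff.
    accum.isEmpty = false;
    accum.isNull = false;
    accum.logicalAnd = accum.logicalAnd && input;
    return accum;
  }

  /** Assumed output rule: NULL for empty or all-null input, per the Javadoc. */
  static Boolean extractOutput(Accum accum) {
    if (accum.isEmpty || accum.isNull) {
      return null;
    }
    return accum.logicalAnd;
  }

  /** Hypothetical helper that runs addInput over a sequence of values. */
  static Boolean aggregate(Boolean... inputs) {
    Accum accum = new Accum();
    for (Boolean input : inputs) {
      addInput(accum, input);
    }
    return extractOutput(accum);
  }
}
```

With this shape, `aggregate(false, null, true)` yields false, while an all-null or empty argument list yields null, which matches the cases the tests in this PR exercise.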
##########
File path:
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -4114,4 +4114,71 @@ public void testCountIfZetaSQLDialect() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
+
+ @Test
+ public void testLogicalAndZetaSQL() {
+ String sql = "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([true,
false, true]) AS x";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ Schema singleField = Schema.builder().addBooleanField("field1").build();
+
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(false).build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testLogicalAndZetaSQLWithAllTrueValues() {
+ String sql = "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([true,
true, true]) AS x";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ Schema singleField = Schema.builder().addBooleanField("field1").build();
+
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testLogicalAndZetaSQLWithAllFalseValues() {
+ String sql = "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([false,
false, false]) AS x";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ Schema singleField = Schema.builder().addBooleanField("field1").build();
+
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(false).build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testLogicalAndWithNullAndNoNullInput() throws Exception {
+ String sql = "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([false,
null, true]) AS x";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ Schema schema = Schema.builder().addNullableField("bool_col",
FieldType.BOOLEAN).build();
+
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testLogicalAndWithAllNullInputs() {
+ String sql =
+ "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([CAST(null AS bool),
null, null]) AS x";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ Schema schema = Schema.builder().addNullableField("bool_col",
FieldType.BOOLEAN).build();
+ PAssert.that(stream)
+ .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean)
null).build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
}
Review comment:
We should also add a test for empty input. `UNNEST(ARRAY<BOOL>[])`
should work.