This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c3be9f096f0 Add RowFilter utility (#32366)
c3be9f096f0 is described below

commit c3be9f096f0e28e6bcc5fc9ecf73015f4dd78b57
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Tue Sep 24 07:43:53 2024 -0400

    Add RowFilter utility (#32366)
---
 .../java/org/apache/beam/sdk/util/RowFilter.java   | 423 +++++++++++++++++++++
 .../org/apache/beam/sdk/util/RowFilterTest.java    | 353 +++++++++++++++++
 2 files changed, 776 insertions(+)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java
new file mode 100644
index 00000000000..4e0d9d3ff30
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowFilter.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A utility that filters fields from Beam {@link Row}s. This filter can be 
configured to indicate
+ * what fields you would like to either <strong>keep</strong> or 
<strong>drop</strong>. You may also
+ * specify a singular {@link Row} field to extract with <strong>only</strong>. 
Afterward, call
+ * {@link #filter(Row)} on a Schema-compatible Row to filter it. An 
un-configured filter will simply
+ * return the input row untouched.
+ *
+ * <p>A configured {@link RowFilter} will naturally produce {@link Row}s with 
a new Beam {@link
+ * Schema}. You can access this new Schema via the filter's {@link 
#outputSchema()}.
+ *
+ * <p>Configure a {@link RowFilter} as follows:
+ *
+ * <pre>{@code
+ * // this is an un-configured filter
+ * RowFilter unconfigured = new RowFilter(beamSchema);
+ *
+ * // this filter will exclusively keep these fields and drop everything else
+ * List<String> fields = Arrays.asList("foo", "bar", "baz");
+ * RowFilter keepingFilter = new RowFilter(beamSchema).keeping(fields);
+ *
+ * // this filter will drop these fields
+ * RowFilter droppingFilter = new RowFilter(beamSchema).dropping(fields);
+ *
+ * // this filter will only output the contents of row field "my_record"
+ * String field = "my_record";
+ * RowFilter onlyFilter = new RowFilter(beamSchema).only(field);
+ *
+ * // produces a filtered row
+ * Row outputRow = keepingFilter.filter(row);
+ * }</pre>
+ *
+ * Check the documentation for {@link #keeping(List)}, {@link 
#dropping(List)}, and {@link
+ * #only(String)} for further details on what an output Row can look like.
+ */
+public class RowFilter implements Serializable {
+  private final Schema rowSchema;
+  private @Nullable Schema transformedSchema;
+  // for 'only' case
+  private @Nullable String onlyField;
+
+  public RowFilter(Schema rowSchema) {
+    this.rowSchema = rowSchema;
+  }
+
+  /**
+   * Configures this {@link RowFilter} to filter {@link Row}s by keeping only 
the specified fields.
+   * Only top-level fields may be specified; nested (dot-notation) field names are rejected.
+   *
+   * <p>For example, if we want to keep the list of fields {@code ["foo", 
"baz"]}, for the input
+   * {@link Row}:
+   *
+   * <pre>{@code
+   * foo: 123
+   * bar: 456
+   * baz:
+   *   nested_1: abc
+   *   nested_2: xyz
+   * }</pre>
+   *
+   * we will get the following output {@link Row}:
+   *
+   * <pre>{@code
+   * foo: 123
+   * baz:
+   *   nested_1: abc
+   *   nested_2: xyz
+   * }</pre>
+   */
+  public RowFilter keeping(List<String> fields) {
+    checkUnconfigured();
+    verifyNoNestedFields(fields, "keep");
+    validateSchemaContainsFields(rowSchema, fields, "keep");
+    transformedSchema = keepFields(rowSchema, fields);
+    return this;
+  }
+
+  /**
+   * Configures this {@link RowFilter} to filter {@link Row} by removing the 
specified fields.
+   * Only top-level fields may be specified; nested (dot-notation) field names are rejected.
+   *
+   * <p>For example, if we want to drop the list of fields {@code ["foo", 
"baz"]}, for this input
+   * {@link Row}:
+   *
+   * <pre>{@code
+   * foo: 123
+   * bar: 456
+   * baz:
+   *   nested_1: abc
+   *   nested_2: xyz
+   * }</pre>
+   *
+   * we will get the following output {@link Row}:
+   *
+   * <pre>{@code
+   * bar: 456
+   * }</pre>
+   */
+  public RowFilter dropping(List<String> fields) {
+    checkUnconfigured();
+    verifyNoNestedFields(fields, "drop");
+    validateSchemaContainsFields(rowSchema, fields, "drop");
+    transformedSchema = dropFields(rowSchema, fields);
+    return this;
+  }
+
+  /**
+   * Configures this {@link RowFilter} to only output the contents of a single 
row field.
+   *
+   * <p>For example, if we want to only extract the contents of field "foo" 
for this input {@link
+   * Row}:
+   *
+   * <pre>{@code
+   * abc: 123
+   * bar: my_str
+   * foo:
+   *   xyz:
+   *     baz: 456
+   *     qwe: 789
+   * }</pre>
+   *
+   * we will get the following output {@link Row}:
+   *
+   * <pre>{@code
+   * xyz:
+   *   baz: 456
+   *   qwe: 789
+   * }</pre>
+   *
+   * <p>Note that this will fail if the field is not of type {@link Row}, e.g. 
if {@code "abc"} is
+   * specified for the example above.
+   */
+  public RowFilter only(String field) {
+    checkUnconfigured();
+    validateSchemaContainsFields(rowSchema, Collections.singletonList(field), 
"only");
+    Schema.Field rowField = rowSchema.getField(field);
+    Preconditions.checkArgument(
+        rowField.getType().getTypeName().equals(Schema.TypeName.ROW),
+        "Expected type '%s' for field '%s', but instead got type '%s'.",
+        Schema.TypeName.ROW,
+        rowField.getName(),
+        rowField.getType().getTypeName());
+
+    transformedSchema = rowField.getType().getRowSchema();
+    onlyField = field;
+    return this;
+  }
+
+  /**
+   * Performs the configured filter operation (keep, drop, or only) on the input {@link Row}.
+   * Configure the operation beforehand with {@link #keeping(List)}, {@link #dropping(List)}, or
+   * {@link #only(String)}.
+   *
+   * <p>If not yet configured, will simply return the same {@link Row}.
+   */
+  public Row filter(Row row) {
+    if (transformedSchema == null) {
+      return row;
+    }
+
+    Preconditions.checkState(
+        row.getSchema().assignableTo(rowSchema),
+        "Encountered Row with schema that is incompatible with this 
RowFilter's schema."
+            + "\nRow schema: %s"
+            + "\nSchema used to initialize this RowFilter: %s",
+        row.getSchema(),
+        rowSchema);
+
+    // 'only' case
+    if (onlyField != null) {
+      return checkStateNotNull(row.getRow(onlyField));
+    }
+
+    // 'keep' and 'drop'
+    return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema()));
+  }
+
+  /** Returns the output {@link Row}'s {@link Schema}. */
+  public Schema outputSchema() {
+    return transformedSchema != null ? transformedSchema : rowSchema;
+  }
+
+  private void checkUnconfigured() {
+    Preconditions.checkState(
+        transformedSchema == null,
+        "This RowFilter has already been configured to filter to the following 
Schema: %s",
+        transformedSchema);
+  }
+
+  /** Verifies that this selection contains no nested fields. */
+  private void verifyNoNestedFields(List<String> fields, String operation) {
+    List<String> nestedFields = new ArrayList<>();
+    for (String field : fields) {
+      if (field.contains(".")) {
+        nestedFields.add(field);
+      }
+    }
+    if (!nestedFields.isEmpty()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "RowFilter does not support specifying nested fields to %s: %s",
+              operation, nestedFields));
+    }
+  }
+
+  /**
+   * Checks whether a {@link Schema} contains a list of field names. Nested 
fields can be expressed
+   * with dot-notation. Throws a helpful error in the case where a field 
doesn't exist, or if a
+   * nested field could not be reached.
+   */
+  @VisibleForTesting
+  static void validateSchemaContainsFields(
+      Schema schema, List<String> specifiedFields, String operation) {
+    Set<String> notFound = new HashSet<>();
+    Set<String> notRowField = new HashSet<>();
+
+    for (String field : specifiedFields) {
+      List<String> levels = Splitter.on(".").splitToList(field);
+
+      Schema currentSchema = schema;
+
+      for (int i = 0; i < levels.size(); i++) {
+        String currentFieldName = String.join(".", levels.subList(0, i + 1));
+
+        if (!currentSchema.hasField(levels.get(i))) {
+          notFound.add(currentFieldName);
+          break;
+        }
+
+        if (i + 1 < levels.size()) {
+          Schema.Field nextField = currentSchema.getField(levels.get(i));
+          if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) {
+            notRowField.add(currentFieldName);
+            break;
+          }
+          currentSchema = 
Preconditions.checkNotNull(nextField.getType().getRowSchema());
+        }
+      }
+    }
+
+    if (!notFound.isEmpty() || !notRowField.isEmpty()) {
+      String message = "Validation failed for '" + operation + "'.";
+      if (!notFound.isEmpty()) {
+        message += "\nRow Schema does not contain the following specified 
fields: " + notFound;
+      }
+      if (!notRowField.isEmpty()) {
+        message +=
+            "\nThe following specified fields are not of type Row. Their 
nested fields could not be reached: "
+                + notRowField;
+      }
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  /**
+   * Creates a field tree, separating each top-level field from its 
(potential) nested fields. E.g.
+   * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": 
[]}
+   */
+  @VisibleForTesting
+  static Map<String, List<String>> getFieldTree(List<String> fields) {
+    Map<String, List<String>> fieldTree = Maps.newHashMap();
+
+    for (String field : fields) {
+      List<String> components = Splitter.on(".").splitToList(field);
+      String root = components.get(0);
+      fieldTree.computeIfAbsent(root, r -> new ArrayList<>());
+
+      if (components.size() > 1) {
+        String nestedFields = String.join(".", components.subList(1, 
components.size()));
+        Preconditions.checkNotNull(fieldTree.get(root)).add(nestedFields);
+      }
+    }
+    return fieldTree;
+  }
+
+  /**
+   * Returns a new {@link Row} containing only the fields that intersect with the new {@link
+   * Schema}. Relies on a previous step to have validated the compatibility of the new schema.
+   */
+  @VisibleForTesting
+  @Nullable
+  static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) {
+    if (row == null) {
+      return null;
+    }
+    Map<String, Object> values = new HashMap<>(newSchema.getFieldCount());
+
+    for (Schema.Field field : newSchema.getFields()) {
+      String name = field.getName();
+      Object value = row.getValue(name);
+      if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) {
+        Schema nestedRowSchema = 
Preconditions.checkNotNull(field.getType().getRowSchema());
+        value = copyWithNewSchema(row.getRow(name), nestedRowSchema);
+      }
+      if (value != null) {
+        values.put(name, value);
+      }
+    }
+    return Row.withSchema(newSchema).withFieldValues(values).build();
+  }
+
+  /**
+   * Returns a new {@link Schema} with the specified fields removed.
+   *
+   * <p>No guarantee that field ordering will remain the same.
+   */
+  @VisibleForTesting
+  static Schema dropFields(Schema schema, List<String> fieldsToDrop) {
+    if (fieldsToDrop.isEmpty()) {
+      return schema;
+    }
+    List<Schema.Field> newFieldsList = new ArrayList<>(schema.getFields());
+    Map<String, List<String>> fieldTree = getFieldTree(fieldsToDrop);
+
+    for (Map.Entry<String, List<String>> fieldAndDescendents : 
fieldTree.entrySet()) {
+      String root = fieldAndDescendents.getKey();
+      List<String> nestedFields = fieldAndDescendents.getValue();
+      Schema.Field fieldToRemove = schema.getField(root);
+      Schema.FieldType typeToRemove = fieldToRemove.getType();
+
+      // Base case: we're at the specified field to remove.
+      if (nestedFields.isEmpty()) {
+        newFieldsList.remove(fieldToRemove);
+      } else {
+        // Otherwise, we're asked to remove a nested field. Verify current 
field is ROW type
+        Preconditions.checkArgument(
+            typeToRemove.getTypeName().equals(Schema.TypeName.ROW),
+            "Expected type %s for specified nested field '%s', but instead got 
type %s.",
+            Schema.TypeName.ROW,
+            root,
+            typeToRemove.getTypeName());
+
+        Schema nestedSchema = 
Preconditions.checkNotNull(typeToRemove.getRowSchema());
+        Schema newNestedSchema = dropFields(nestedSchema, nestedFields);
+        Schema.Field modifiedField =
+            Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+                .withNullable(typeToRemove.getNullable());
+
+        // Replace with modified field
+        newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField);
+      }
+    }
+    return new Schema(newFieldsList);
+  }
+
+  /**
+   * Returns a new {@link Schema} with only the specified fields kept.
+   *
+   * <p>No guarantee that field ordering will remain the same.
+   */
+  @VisibleForTesting
+  static Schema keepFields(Schema schema, List<String> fieldsToKeep) {
+    if (fieldsToKeep.isEmpty()) {
+      return schema;
+    }
+    List<Schema.Field> newFieldsList = new ArrayList<>(fieldsToKeep.size());
+    Map<String, List<String>> fieldTree = getFieldTree(fieldsToKeep);
+
+    for (Map.Entry<String, List<String>> fieldAndDescendents : 
fieldTree.entrySet()) {
+      String root = fieldAndDescendents.getKey();
+      List<String> nestedFields = fieldAndDescendents.getValue();
+      Schema.Field fieldToKeep = schema.getField(root);
+      Schema.FieldType typeToKeep = fieldToKeep.getType();
+
+      // Base case: we're at the specified field to keep, and we can skip this 
conditional.
+      // Otherwise: we're asked to keep a nested field, so we dig deeper to 
determine which nested
+      // fields to keep
+      if (!nestedFields.isEmpty()) {
+        Preconditions.checkArgument(
+            typeToKeep.getTypeName().equals(Schema.TypeName.ROW),
+            "Expected type %s for specified nested field '%s', but instead got 
type %s.",
+            Schema.TypeName.ROW,
+            root,
+            typeToKeep.getTypeName());
+
+        Schema nestedSchema = 
Preconditions.checkNotNull(typeToKeep.getRowSchema());
+        Schema newNestedSchema = keepFields(nestedSchema, nestedFields);
+        fieldToKeep =
+            Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+                .withNullable(typeToKeep.getNullable());
+      }
+      newFieldsList.add(fieldToKeep);
+    }
+
+    return new Schema(newFieldsList);
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
new file mode 100644
index 00000000000..22c17f6d07c
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link RowFilter}. */
+public class RowFilterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+      Schema.builder()
+          .addStringField("doubly_nested_str")
+          .addInt32Field("doubly_nested_int")
+          .build();
+
+  private static final Schema NESTED_ROW_SCHEMA =
+      Schema.builder()
+          .addStringField("nested_str")
+          .addInt32Field("nested_int")
+          .addFloatField("nested_float")
+          .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+          .build();
+  private static final Schema ROW_SCHEMA =
+      Schema.builder()
+          .addStringField("str")
+          .addBooleanField("bool")
+          .addNullableInt32Field("nullable_int")
+          .addArrayField("arr_int", Schema.FieldType.INT32)
+          .addRowField("row", NESTED_ROW_SCHEMA)
+          .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+          .build();
+
+  @Test
+  public void testSchemaValidation() {
+    List<List<String>> goodFields =
+        Arrays.asList(
+            Arrays.asList("str", "bool", "nullable_row"),
+            Arrays.asList("nullable_int", "arr_int"),
+            Arrays.asList("row.nested_str", 
"row.nested_row.doubly_nested_str"),
+            Arrays.asList("nullable_row.nested_row.doubly_nested_int"));
+
+    for (List<String> fields : goodFields) {
+      RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, 
"test-operation");
+    }
+  }
+
+  @Test
+  public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() {
+    List<KV<List<String>, List<String>>> nonExistentFields =
+        Arrays.asList(
+            KV.of(
+                Arrays.asList("nonexistent_1", "nonexistent_2", 
"nonexistent_3"),
+                Arrays.asList("nonexistent_1", "nonexistent_2", 
"nonexistent_3")),
+            KV.of(
+                Arrays.asList("nullable_int", "arr_int", "nonexistent"),
+                Collections.singletonList("nonexistent")),
+            KV.of(
+                Arrays.asList(
+                    "nullable_row.nested_row.nonexistent", "row.nonexistent", 
"row.nested_float"),
+                Arrays.asList("nullable_row.nested_row.nonexistent", 
"row.nonexistent")));
+
+    for (KV<List<String>, List<String>> fields : nonExistentFields) {
+      List<String> allFields = fields.getKey();
+      List<String> badFields = fields.getValue();
+
+      IllegalArgumentException e =
+          assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  RowFilter.validateSchemaContainsFields(ROW_SCHEMA, 
allFields, "test-operation"));
+
+      assertThat(e.getMessage(), containsString("Validation failed for 
'test-operation'"));
+      assertThat(
+          e.getMessage(),
+          containsString("Row Schema does not contain the following specified 
fields"));
+      for (String badField : badFields) {
+        assertThat(e.getMessage(), containsString(badField));
+      }
+    }
+  }
+
+  @Test
+  public void 
testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() {
+    List<KV<List<String>, List<String>>> nonNestedFields =
+        Arrays.asList(
+            KV.of(
+                Arrays.asList(
+                    "row.nested_row", "row.nested_int", 
"row.nested_str.unexpected_nested"),
+                Collections.singletonList("row.nested_str")),
+            KV.of(
+                Arrays.asList(
+                    "nullable_row.nested_str",
+                    "nullable_row.nested_str.unexpected",
+                    "row.nested_int.unexpected_2"),
+                Arrays.asList("nullable_row.nested_str", "row.nested_int")));
+
+    for (KV<List<String>, List<String>> fields : nonNestedFields) {
+      List<String> allFields = fields.getKey();
+      List<String> badFields = fields.getValue();
+
+      IllegalArgumentException e =
+          assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  RowFilter.validateSchemaContainsFields(ROW_SCHEMA, 
allFields, "test-operation"));
+
+      assertThat(e.getMessage(), containsString("Validation failed for 
'test-operation'"));
+      assertThat(
+          e.getMessage(),
+          containsString(
+              "The following specified fields are not of type Row. Their 
nested fields could not be reached"));
+      for (String badField : badFields) {
+        assertThat(e.getMessage(), containsString(badField));
+      }
+    }
+  }
+
+  @Test
+  public void testGetFieldTree() {
+    List<String> fields =
+        Arrays.asList(
+            "top-level",
+            "top-level-2",
+            "top-level.nested-level",
+            "top-level.nested-level-2",
+            "top-level.nested-level.doubly-nested-level",
+            "top-level.nested-level.doubly-nested-level-2");
+    List<String> nestedLayer =
+        Arrays.asList(
+            "nested-level",
+            "nested-level-2",
+            "nested-level.doubly-nested-level",
+            "nested-level.doubly-nested-level-2");
+
+    Map<String, List<String>> expectedTree =
+        ImmutableMap.<String, List<String>>builder()
+            .put("top-level-2", Collections.emptyList())
+            .put("top-level", nestedLayer)
+            .build();
+
+    assertEquals(expectedTree, RowFilter.getFieldTree(fields));
+
+    List<String> doublyNestedLayer = Arrays.asList("doubly-nested-level", 
"doubly-nested-level-2");
+
+    Map<String, List<String>> expectedNestedTree =
+        ImmutableMap.<String, List<String>>builder()
+            .put("nested-level-2", Collections.emptyList())
+            .put("nested-level", doublyNestedLayer)
+            .build();
+
+    assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer));
+  }
+
+  @Test
+  public void testDropSchemaFields() {
+    List<String> fieldsToDrop =
+        Arrays.asList(
+            "str",
+            "arr_int",
+            "nullable_int",
+            "row.nested_int",
+            "row.nested_float",
+            "row.nested_row.doubly_nested_int",
+            "nullable_row.nested_str",
+            "nullable_row.nested_row");
+
+    Schema expectedDroppedSchema =
+        Schema.builder()
+            .addBooleanField("bool")
+            .addRowField(
+                "row",
+                Schema.builder()
+                    .addStringField("nested_str")
+                    .addRowField(
+                        "nested_row", 
Schema.builder().addStringField("doubly_nested_str").build())
+                    .build())
+            .addNullableRowField(
+                "nullable_row",
+                
Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build())
+            .build();
+
+    
assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, 
fieldsToDrop)));
+  }
+
+  @Test
+  public void testKeepSchemaFields() {
+    List<String> fieldsToKeep =
+        Arrays.asList(
+            "str",
+            "arr_int",
+            "nullable_int",
+            "row.nested_int",
+            "row.nested_float",
+            "row.nested_row.doubly_nested_int",
+            "nullable_row.nested_str",
+            "nullable_row.nested_row");
+
+    Schema expectedKeptSchema =
+        Schema.builder()
+            .addStringField("str")
+            .addArrayField("arr_int", Schema.FieldType.INT32)
+            .addNullableInt32Field("nullable_int")
+            .addRowField(
+                "row",
+                Schema.builder()
+                    .addInt32Field("nested_int")
+                    .addFloatField("nested_float")
+                    .addRowField(
+                        "nested_row", 
Schema.builder().addInt32Field("doubly_nested_int").build())
+                    .build())
+            .addNullableRowField(
+                "nullable_row",
+                Schema.builder()
+                    .addStringField("nested_str")
+                    .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+                    .build())
+            .build();
+
+    assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, 
fieldsToKeep)));
+  }
+
+  @Test
+  public void testDropNestedFieldsFails() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("RowFilter does not support specifying nested fields 
to drop");
+
+    new RowFilter(ROW_SCHEMA)
+        .dropping(
+            Arrays.asList(
+                "bool",
+                "nullable_int",
+                "row.nested_int",
+                "row.nested_float",
+                "row.nested_row.doubly_nested_int",
+                "nullable_row"));
+  }
+
+  @Test
+  public void testKeepNestedFieldsFails() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("RowFilter does not support specifying nested fields 
to keep");
+
+    new RowFilter(ROW_SCHEMA)
+        .keeping(
+            Arrays.asList("str", "arr_int", "row.nested_str", 
"row.nested_row.doubly_nested_str"));
+  }
+
+  @Test
+  public void testOnlyFailsWhenSpecifyingNonRowField() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Expected type 'ROW' for field 'nullable_int', but instead got type 
'INT32'");
+
+    new RowFilter(ROW_SCHEMA).only("nullable_int");
+  }
+
+  private static final Row ORIGINAL_ROW =
+      Row.withSchema(ROW_SCHEMA)
+          .addValue("str_value")
+          .addValue(true)
+          .addValue(123)
+          .addValue(Arrays.asList(1, 2, 3, 4, 5))
+          .addValue(
+              Row.withSchema(NESTED_ROW_SCHEMA)
+                  .addValue("nested_str_value")
+                  .addValue(456)
+                  .addValue(1.234f)
+                  .addValue(
+                      Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+                          .addValue("doubly_nested_str_value")
+                          .addValue(789)
+                          .build())
+                  .build())
+          .addValue(null)
+          .build();
+
+  private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA =
+      Schema.builder().addStringField("doubly_nested_str").build();
+  private static final Schema FILTERED_NESTED_SCHEMA =
+      Schema.builder()
+          .addStringField("nested_str")
+          .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA)
+          .build();
+  private static final Schema FILTERED_SCHEMA =
+      Schema.builder()
+          .addStringField("str")
+          .addArrayField("arr_int", Schema.FieldType.INT32)
+          .addRowField("row", FILTERED_NESTED_SCHEMA)
+          .build();
+
+  private static final Row FILTERED_ROW =
+      Row.withSchema(FILTERED_SCHEMA)
+          .addValue("str_value")
+          .addValue(Arrays.asList(1, 2, 3, 4, 5))
+          .addValue(
+              Row.withSchema(FILTERED_NESTED_SCHEMA)
+                  .addValue("nested_str_value")
+                  .addValue(
+                      Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA)
+                          .addValue("doubly_nested_str_value")
+                          .build())
+                  .build())
+          .build();
+
+  @Test
+  public void testCopyRowWithNewSchema() {
+    assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, 
FILTERED_SCHEMA));
+  }
+
+  @Test
+  public void testOnlyRowField() {
+    RowFilter rowFilter = new RowFilter(ROW_SCHEMA).only("row");
+
+    Row expecedRow =
+        Row.withSchema(rowFilter.outputSchema())
+            .addValues(ORIGINAL_ROW.getRow("row").getValues())
+            .build();
+
+    assertEquals(expecedRow, rowFilter.filter(ORIGINAL_ROW));
+  }
+}

Reply via email to