tysonjh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r426831619



##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();

Review comment:
       Is there a guarantee that at least one element will be in the elementsBag iterator, or is there a chance of a NoSuchElementException on the `next()` call?
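   For illustration, a rough (untested) guard sketch, assuming an empty bag is actually possible when the timer fires:

```java
@OnTimer("eventTimer")
public void onTimer(
    @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
    OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
  // Avoid NoSuchElementException if nothing was buffered for this key/window.
  java.util.Iterator<KV<String, Table.Row>> iterator = elementsBag.read().iterator();
  if (!iterator.hasNext()) {
    return;
  }
  String key = iterator.next().getKey();
  // ... existing batching logic ...
}
```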

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.

Review comment:
       Would you move the comments about how a method works, or what it does, to the method definition please?

##########
File path: sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String IDENTIFYING_TEXT = "mary....@example.com";
+  private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+  private static final InspectConfig inspectConfig =
+      InspectConfig.newBuilder()
+          .addInfoTypes(emailAddress)
+          .setMinLikelihood(Likelihood.LIKELY)
+          .build();
+
+  @Test
+  public void inspectsText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollection<KV<String, InspectContentResponse>> inspectionResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPInspectText.newBuilder()
+                    .setBatchSize(52400)

Review comment:
       Can you use DLP_PAYLOAD_LIMIT instead?

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {

Review comment:
       Please add a comment. It would also be beneficial to name this variable with a unit like `batchSizeBytes`.
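   Something like this, purely as an illustration (the `batchSizeBytes` name is only a suggestion and assumes the field is renamed to match; it is not existing code):

```java
/**
 * Constructs the batching DoFn.
 *
 * @param batchSizeBytes maximum size, in bytes, of serialized rows buffered before a batch is emitted
 */
public BatchRequestForDLP(Integer batchSizeBytes) {
  this.batchSizeBytes = batchSizeBytes;
}
```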

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setCsvHeader(PCollectionView<List<String>> csvHeader);
+
+    public abstract Builder setCsvDelimiter(String delimiter);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    deidentifyTemplateName(),
+                    inspectConfig(),
+                    deidentifyConfig(),
+                    csvHeader())));
+  }
+
+  static class DeidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private final PCollectionView<List<String>> csvHeaders;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }

Review comment:
       See my earlier comment about moving configuration exceptions like this to pipeline construction time. Doing so will give earlier feedback to the user and avoid having to spin up workers and process part of the pipeline before running into a configuration issue.
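   As a rough sketch (not code from this PR), the same check could run when the pipeline graph is built, e.g. at the top of `expand()`:

```java
@Override
public PCollection<KV<String, DeidentifyContentResponse>> expand(
    PCollection<KV<String, String>> input) {
  // Fails at pipeline construction time instead of in DoFn.setup() on the workers.
  if (inspectConfig() == null && inspectTemplateName() == null) {
    throw new IllegalArgumentException(
        "Either inspectConfig or inspectTemplateName need to be set!");
  }
  // ... existing expansion ...
}
```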

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();
+    AtomicInteger bufferSize = new AtomicInteger();
+    List<Table.Row> rows = new ArrayList<>();
+    elementsBag
+        .read()
+        .forEach(
+            element -> {
+              int elementSize = element.getValue().getSerializedSize();
+              boolean clearBuffer = bufferSize.intValue() + elementSize > batchSize;
+              if (clearBuffer) {
+                numberOfRowsBagged.inc(rows.size());
+                LOG.debug("Clear Buffer {} , Key {}", bufferSize.intValue(), element.getKey());
+                output.output(KV.of(element.getKey(), rows));
+                rows.clear();
+                bufferSize.set(0);
+              }
+              rows.add(element.getValue());
+              bufferSize.getAndAdd(element.getValue().getSerializedSize());
+            });
+    if (!rows.isEmpty()) {
+      LOG.debug("Remaining rows {}", rows.size());

Review comment:
       Nit: This log message would be clearer if it stated the action being taken, e.g. 'Outputting remaining {} rows'.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.

Review comment:
       An incorrect configuration should result in an exception at pipeline construction time if possible. The sooner an exception can be raised to the user the better (e.g. pipeline construction vs. pipeline runtime).
   
   A couple options are,
     1. Add precondition checks into the Builder for the AutoValue. You could do this by implementing your own `public DLPDeidentifyText build()` method that checks preconditions and then calls a generated `autoBuild` method (rough sketch below).
     2. Add precondition checks into the constructor of the DoFn.
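   An untested sketch of option 1 (the `autoBuild()` name follows the usual AutoValue convention and does not exist in this PR yet):

```java
@AutoValue.Builder
public abstract static class Builder {
  // ... existing setters ...

  abstract DLPDeidentifyText autoBuild();

  public DLPDeidentifyText build() {
    DLPDeidentifyText transform = autoBuild();
    // Validate required/mutually exclusive settings at construction time.
    if (transform.inspectConfig() == null && transform.inspectTemplateName() == null) {
      throw new IllegalArgumentException(
          "Either inspectConfig or inspectTemplateName must be set.");
    }
    if (transform.deidentifyConfig() == null && transform.deidentifyTemplateName() == null) {
      throw new IllegalArgumentException(
          "Either deidentifyConfig or deidentifyTemplateName must be set.");
    }
    return transform;
  }
}
```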

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();
+    AtomicInteger bufferSize = new AtomicInteger();
+    List<Table.Row> rows = new ArrayList<>();
+    elementsBag
+        .read()
+        .forEach(
+            element -> {
+              int elementSize = element.getValue().getSerializedSize();
+              boolean clearBuffer = bufferSize.intValue() + elementSize > batchSize;
+              if (clearBuffer) {
+                numberOfRowsBagged.inc(rows.size());
+                LOG.debug("Clear Buffer {} , Key {}", bufferSize.intValue(), element.getKey());

Review comment:
       Nit: Can you move this to the first line of the if block to avoid splitting the code like this? Also it would be nice to have the unit in the log message for the bufferSize (e.g. bytes).
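   i.e. roughly (just to illustrate the nit, not exact code):

```java
if (clearBuffer) {
  // Log first so the action and the buffer it refers to stay together, with the unit spelled out.
  LOG.debug("Clearing buffer of {} bytes for key {}", bufferSize.intValue(), element.getKey());
  numberOfRowsBagged.inc(rows.size());
  output.output(KV.of(element.getKey(), rows));
  rows.clear();
  bufferSize.set(0);
}
```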

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();

Review comment:
       Since the delimiter is configurable, what about dropping the 'csv'? You could add details in the comments as to where the delimiter applies and the default value.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according
+ * to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set, the
+ * same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;

Review comment:
       Having the units in the name is helpful when the type is non-descriptive. Where did this number come from?
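   e.g. (illustrative only; the value simply mirrors the existing constant, and the javadoc is a placeholder for wherever the limit actually comes from):

```java
/** Maximum payload size, in bytes, sent to the DLP service per request. */
public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 52400;
```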

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();

Review comment:
       It would be helpful if these abstract methods, and possibly those in the builder, had comments.
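   For example (the wording is only a suggestion):

```java
/** @return name of an existing DLP inspection template, or null if inspectConfig is used instead. */
@Nullable
public abstract String inspectTemplateName();
```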




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

