kennknowles commented on code in PR #38366:
URL: https://github.com/apache/beam/pull/38366#discussion_r3201838547


##########
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java:
##########
@@ -357,45 +354,90 @@ public static class FailsafeElementToJsonFn
     @Nullable public final String jsonSchema;
     public final String delimiter;
     public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
-    @Nullable private final PCollectionView<String> headersView;
     private Counter successCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, 
SUCCESSFUL_TO_JSON_COUNTER);
     private Counter failedCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
 
     FailsafeElementToJsonFn(
-        PCollectionView<String> headersView,
         String jsonSchema,
         String delimiter,
         TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
-      this.headersView = headersView;
       this.jsonSchema = jsonSchema;
       this.delimiter = delimiter;
       this.udfDeadletterTag = udfDeadletterTag;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext context) {
-      FailsafeElement<String, String> element = context.element();
+    public void processElement(
+        @Element FailsafeElement<String, String> element,
+        OutputReceiver<FailsafeElement<String, String>> receiver,
+        MultiOutputReceiver multiReceiver) {
+      List<String> header = null;
+      List<String> record = 
Arrays.asList(element.getOriginalPayload().split(this.delimiter));
+
+      try {
+        String json = buildJsonString(header, record, this.jsonSchema);
+        receiver.output(FailsafeElement.of(element.getOriginalPayload(), 
json));
+        successCounter.inc();
+      } catch (Exception e) {
+        failedCounter.inc();
+        multiReceiver
+            .get(this.udfDeadletterTag)
+            .output(
+                FailsafeElement.of(element)
+                    .setErrorMessage(e.getMessage())
+                    .setStacktrace(Throwables.getStackTraceAsString(e)));
+      }
+    }
+  }
+
+  public static class FailsafeElementToJsonWithHeadersFn

Review Comment:
   A DoFn either requires the side input or it does not take the side input. It 
is like a non-optional parameter. So, yea, I see that it is not easy to have an 
optional value on a side input. There might be a simple pattern you can use 
here, since one DoFn can call the other one. The actual DoFn(s) can be a thin 
wrapper on "normal" functions, then you can do it however you want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to