kennknowles commented on code in PR #38366:
URL: https://github.com/apache/beam/pull/38366#discussion_r3201830343


##########
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java:
##########
@@ -357,45 +354,90 @@ public static class FailsafeElementToJsonFn
     @Nullable public final String jsonSchema;
     public final String delimiter;
     public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
-    @Nullable private final PCollectionView<String> headersView;
     private Counter successCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER);
     private Counter failedCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
 
     FailsafeElementToJsonFn(
-        PCollectionView<String> headersView,
         String jsonSchema,
         String delimiter,
         TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
-      this.headersView = headersView;
       this.jsonSchema = jsonSchema;
       this.delimiter = delimiter;
       this.udfDeadletterTag = udfDeadletterTag;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext context) {
-      FailsafeElement<String, String> element = context.element();
+    public void processElement(
+        @Element FailsafeElement<String, String> element,
+        OutputReceiver<FailsafeElement<String, String>> receiver,
+        MultiOutputReceiver multiReceiver) {
+      List<String> header = null;
+      List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
+
+      try {
+        String json = buildJsonString(header, record, this.jsonSchema);
+        receiver.output(FailsafeElement.of(element.getOriginalPayload(), json));
+        successCounter.inc();
+      } catch (Exception e) {
+        failedCounter.inc();
+        multiReceiver
+            .get(this.udfDeadletterTag)
+            .output(
+                FailsafeElement.of(element)
+                    .setErrorMessage(e.getMessage())
+                    .setStacktrace(Throwables.getStackTraceAsString(e)));
+      }
+    }
+  }
+
+  public static class FailsafeElementToJsonWithHeadersFn

Review Comment:
   I don't really know about the null side input antipattern. A
`SideInput<@Nullable String>` backed by a
`PCollection(NullableCoder<StringUtf8Coder>)` seems like it will "work". The
main issue, I guess, is that the runner is probably not very smart, so when the
side input is unused you still have to pass `View.asSingleton(Create.of(null))`,
which will probably be materialized to an empty file and then read back for no
reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
