reuvenlax commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436220739



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new 
TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> 
withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing 
layer are returned as
+   * Row objects of form: {@link JsonToRow#ERROR_ROW_SCHEMA} line : The 
original json string err :
+   * The error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       +1. If you output.a Row, you should be setting the schema in your 
transform.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;

Review comment:
       make final

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+

Review comment:
       would be nicer to make these field names configurable, though with 
defaults.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       I think it would be cleaner to wrap this in a custom result class and 
not expose the TupleTags to users. Look 
org.apache.beam.sdk.io.gcp.bigquery.WriteResult for an example.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = 
"JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {

Review comment:
       Why not use injected parameters instead of ProcessContext?

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new 
TupleTag<Row>() {};
+

Review comment:
       give these tuple tags real names




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to