johnjcasey commented on code in PR #27039:
URL: https://github.com/apache/beam/pull/27039#discussion_r1237350046
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
         .via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
   }
+  // Applies generic mapping from Beam row to other data types through the provided mapFn.
+  // Implements error handling with metrics and DLQ support.
+  // Arguments:
+  //   name: the metric name to use.
+  //   mapFn: the mapping function for mapping from Beam row to other data types.
+  //   outputTag: TupleTag for output. Used to direct output to the correct output source, or in
+  //     the case of error, a DLQ.
+  static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
+    private SerializableFunction<Row, OutputT> mapFn;
+    private Counter errorCounter;
+    private TupleTag<OutputT> outputTag;
+    private long errorsInBundle = 0L;
+
+    public ErrorCounterFn(
+        String name, SerializableFunction<Row, OutputT> mapFn, TupleTag<OutputT> outputTag) {
+      errorCounter = Metrics.counter(FileWriteSchemaTransformFormatProvider.class, name);
+      this.mapFn = mapFn;
+      this.outputTag = outputTag;
+    }
+
+    @ProcessElement
+    public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
+      try {
+        receiver.get(outputTag).output(mapFn.apply(row));
+      } catch (Exception e) {
+        errorsInBundle += 1;
+        LOG.warn("Error while parsing input element", e);
Review Comment:
That's fair.
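For context on the snippet above: `errorsInBundle` batches error increments so the Beam `Counter` is not touched on every failing element; the batched count is typically flushed once per bundle (in Beam, from a `@FinishBundle` method). A minimal plain-Java sketch of that batch-and-flush pattern, with an `AtomicLong` standing in for `Metrics.counter(...)` (all names here are illustrative, not the PR's code):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class BatchedErrorCounterSketch {
    // Stand-in for a Beam Counter obtained via Metrics.counter(...).
    final AtomicLong errorCounter = new AtomicLong();
    private long errorsInBundle = 0L;

    <T, R> R processElement(T element, Function<T, R> mapFn) {
        try {
            return mapFn.apply(element);
        } catch (Exception e) {
            // Batch increments locally instead of updating the counter per element.
            errorsInBundle += 1;
            return null; // the real DoFn would route the element to an error output
        }
    }

    // Analogous to a @FinishBundle method: flush the batched count once per bundle.
    void finishBundle() {
        errorCounter.addAndGet(errorsInBundle);
        errorsInBundle = 0L;
    }

    public static void main(String[] args) {
        BatchedErrorCounterSketch fn = new BatchedErrorCounterSketch();
        Function<String, Integer> parse = Integer::parseInt;
        fn.processElement("42", parse);          // succeeds
        fn.processElement("not-a-number", parse); // throws inside, counted
        fn.finishBundle();
        System.out.println(fn.errorCounter.get()); // prints 1
    }
}
```

The design point is that metric updates have per-call overhead on some runners, so accumulating in a plain field and flushing at bundle boundaries is cheaper.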
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
         .via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
   }
+  // Applies generic mapping from Beam row to other data types through the provided mapFn.
+  // Implements error handling with metrics and DLQ support.
+  // Arguments:
+  //   name: the metric name to use.
+  //   mapFn: the mapping function for mapping from Beam row to other data types.
+  //   outputTag: TupleTag for output. Used to direct output to the correct output source, or in
+  //     the case of error, a DLQ.
+  static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
Review Comment:
Also, ideally we would have a unit test for this that is isolated from any particular format. I do appreciate that you've added tests that exercise it for all the formats.
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java:
##########
@@ -61,6 +73,49 @@ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema
         .via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
   }
+  // Applies generic mapping from Beam row to other data types through the provided mapFn.
+  // Implements error handling with metrics and DLQ support.
+  // Arguments:
+  //   name: the metric name to use.
+  //   mapFn: the mapping function for mapping from Beam row to other data types.
+  //   outputTag: TupleTag for output. Used to direct output to the correct output source, or in
+  //     the case of error, a DLQ.
+  static class ErrorCounterFn<OutputT extends Object> extends DoFn<Row, OutputT> {
Review Comment:
An error-handling Beam row mapper, maybe? That, or your suggestion, works.
##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java:
##########
@@ -47,22 +57,27 @@ public String identifier() {
    * {@link PCollection} file names written using {@link AvroIO.Write}.
    */
   @Override
-  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+  public PTransform<PCollection<Row>, PCollectionTuple> buildTransform(
       FileWriteSchemaTransformConfiguration configuration, Schema schema) {
-    return new PTransform<PCollection<Row>, PCollection<String>>() {
+    return new PTransform<PCollection<Row>, PCollectionTuple>() {
       @Override
-      public PCollection<String> expand(PCollection<Row> input) {
+      public PCollectionTuple expand(PCollection<Row> input) {
         org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
         AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
-        PCollection<GenericRecord> avro =
-            input
-                .apply(
-                    "Row To Avro Generic Record",
-                    FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
-                .setCoder(coder);
+        PCollectionTuple tuple =
+            input.apply(
+                "Row To Avro Generic Record",
+                ParDo.of(
+                        new ErrorCounterFn<GenericRecord>(
+                            "Avro-write-error-counter",
+                            AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(schema)),
+                            ERROR_FN_OUPUT_TAG))
+                    .withOutputTags(ERROR_FN_OUPUT_TAG, TupleTagList.of(ERROR_TAG)));
Review Comment:
Got it, that makes sense.
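The diff above routes one `ParDo` into two tagged outputs via `withOutputTags`, and the caller then pulls each `PCollection` out of the returned `PCollectionTuple` by its tag. A plain-Java sketch of that tag-based routing idea, with a trivial `Tag` class standing in for `TupleTag` and lists standing in for the tagged outputs (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TupleRoutingSketch {
    // Minimal stand-in for TupleTag<T>: an identity-based key for one output.
    static final class Tag<T> {}

    public static void main(String[] args) {
        Tag<Integer> mainTag = new Tag<>();  // plays the role of ERROR_FN_OUPUT_TAG
        Tag<String> errorTag = new Tag<>();  // plays the role of ERROR_TAG

        // Plays the role of PCollectionTuple: outputs retrievable by tag.
        Map<Tag<?>, List<Object>> outputs = new HashMap<>();
        outputs.put(mainTag, new ArrayList<>());
        outputs.put(errorTag, new ArrayList<>());

        Function<String, Integer> mapFn = Integer::parseInt;
        for (String row : List.of("10", "x", "20")) {
            try {
                outputs.get(mainTag).add(mapFn.apply(row)); // success -> main output
            } catch (Exception e) {
                outputs.get(errorTag).add(row);             // failure -> DLQ-style output
            }
        }

        System.out.println(outputs.get(mainTag));  // [10, 20]
        System.out.println(outputs.get(errorTag)); // [x]
    }
}
```

In the real transform the caller would do `tuple.get(ERROR_FN_OUPUT_TAG)` for the records to write and `tuple.get(ERROR_TAG)` for the failed rows, which is what lets the error output feed a DLQ without a second pass over the input.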
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]