sydneyhoran commented on issue #8519: URL: https://github.com/apache/hudi/issues/8519#issuecomment-1524068006
My team is trying to develop a custom Transformer class that can skip over null (tombstone) records from the PostgresDebezium Kafka source to address this. We are attempting something along the lines of:

```java
import java.util.Objects;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TombstoneTransformer implements Transformer {

  private static final Logger LOG = LoggerFactory.getLogger(TombstoneTransformer.class);

  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset, TypedProperties properties) {
    LOG.info("TombstoneTransformer " + rowDataset);

    // NullPointerException happens on the following line:
    // List<Row> rowList = rowDataset.collectAsList();

    // NullPointerException happens on the following line:
    // newRowSet.collectAsList().stream().limit(50).forEach(x -> LOG.info(x.toString()));

    // Later in DeltaSync, tombstone records still appear to be present and
    // cause a NullPointerException:
    // Dataset<Row> newRowSet = rowDataset.filter("_change_operation_type is not null");

    // Later in DeltaSync, tombstone records still appear to be present and
    // cause a NullPointerException (cast disambiguates the filter overload):
    Dataset<Row> newRowSet = rowDataset.filter((FilterFunction<Row>) Objects::nonNull);
    return newRowSet;
  }
}
```

However, none of these attempts at filtering `rowDataset` get rid of the NullPointerException later in the Deltastreamer ingestion. Moreover, many of the attempts to log or inspect individual records in `rowDataset` themselves throw a NullPointerException. So we are wondering whether something earlier in the pipeline (perhaps `PostgresDebeziumSource.java`, which flattens the messages) is allowing malformed Row objects to reach custom Transformer classes, preventing us from reading the Rows and filtering out the null (tombstone) records. Does anyone have an idea of how to make this class work?
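To illustrate the distinction we suspect matters here, this is a minimal plain-Java sketch (no Spark; `Record` is a hypothetical stand-in for a flattened Debezium row): a row-level null check drops null elements but keeps rows whose columns are null, while a column-level check like `_change_operation_type is not null` needs the element itself to be non-null first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class TombstoneFilterSketch {

    // Hypothetical stand-in for a flattened Debezium record (not a Spark Row).
    public static class Record {
        public final String changeOperationType;
        public Record(String op) { this.changeOperationType = op; }
    }

    // Row-level filter: drops null elements, but keeps rows whose fields are null.
    public static long countNonNullRows(List<Record> records) {
        return records.stream().filter(Objects::nonNull).count();
    }

    // Column-level filter: must guard against null rows first, or it NPEs.
    public static long countRowsWithOperation(List<Record> records) {
        return records.stream()
            .filter(r -> r != null && r.changeOperationType != null)
            .count();
    }

    public static void main(String[] args) {
        List<Record> records = Arrays.asList(
            new Record("u"),     // normal change event
            null,                // tombstone: the element itself is null
            new Record(null));   // present row, but the operation column is null

        System.out.println(countNonNullRows(records));       // 2
        System.out.println(countRowsWithOperation(records)); // 1
    }
}
```

If the tombstones really do surface as null Rows, a filter of the first kind should in principle remove them, which is why we suspect the NPE originates before the Transformer runs.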
Side note - we also tried `SqlQueryBasedTransformer` with `SELECT * FROM <SRC> a WHERE a.id is not null`, and it also did not filter out tombstones (we still hit an NPE later during ingestion). Could someone explain what happens with delete operations in `PostgresDebeziumSource` and `PostgresDebeziumAvroPayload`, and why they potentially aren't being handled well? Thanks in advance!