sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1524068006

   My team is trying to develop a custom Transformer class that skips null 
(tombstone) records from the PostgresDebezium Kafka source to address this. We 
are attempting something along the lines of:
   
   ```java
   import java.util.Objects;

   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.utilities.transform.Transformer;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.api.java.function.FilterFunction;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class TombstoneTransformer implements Transformer {

     private static final Logger LOG = LoggerFactory.getLogger(TombstoneTransformer.class);

     @Override
     public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                               Dataset<Row> rowDataset, TypedProperties properties) {
       LOG.info("TombstoneTransformer " + rowDataset);
       // Attempt 1: NullPointerException is thrown on the following line:
       // List<Row> rowList = rowDataset.collectAsList();
       // Attempt 2: NullPointerException is thrown on the following line:
       // newRowSet.collectAsList().stream().limit(50).forEach(x -> LOG.info(x.toString()));
       // Attempt 3: tombstone records still appear to be present later in DeltaSync,
       // resulting in a NullPointerException there:
       // Dataset<Row> newRowSet = rowDataset.filter("_change_operation_type is not null");
       // Attempt 4: same outcome -- the cast disambiguates Spark's filter overloads,
       // but tombstones still survive and DeltaSync throws an NPE later:
       Dataset<Row> newRowSet = rowDataset.filter((FilterFunction<Row>) Objects::nonNull);
       return newRowSet;
     }
   }
   ```
   
   However, none of these attempts at filtering `rowDataset` gets rid of the 
NullPointerException later in the Deltastreamer ingestion. Moreover, many of the 
attempts to log or view individual records in `rowDataset` themselves result in 
a NullPointerException. So we are wondering whether something earlier in the 
code (perhaps `PostgresDebeziumSource.java`, which flattens the messages?) is 
allowing malformed Row objects to be passed to custom Transformer classes, 
which would explain why we cannot read the Rows and filter out the ones that 
are null (tombstone) records.
   Does anyone have an idea for how to make this class work? Side note: we 
also tried SqlQueryBasedTransformer with "SELECT * FROM <SRC> a WHERE a.id 
is not null", and it also did not filter out tombstones (we still hit the NPE 
later during ingestion).
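   For context on why per-row access can blow up before a filter helps: a 
Debezium tombstone is a Kafka record that carries the deleted row's key and a 
null value, so if that null value is deserialized into a null element, any 
dereference of it fails before a downstream filter can drop it. A minimal 
plain-Java sketch of that ordering issue (the class name and data here are 
illustrative, not Hudi or Spark APIs):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.stream.Collectors;

   public class TombstoneFilterSketch {
     public static void main(String[] args) {
       // Simulated Kafka batch: two change events and one tombstone (null value).
       List<String> batch = Arrays.asList("{\"id\":1,\"op\":\"c\"}", null, "{\"id\":2,\"op\":\"d\"}");

       // Dereferencing each element before filtering throws on the tombstone:
       // batch.forEach(r -> System.out.println(r.length()));  // NullPointerException

       // Dropping null elements before any per-element access is safe:
       List<String> filtered = batch.stream()
           .filter(Objects::nonNull)
           .collect(Collectors.toList());

       System.out.println(filtered.size()); // prints 2
     }
   }
   ```

   The same ordering would apply inside Deltastreamer: if the nulls are 
introduced before the Transformer runs, the filtering has to happen at (or 
before) the source/deserialization step rather than in the Transformer.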
   
   Could someone explain what is happening with delete operations on 
`PostgresDebeziumSource` and `PostgresDebeziumAvroPayload` and why they 
potentially aren't being handled well? Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
