Praveenkumar76 opened a new pull request, #26:
URL: https://github.com/apache/pulsar-connectors/pull/26

   Fixes [apache#23763](https://github.com/apache/pulsar/issues/23763)
   
   ### Motivation
   
   The Solr index stays empty when the Solr sink consumes PostgreSQL CDC events produced by the Debezium connector.
   
   The root cause is a schema and structural incompatibility between 
Debezium-generated messages and the existing Solr sink implementation:
   
   - **KeyValue schema handling:** Debezium emits CDC events using a `KeyValue` 
schema wrapper. The existing `GenericRecord` processing could not correctly 
extract nested payload fields from this structure.
   - **Envelope structure:** Debezium wraps events inside a CDC envelope 
containing fields such as `before`, `after`, `op`, and `source`. The sink 
previously lacked logic to unwrap and process the actual row state.
   - **Type mismatch:** Solr expects consistent string-based values for 
identifiers and indexed fields, while CDC events often contain numeric types 
(`Integer`, `Long`), causing silent field drops during indexing.
   - **Record lifecycle handling:** Conversion failures and intentionally 
skipped records (such as processed DELETE events) were not clearly 
differentiated, potentially leading to silent data loss.
   
   Additionally, while CDC transformation can be handled externally using 
Pulsar Functions or intermediate processors, native support within the Solr 
sink simplifies standard CDC-to-search indexing pipelines.
   
   
   ### Modifications
   
   #### Configuration
   
   - Added the `unwrapDebeziumRecord` option to `SolrSinkConfig` (default: `false`), so Debezium CDC payload handling is applied only when users explicitly enable it.
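
To illustrate the opt-in behaviour of the new flag, here is a minimal sketch. `SinkConfigSketch` and its `load` method are hypothetical stand-ins written for this example, not the real `SolrSinkConfig`; only the option name `unwrapDebeziumRecord` and its `false` default come from this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, trimmed-down stand-in for SolrSinkConfig; only the new flag is modeled. */
final class SinkConfigSketch {

    final boolean unwrapDebeziumRecord;

    private SinkConfigSketch(boolean unwrapDebeziumRecord) {
        this.unwrapDebeziumRecord = unwrapDebeziumRecord;
    }

    /** Reads the flag from a connector config map; an absent key means "disabled". */
    static SinkConfigSketch load(Map<String, Object> config) {
        Object raw = config.get("unwrapDebeziumRecord");
        boolean enabled = raw != null && Boolean.parseBoolean(raw.toString());
        return new SinkConfigSketch(enabled);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        System.out.println("flag when unset: " + load(config).unwrapDebeziumRecord);

        config.put("unwrapDebeziumRecord", true);
        System.out.println("flag when set:   " + load(config).unwrapDebeziumRecord);
    }
}
```

Because the flag defaults to `false`, existing non-CDC deployments keep their current behaviour unless the option is set.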
   
   #### SolrAbstractSink
   
   Updated the parent sink lifecycle handling:
   
   - Records that are processed successfully or intentionally skipped now trigger `record.ack()`
   - Processing failures now trigger `record.fail()` to support retries and DLQ handling
   - As a result, conversion failures are no longer dropped silently
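
The lifecycle rule above can be sketched as follows. This is an illustration under stated assumptions, not the code in `SolrAbstractSink`: `AckableRecord` is a hypothetical stand-in for the `ack()` / `fail()` pair on Pulsar's `Record`, and `convertAndIndex` and `RecordingRecord` are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {

    /** Hypothetical stand-in for the ack()/fail() methods of a Pulsar Record. */
    interface AckableRecord<T> {
        T getValue();
        void ack();
        void fail();
    }

    /**
     * Hypothetical conversion step: returns true when a document was indexed,
     * false when the record was intentionally skipped, and throws on bad input.
     */
    static boolean convertAndIndex(String value) {
        if (value == null) {
            throw new IllegalArgumentException("cannot convert null payload");
        }
        return !value.isEmpty();
    }

    /** Ack on success or intentional skip; fail on any processing error. */
    static void write(AckableRecord<String> record) {
        try {
            convertAndIndex(record.getValue());
            record.ack();   // indexed and skipped records are both finished
        } catch (RuntimeException e) {
            record.fail();  // signals failure so retry or DLQ handling can take over
        }
    }

    /** Test double that remembers which lifecycle method was called. */
    static final class RecordingRecord implements AckableRecord<String> {
        final String value;
        final List<String> calls = new ArrayList<>();

        RecordingRecord(String value) {
            this.value = value;
        }

        @Override public String getValue() { return value; }
        @Override public void ack() { calls.add("ack"); }
        @Override public void fail() { calls.add("fail"); }
    }

    public static void main(String[] args) {
        for (String v : new String[] {"doc", "", null}) {
            RecordingRecord r = new RecordingRecord(v);
            write(r);
            System.out.println(v + " -> " + r.calls);
        }
    }
}
```

The key design point is that a skipped record and a failed record take different paths: only the failure path calls `fail()`.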
   
   #### SolrGenericRecordSink
   
   Implemented native Debezium CDC support:
   
   - Added `extractValueRecord()` using `getNativeObject()` for safe extraction 
of `KeyValue` payloads
   - Added `isDebeziumEnvelope()` to identify valid Debezium CDC envelopes 
using `op` and state fields (`before` / `after`)
   - Added `mapDebeziumPayload()` and `extractAfterRecord()` to recursively 
unwrap CDC payloads
   - Added explicit DELETE handling:
     - Detects delete events (`after == null`)
     - Extracts primary key from `before`
     - Calls `deleteById` in Solr
   - Added `normalizeValue()` to safely convert numeric values into Strings for 
Solr schema compatibility
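
The unwrap path described in this list can be sketched roughly as follows. This is a minimal illustration, not the PR's code: plain `Map<String, Object>` values stand in for Pulsar's `GenericRecord`, the names `SolrAction`, `toSolrAction`, and `normalizeAll` are hypothetical, and the primary key is assumed to live in a field called `id`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DebeziumUnwrapSketch {

    /** Hypothetical result type: exactly one of deleteId / fields is non-null. */
    static final class SolrAction {
        final String deleteId;            // set for DELETE events
        final Map<String, Object> fields; // set for INSERT/UPDATE and flat payloads

        SolrAction(String deleteId, Map<String, Object> fields) {
            this.deleteId = deleteId;
            this.fields = fields;
        }
    }

    /** An envelope carries "op" plus at least one row-state field. */
    static boolean isDebeziumEnvelope(Map<String, Object> value) {
        return value.containsKey("op")
                && (value.containsKey("before") || value.containsKey("after"));
    }

    /** Numbers become Strings; every other value passes through unchanged. */
    static Object normalizeValue(Object value) {
        return (value instanceof Number) ? value.toString() : value;
    }

    static Map<String, Object> normalizeAll(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        row.forEach((k, v) -> out.put(k, normalizeValue(v)));
        return out;
    }

    @SuppressWarnings("unchecked")
    static SolrAction toSolrAction(Map<String, Object> value) {
        if (!isDebeziumEnvelope(value)) {
            return new SolrAction(null, normalizeAll(value)); // flat payload
        }
        Map<String, Object> after = (Map<String, Object>) value.get("after");
        if (after != null) {
            return new SolrAction(null, normalizeAll(after)); // insert or update
        }
        // No "after" state: treat as a delete and take the key from "before".
        Map<String, Object> before = (Map<String, Object>) value.get("before");
        if (before == null || before.get("id") == null) {
            throw new IllegalArgumentException("delete event has no primary key in 'before'");
        }
        return new SolrAction(String.valueOf(normalizeValue(before.get("id"))), null);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42);
        row.put("name", "widget");

        Map<String, Object> insert = new LinkedHashMap<>();
        insert.put("op", "c");
        insert.put("before", null);
        insert.put("after", row);

        Map<String, Object> delete = new LinkedHashMap<>();
        delete.put("op", "d");
        delete.put("before", row);
        delete.put("after", null);

        System.out.println("index fields: " + toSolrAction(insert).fields);
        System.out.println("delete id:    " + toSolrAction(delete).deleteId);
    }
}
```

In the real sink, the upsert branch would build a Solr document and the delete branch would call `deleteById`; throwing on a delete without a usable key keeps that case on the `fail()` path instead of skipping it silently.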
   
   #### Refactoring
   
   - Simplified control flow using guard clauses
   - Reduced nested branching for improved readability and maintainability
   
   ### Highlights of changes:
   
   - Native Debezium CDC payload support in Solr sink
   - End-to-end DELETE event propagation from PostgreSQL to Solr
   - Improved data integrity through proper `ack()` / `fail()` lifecycle 
handling
   
   ### Verifying this change
   
   Verified end-to-end using:
   
   - Apache Pulsar 4.x standalone
   - Debezium PostgreSQL Connector
   - Solr 9.x
   - PostgreSQL CDC events
   
   Validation results:
   
   - INSERT events correctly populate Solr documents
   - UPDATE events correctly update indexed fields
   - DELETE events correctly remove documents from Solr
   - Numeric fields are normalized and indexed successfully
   
   Example CDC operations validated:
   
   - `"op":"c"` → INSERT
   - `"op":"u"` → UPDATE
   - `"op":"d"` → DELETE
   
   
   ### Tests added
   
   - `testDebeziumUnwrapEnvelope`  
     Validates handling of standard Debezium envelope payloads
   
   - `testDebeziumUnwrapFlatValue`  
     Ensures compatibility with non-envelope payloads
   
   - `testDebeziumUnwrapDelete`  
     Verifies DELETE event handling and Solr deletion flow
   
   - Updated mocks to support:
     - `KeyValue` schema handling
     - Debezium `op` field detection
   
   - Confirmed no regression for non-CDC Pulsar topics
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment


