Praveenkumar76 opened a new pull request, #26: URL: https://github.com/apache/pulsar-connectors/pull/26
Fixes [apache#23763](https://github.com/apache/pulsar/issues/23763)

### Motivation

Solr sink indexing remains empty when consuming PostgreSQL CDC events produced by the Debezium connector. The root cause is a schema and structural incompatibility between Debezium-generated messages and the existing Solr sink implementation:

- **KeyValue schema handling:** Debezium emits CDC events using a `KeyValue` schema wrapper. The existing `GenericRecord` processing could not correctly extract nested payload fields from this structure.
- **Envelope structure:** Debezium wraps events inside a CDC envelope containing fields such as `before`, `after`, `op`, and `source`. The sink previously lacked logic to unwrap and process the actual row state.
- **Type mismatch:** Solr expects consistent string-based values for identifiers and indexed fields, while CDC events often contain numeric types (`Integer`, `Long`), causing silent field drops during indexing.
- **Record lifecycle handling:** Conversion failures and intentionally skipped records (such as processed DELETE events) were not clearly differentiated, potentially leading to silent data loss.

Additionally, while CDC transformation can be handled externally using Pulsar Functions or intermediate processors, native support within the Solr sink simplifies standard CDC-to-search indexing pipelines.

### Modifications

#### Configuration

- Added `unwrapDebeziumRecord` configuration in `SolrSinkConfig` (default: `false`) to allow users to explicitly enable Debezium CDC payload handling.
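To make the envelope and type-mismatch points concrete, here is a minimal sketch of the unwrapping idea using plain `java.util.Map` values instead of Pulsar's `GenericRecord`. It is not the code in this PR: the class name `DebeziumUnwrapSketch`, the `toSolrFields` helper, and the choice to normalize every `Number` to a `String` are assumptions made for illustration. Only the names `isDebeziumEnvelope`, `normalizeValue`, and `unwrapDebeziumRecord` come from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: models a Debezium value as a Map rather than a GenericRecord.
final class DebeziumUnwrapSketch {

    /** Treats a value as an envelope when it has "op" plus a "before" or "after" field. */
    static boolean isDebeziumEnvelope(Map<String, Object> value) {
        return value.containsKey("op")
                && (value.containsKey("before") || value.containsKey("after"));
    }

    /** Assumption: numbers are stringified, every other value passes through unchanged. */
    static Object normalizeValue(Object v) {
        return (v instanceof Number) ? v.toString() : v;
    }

    /**
     * Returns the fields to index, or null when the envelope carries no "after"
     * row state (for example a delete event).
     */
    static Map<String, Object> toSolrFields(Map<String, Object> value,
                                            boolean unwrapDebeziumRecord) {
        Map<String, Object> row = value;
        if (unwrapDebeziumRecord && isDebeziumEnvelope(value)) {
            Object after = value.get("after");
            if (!(after instanceof Map)) {
                return null; // nothing to index
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> afterMap = (Map<String, Object>) after;
            row = afterMap;
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            out.put(e.getKey(), normalizeValue(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 42);
        after.put("name", "alice");

        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("op", "c");
        envelope.put("before", null);
        envelope.put("after", after);

        // The numeric id is emitted as the string "42".
        System.out.println(toSolrFields(envelope, true));
    }
}
```

With the flag set to `false`, the sketch leaves the envelope untouched apart from value normalization, which mirrors the opt-in default described for `unwrapDebeziumRecord`.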
#### SolrAbstractSink

Updated the parent sink lifecycle handling:

- Successful processing or intentionally skipped records now trigger `record.ack()`
- Processing failures now trigger `record.fail()` to support retries and DLQ handling
- Prevents silent data loss during conversion failures

#### SolrGenericRecordSink

Implemented native Debezium CDC support:

- Added `extractValueRecord()` using `getNativeObject()` for safe extraction of `KeyValue` payloads
- Added `isDebeziumEnvelope()` to identify valid Debezium CDC envelopes using `op` and state fields (`before` / `after`)
- Added `mapDebeziumPayload()` and `extractAfterRecord()` to recursively unwrap CDC payloads
- Added explicit DELETE handling:
  - Detects delete events (`after == null`)
  - Extracts primary key from `before`
  - Calls `deleteById` in Solr
- Added `normalizeValue()` to safely convert numeric values into Strings for Solr schema compatibility

#### Refactoring

- Simplified control flow using guard clauses
- Reduced nested branching for improved readability and maintainability

### Highlight of changes:

- Native Debezium CDC payload support in Solr sink
- End-to-end DELETE event propagation from PostgreSQL to Solr
- Improved data integrity through proper `ack()` / `fail()` lifecycle handling

### Verifying this change

Verified end-to-end using:

- Apache Pulsar 4.x standalone
- Debezium PostgreSQL Connector
- Solr 9.x
- PostgreSQL CDC events

Validation results:

- INSERT events correctly populate Solr documents
- UPDATE events correctly update indexed fields
- DELETE events correctly remove documents from Solr
- Numeric fields are normalized and indexed successfully

Example CDC operations validated:

- `"op":"c"` → INSERT
- `"op":"u"` → UPDATE
- `"op":"d"` → DELETE

### Tests added

- `testDebeziumUnwrapEnvelope`: Validates handling of standard Debezium envelope payloads
- `testDebeziumUnwrapFlatValue`: Ensures compatibility with non-envelope payloads
- `testDebeziumUnwrapDelete`: Verifies DELETE event handling and Solr deletion flow
- Updated mocks to support:
  - `KeyValue` schema handling
  - Debezium `op` field detection
- Confirmed no regression for non-CDC Pulsar topics

### Does this pull request potentially affect one of the following parts:

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
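
For reviewers who want the `ack()` / `fail()` and DELETE rules from the Modifications section in one place, the following is a rough sketch of that control flow. It deliberately avoids the real Pulsar and SolrJ types: `AckableRecord` and `Index` are hypothetical stand-ins, the primary key field is assumed to be named `id`, and the actual sink in this PR may structure the logic differently.

```java
import java.util.Map;

// Illustrative only: hypothetical interfaces replace Pulsar's Record and the Solr client.
final class SinkLifecycleSketch {

    /** Hypothetical stand-in for the parts of a Pulsar record the sink uses. */
    interface AckableRecord {
        Map<String, Object> value();
        void ack();
        void fail();
    }

    /** Hypothetical stand-in for the Solr operations the sink needs. */
    interface Index {
        void add(Map<String, Object> doc) throws Exception;
        void deleteById(String id) throws Exception;
    }

    /** Acks on success or intentional skip, fails on any processing error. */
    static void write(AckableRecord record, Index index) {
        try {
            Map<String, Object> value = record.value();
            Object after = value.get("after");

            // Guard clause: no "after" state means a delete event.
            if (after == null) {
                Object before = value.get("before");
                if (before instanceof Map) {
                    // Assumption: the primary key lives in a field called "id".
                    Object id = ((Map<?, ?>) before).get("id");
                    if (id != null) {
                        index.deleteById(String.valueOf(id));
                    }
                }
                record.ack(); // handled or intentionally skipped
                return;
            }

            if (!(after instanceof Map)) {
                throw new IllegalArgumentException("unexpected 'after' payload");
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> doc = (Map<String, Object>) after;
            index.add(doc);
            record.ack();
        } catch (Exception e) {
            // Failure is surfaced so retries or a dead-letter topic can take over.
            record.fail();
        }
    }
}
```

The point of the sketch is the distinction the PR draws: a skipped or deleted record still ends in `ack()`, while a conversion or indexing error ends in `fail()` rather than being silently dropped.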
