Poorvankbhatia opened a new pull request, #35:
URL: https://github.com/apache/flink-connector-cassandra/pull/35
## Summary
This PR implements complete **Apache Flink Table/SQL API support for
reading from Apache Cassandra** through a new `CassandraDynamicTableSource`.
Previously, users could only read from Cassandra using the
DataStream API with the existing `CassandraSource`. This change enables
direct SQL queries against Cassandra tables using Flink's Table API.
**Key capabilities added:**
- Full SQL query support, e.g. `SELECT * FROM cassandra_table WHERE <conditions>`
- Complete type mapping between all Cassandra data types and Flink logical
types
- Support for complex nested structures (UDTs, collections, arrays)
## Changes Made
### Core Table API Implementation
- **`CassandraDynamicTableSource`**: New DynamicTableSource implementation
with projection and limit pushdown support
### Comprehensive Type Mapping System
- **`RowToRowDataMapper`**: Main orchestrator for field-level conversions
- **`CassandraFieldMapperFactory`**: Factory for creating appropriate
mappers based on Flink logical types
- **`PrimitiveFieldMappers`**:
  - Added `DynamicDecimalMapper` for runtime type detection (handles both
    varint and decimal columns)
  - Added `TimeMapper` for TIME_WITHOUT_TIME_ZONE with precision conversion
  - Enhanced `StringMapper` for UUID/TimeUUID/Duration/Inet types
  - Added `convertValue()` methods to `IntegerMapper` and `LongMapper` to
    prevent ClassCastException
- **`CollectionFieldMappers`**:
  - Fixed `SetMapper` count typing for MULTISET support
  - Support for deeply nested structures (`List<Row<...>>`,
    `Map<String, Row<...>>`)
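The numeric and set conversions described above can be illustrated with a minimal, dependency-free sketch. The class and method names below (`MapperSketch`, `toDecimal`, `toInt`, `toLong`, `toMultiset`) are hypothetical and are not the PR's actual mapper code. The sketch assumes the driver hands back `BigInteger` for varint and `BigDecimal` for decimal, and that MULTISET counts are typed as `Integer`.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: hypothetical helpers, not the mappers added in this PR. */
final class MapperSketch {

    /** Runtime type detection: accept either a varint (BigInteger) or a decimal (BigDecimal). */
    static BigDecimal toDecimal(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        if (value instanceof BigInteger) {
            return new BigDecimal((BigInteger) value);
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
    }

    /** Goes through Number instead of a blind (Integer) cast; fails loudly on overflow. */
    static Integer toInt(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return Math.toIntExact(((Number) value).longValue());
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
    }

    /** Same idea for bigint: any Number is widened to long. */
    static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
    }

    /** A set has no duplicates, so every element maps to an Integer count of 1. */
    static <T> Map<T, Integer> toMultiset(Set<T> elements) {
        Map<T, Integer> counts = new HashMap<>();
        for (T element : elements) {
            counts.put(element, 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(toDecimal(new BigInteger("12345678901234567890")));
        System.out.println(toLong(42));
        System.out.println(toMultiset(new HashSet<>(Arrays.asList("a", "b"))));
    }
}
```

Routing through `Number` is one way to avoid a `ClassCastException` when the runtime value is not the exact boxed type; whether the PR's `convertValue()` methods work this way is not stated in the description.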
### Build System Enhancements
- **Enhanced `pom.xml`**:
  - Added comprehensive shaded jar configuration with detailed
    documentation
  - Added filters to exclude signature files and prevent conflicts
  - Ensures proper DataStax driver dependency resolution in SQL client
### Testing
- **`CassandraTableE2ETest`**: End-to-end tests covering type mapping, nested
  structures, null values, and empty collections
- **Unit test coverage** for all mapper components with mock-based testing
## Type Mapping Details
| Cassandra Type | Flink LogicalType | Mapper Used | Notes |
|---|---|---|---|
| boolean | BOOLEAN | BooleanMapper | Direct mapping |
| tinyint | TINYINT | ByteMapper | Direct mapping |
| smallint | SMALLINT | ShortMapper | Direct mapping |
| int | INTEGER | IntegerMapper | With convertValue() for type safety |
| bigint | BIGINT | LongMapper | With convertValue() for type safety |
| float | FLOAT | FloatMapper | Direct mapping |
| double | DOUBLE | DoubleMapper | Direct mapping |
| text/ascii | VARCHAR | StringMapper | Direct mapping |
| uuid/timeuuid | VARCHAR | StringMapper | Converted to string representation |
| inet | VARCHAR | StringMapper | Converted to IP address string |
| duration | VARCHAR | StringMapper | Converted to ISO-8601 duration string |
| decimal | DECIMAL | DynamicDecimalMapper | Runtime type detection |
| varint | DECIMAL | DynamicDecimalMapper | Runtime type detection |
| date | DATE | DateMapper | Converted to epoch days |
| time | TIME_WITHOUT_TIME_ZONE | TimeMapper | Nanoseconds → milliseconds |
| timestamp | TIMESTAMP_WITHOUT_TIME_ZONE | TimestampMapper | Direct mapping |
| blob | VARBINARY | BinaryMapper | ByteBuffer → byte[] |
| `list<T>` | `ARRAY<T>` | ArrayMapper | Recursive mapping |
| `map<K,V>` | `MAP<K,V>` | MapMapper | Recursive mapping |
| `set<T>` | `MULTISET<T>` | SetMapper | Converted to `Map<element, count>` |
| UDT | `ROW<...>` | RowMapper | Field-by-field recursive mapping |
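
To make the non-trivial rows of the table concrete, here is a small hedged sketch of three of the conversions: time (nanoseconds to milliseconds), date (epoch days), and blob (`ByteBuffer` to `byte[]`). The class `TypeConversionSketch` and its methods are hypothetical names and not code from this PR. The use of `java.time.LocalDate` as the date input is also an assumption made to keep the example dependency-free.

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;

/** Illustrative only: hypothetical helpers mirroring rows of the type mapping table. */
final class TypeConversionSketch {

    /** time: nanoseconds since midnight to milliseconds of day (sub-millisecond part is dropped). */
    static int nanosToMillisOfDay(long nanosOfDay) {
        // The largest valid input is just under 86,400 * 10^9, so the result fits in an int.
        return (int) (nanosOfDay / 1_000_000L);
    }

    /** date: number of days since 1970-01-01. */
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    /** blob: copy the readable bytes without moving the caller's buffer position. */
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(nanosToMillisOfDay(3_600_000_000_000L)); // 01:00:00 as millis of day
        System.out.println(toEpochDays(LocalDate.of(2024, 1, 1)));
        System.out.println(toBytes(ByteBuffer.wrap(new byte[] {1, 2, 3})).length);
    }
}
```

Reading from a duplicate of the buffer leaves the original position untouched, which matters if the same `ByteBuffer` is read more than once.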
## Testing
### Unit Tests
- [x] `CassandraFieldMapperFactoryTest`: Tests all mapper creation
scenarios
- [x] `PrimitiveFieldMappersTest`: Tests all primitive type conversions
with edge cases
- [x] `CollectionFieldMappersTest`: Tests nested collection handling
- [x] `CassandraDynamicTableSourceTest`: Tests table source configuration
and split generation
### Integration Tests
- [x] `CassandraTableE2ETest`: Comprehensive end-to-end test that verifies:
  - Correct type mapping for all primitive types
  - Nested structure handling (arrays of UDTs, maps with Row values)
  - Null value handling in collections
  - Empty collection behavior
  - Large binary data handling
  - High precision numeric types
  - UUID/TimeUUID string conversion
  - Duration and Inet address handling
### Build Verification
- [x] All tests pass: `mvn clean package`
- [x] Code formatting: `mvn spotless:check`
- [x] Checkstyle compliance: `mvn checkstyle:check`
## Breaking Changes
None. This is a new feature that adds Table API support while maintaining
full backward compatibility with existing DataStream API usage.