echauchot commented on PR #35: URL: https://github.com/apache/flink-connector-cassandra/pull/35#issuecomment-3088159776
> ## Summary
> This PR implements complete **Apache Flink Table/SQL API support for reading from Apache Cassandra** through a new `CassandraDynamicTableSource`. Previously, users could only read from Cassandra using the DataStream API with the existing `CassandraSource`. This change enables direct SQL queries against Cassandra tables using Flink's Table API.
>
> **Key capabilities added:**
>
> * Full SQL query support: `SELECT * FROM cassandra_table WHERE conditions`
> * Complete type mapping between all Cassandra data types and Flink logical types
> * Support for complex nested structures (UDTs, collections, arrays)
>
> ## Changes Made
> ### Core Table API Implementation
> * **`CassandraDynamicTableSource`**: New DynamicTableSource implementation
>
> ### Comprehensive Type Mapping System
> * **`RowToRowDataMapper`**: Main orchestrator for field-level conversions
> * **`CassandraFieldMapperFactory`**: Factory for creating appropriate mappers based on Flink logical types
> * **`PrimitiveFieldMappers`**:
>
>   * Added `DynamicDecimalMapper` for runtime type detection (handles both varint and decimal columns)
>   * Added `TimeMapper` for TIME_WITHOUT_TIME_ZONE with precision conversion
>   * Enhanced `StringMapper` for UUID/TimeUUID/Duration/Inet types
>   * Added `convertValue()` methods to `IntegerMapper` and `LongMapper` to prevent ClassCastException
> * **`CollectionFieldMappers`**:
>
>   * Fixed `SetMapper` count typing for MULTISET support
>   * Support for deeply nested structures (List<Row<...>>, Map<String, Row<...>>)
>
> ### Build System Enhancements
> * **Enhanced `pom.xml`**:
>
>   * Added comprehensive shaded jar configuration with detailed documentation
>   * Added filters to exclude signature files and prevent conflicts
>   * Ensures proper DataStax driver dependency resolution in SQL client
>
> ### Testing
> * **`CassandraTableE2ETest`**: End-to-end tests covering all edge cases
> * **Unit test coverage** for all mapper components with mock-based testing
>
> ## Type Mapping Details
> | Cassandra Type | Flink LogicalType | Mapper Used | Notes |
> | --- | --- | --- | --- |
> | boolean | BOOLEAN | BooleanMapper | Direct mapping |
> | tinyint | TINYINT | ByteMapper | Direct mapping |
> | smallint | SMALLINT | ShortMapper | Direct mapping |
> | int | INTEGER | IntegerMapper | With convertValue() for type safety |
> | bigint | BIGINT | LongMapper | With convertValue() for type safety |
> | float | FLOAT | FloatMapper | Direct mapping |
> | double | DOUBLE | DoubleMapper | Direct mapping |
> | text/ascii | VARCHAR | StringMapper | Direct mapping |
> | uuid/timeuuid | VARCHAR | StringMapper | Converted to string representation |
> | inet | VARCHAR | StringMapper | Converted to IP address string |
> | duration | VARCHAR | StringMapper | Converted to ISO-8601 duration string |
> | decimal | DECIMAL | DynamicDecimalMapper | Runtime type detection |
> | varint | DECIMAL | DynamicDecimalMapper | Runtime type detection |
> | date | DATE | DateMapper | Converted to epoch days |
> | time | TIME_WITHOUT_TIME_ZONE | TimeMapper | Nanoseconds → milliseconds |
> | timestamp | TIMESTAMP_WITHOUT_TIME_ZONE | TimestampMapper | Direct mapping |
> | blob | VARBINARY | BinaryMapper | ByteBuffer → byte[] |
> | list | ARRAY | ArrayMapper | Recursive mapping |
> | map<K,V> | MAP<K,V> | MapMapper | Recursive mapping |
> | set | MULTISET | SetMapper | Converted to Map<element, count> |
> | UDT | ROW<...> | RowMapper | Field-by-field recursive mapping |
>
> ## Testing
> ### Unit Tests
> * [x] `CassandraFieldMapperFactoryTest`: Tests all mapper creation scenarios
> * [x] `PrimitiveFieldMappersTest`: Tests all primitive type conversions with edge cases
> * [x] `CollectionFieldMappersTest`: Tests nested collection handling
> * [x] `CassandraDynamicTableSourceTest`: Tests table source configuration and split generation
>
> ### Integration Tests
> * [x] `CassandraDynamicTableSourceITCase`: Comprehensive end-to-end test that verifies:
>
>   * Correct type mapping for all primitive types
>   * Nested structure handling (arrays of UDTs, maps with Row values)
>   * Null value handling in collections
>   * Empty collection behavior
>   * Large binary data handling
>   * High precision numeric types
>   * UUID/TimeUUID string conversion
>   * Duration and Inet address handling
>
> ### Build Verification
> * [x] All tests pass: `mvn clean package`
> * [x] Code formatting: `mvn spotless:check`
> * [x] Checkstyle compliance: `mvn checkstyle:check`
>
> ## Breaking Changes
> None. This is a new feature that adds Table API support while maintaining full backward compatibility with existing DataStream API usage.

There is actually a breaking change on CassandraSource creation (constructor replaced with builder). But I agree with this change and it is ok as this class is annotated PublicEvolving
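
For readers following the type mapping table quoted above, here is a rough sketch of what four of the listed conversions amount to (date to epoch days, time nanoseconds to milliseconds, `ByteBuffer` to `byte[]`, set to element/count map). It uses only JDK types and is not the PR's code: the class name `MappingSketch` and all method names are hypothetical, `java.time.LocalDate` stands in for whatever date type the driver returns, and the assumption that each set element gets a count of 1 is mine (a Cassandra set holds each element once).

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names come from the PR.
final class MappingSketch {

    // date -> DATE: number of days since 1970-01-01.
    public static int dateToEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // time -> TIME_WITHOUT_TIME_ZONE: nanoseconds of day -> milliseconds of day.
    // Sub-millisecond precision is truncated.
    public static int timeNanosToMillis(long nanosOfDay) {
        return Math.toIntExact(nanosOfDay / 1_000_000L);
    }

    // blob -> VARBINARY: copy the readable bytes without moving the
    // caller's buffer position (duplicate() shares content, not position).
    public static byte[] blobToBytes(ByteBuffer buffer) {
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    // set -> MULTISET: each element maps to its count, assumed to be 1.
    public static <T> Map<T, Integer> setToMultiset(Set<T> set) {
        Map<T, Integer> counts = new HashMap<>();
        for (T element : set) {
            counts.put(element, 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(dateToEpochDays(LocalDate.of(1970, 1, 2)));
        System.out.println(timeNanosToMillis(1_500_000_000L));
        System.out.println(Arrays.toString(blobToBytes(ByteBuffer.wrap(new byte[] {1, 2, 3}))));
        System.out.println(setToMultiset(new HashSet<>(Arrays.asList("a", "b"))));
    }
}
```

The count values are `Integer` here because the PR description mentions fixing `SetMapper` count typing for MULTISET; the exact type used in the PR is not shown in this message.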
