Poorvankbhatia opened a new pull request, #35:
URL: https://github.com/apache/flink-connector-cassandra/pull/35

    ## Summary
     This PR implements complete **Apache Flink Table/SQL API support for reading from Apache Cassandra** through a new `CassandraDynamicTableSource`. Previously, users could only read from Cassandra using the DataStream API with the existing `CassandraSource`. This change enables direct SQL queries against Cassandra tables using Flink's Table API.
   
     **Key capabilities added:**
     - Full SQL query support: `SELECT * FROM cassandra_table WHERE conditions`
     - Complete type mapping between all Cassandra data types and Flink logical 
types
     - Support for complex nested structures (UDTs, collections, arrays)
   
     ## Changes Made
   
     ### Core Table API Implementation
     - **`CassandraDynamicTableSource`**: New DynamicTableSource implementation 
with projection and limit pushdown support
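
As a rough illustration of what projection and limit pushdown can mean for a Cassandra read, here is a minimal, driver-free sketch that turns a projected column list and an optional limit into a CQL `SELECT`. The class `PushdownQuerySketch` and the method `buildSelect` are hypothetical names used only for this example. The PR text does not say how the connector builds its queries or how a limit is applied across splits, so treat this as an assumption rather than the actual implementation.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper, not part of the PR: shows how pushed-down projection
// and limit could shape the CQL sent to Cassandra.
final class PushdownQuerySketch {
    private PushdownQuerySketch() {}

    static String buildSelect(
            String keyspace, String table, List<String> projectedColumns, OptionalLong limit) {
        // Projection pushdown: request only the columns the query needs.
        // An empty projection falls back to all columns.
        String columns =
                projectedColumns.isEmpty() ? "*" : String.join(", ", projectedColumns);

        StringBuilder cql =
                new StringBuilder("SELECT ")
                        .append(columns)
                        .append(" FROM ")
                        .append(keyspace)
                        .append('.')
                        .append(table);

        // Limit pushdown: let Cassandra stop early instead of filtering in Flink.
        if (limit.isPresent()) {
            cql.append(" LIMIT ").append(limit.getAsLong());
        }
        return cql.append(';').toString();
    }
}
```

This sketch does not quote identifiers or validate input. A real source would also need to decide how a global limit interacts with per-split reads.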
   
     ### Comprehensive Type Mapping System
     - **`RowToRowDataMapper`**: Main orchestrator for field-level conversions
     - **`CassandraFieldMapperFactory`**: Factory for creating appropriate 
mappers based on Flink logical types
     - **`PrimitiveFieldMappers`**:
       - Added `DynamicDecimalMapper` for runtime type detection (handles both 
varint and decimal columns)
       - Added `TimeMapper` for TIME_WITHOUT_TIME_ZONE with precision conversion
       - Enhanced `StringMapper` for UUID/TimeUUID/Duration/Inet types
       - Added `convertValue()` methods to `IntegerMapper` and `LongMapper` to 
prevent ClassCastException
     - **`CollectionFieldMappers`**:
       - Fixed `SetMapper` count typing for MULTISET support
       - Support for deeply nested structures (List<Row<...>>, Map<String, 
Row<...>>)
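
The `DynamicDecimalMapper` and `convertValue()` items above describe runtime coercion of numeric values. A minimal sketch of that idea follows, assuming `varint` values arrive as `java.math.BigInteger` and `decimal` values as `java.math.BigDecimal` (the DataStax Java driver's usual mapping). The class `NumericCoercionSketch` and its methods are hypothetical names for illustration and are not the PR's code.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical illustration of runtime numeric coercion; not the PR's classes.
final class NumericCoercionSketch {
    private NumericCoercionSketch() {}

    // Accepts either a decimal (BigDecimal) or a varint (BigInteger) value
    // and returns a BigDecimal, detecting the concrete type at runtime.
    static BigDecimal toBigDecimal(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof BigDecimal) {
            return (BigDecimal) value;
        }
        if (value instanceof BigInteger) {
            return new BigDecimal((BigInteger) value);
        }
        throw new IllegalArgumentException(
                "Unsupported numeric type: " + value.getClass().getName());
    }

    // Converts any Number to Integer instead of casting blindly, which is
    // what would raise ClassCastException for, say, a Short or a Long.
    // Throws ArithmeticException if the long value does not fit in an int.
    static Integer toInt(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return Math.toIntExact(((Number) value).longValue());
        }
        throw new IllegalArgumentException(
                "Not a number: " + value.getClass().getName());
    }

    // Same idea for BIGINT columns: widen any Number to Long.
    static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException(
                "Not a number: " + value.getClass().getName());
    }
}
```

For example, `NumericCoercionSketch.toInt(Short.valueOf((short) 7))` yields an `Integer`, where a direct `(Integer)` cast on the same object would fail.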
   
     ### Build System Enhancements
     - **Enhanced `pom.xml`**:
       - Added comprehensive shaded jar configuration with detailed 
documentation
       - Added filters to exclude signature files and prevent conflicts
       - Ensures proper DataStax driver dependency resolution in SQL client
   
     ### Testing
     - **`CassandraTableE2ETest`**: End-to-end tests covering type mapping, nested structures, null and empty collections, and large or high-precision values
     - **Unit test coverage** for all mapper components with mock-based testing
   
     ## Type Mapping Details
   
     | Cassandra Type | Flink LogicalType | Mapper Used | Notes |
     |---|---|---|---|
     | boolean | BOOLEAN | BooleanMapper | Direct mapping |
     | tinyint | TINYINT | ByteMapper | Direct mapping |
     | smallint | SMALLINT | ShortMapper | Direct mapping |
     | int | INTEGER | IntegerMapper | With convertValue() for type safety |
     | bigint | BIGINT | LongMapper | With convertValue() for type safety |
     | float | FLOAT | FloatMapper | Direct mapping |
     | double | DOUBLE | DoubleMapper | Direct mapping |
     | text/ascii | VARCHAR | StringMapper | Direct mapping |
     | uuid/timeuuid | VARCHAR | StringMapper | Converted to string representation |
     | inet | VARCHAR | StringMapper | Converted to IP address string |
     | duration | VARCHAR | StringMapper | Converted to ISO-8601 duration string |
     | decimal | DECIMAL | DynamicDecimalMapper | Runtime type detection |
     | varint | DECIMAL | DynamicDecimalMapper | Runtime type detection |
     | date | DATE | DateMapper | Converted to epoch days |
     | time | TIME_WITHOUT_TIME_ZONE | TimeMapper | Nanoseconds → milliseconds |
     | timestamp | TIMESTAMP_WITHOUT_TIME_ZONE | TimestampMapper | Direct mapping |
     | blob | VARBINARY | BinaryMapper | ByteBuffer → byte[] |
     | list<T> | ARRAY<T> | ArrayMapper | Recursive mapping |
     | map<K,V> | MAP<K,V> | MapMapper | Recursive mapping |
     | set<T> | MULTISET<T> | SetMapper | Converted to Map<element, count> |
     | UDT | ROW<...> | RowMapper | Field-by-field recursive mapping |
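
A few of the non-trivial rows in the table can be sketched with plain JDK types. Flink represents `TIME` internally as an `int` of milliseconds of the day and `DATE` as an `int` of days since the epoch. The rest is assumption: the `time` value is taken to arrive as nanoseconds since midnight, `java.time.LocalDate` stands in for the driver's date type, and each set element is given a count of 1 because set elements are unique. `TableConversionSketch` and its method names are hypothetical and do not come from the PR.

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of selected table rows; not the PR's mappers.
final class TableConversionSketch {
    private TableConversionSketch() {}

    // time -> TIME_WITHOUT_TIME_ZONE: nanoseconds of day to milliseconds of day.
    // Sub-millisecond precision is truncated.
    static int timeNanosToMillisOfDay(long nanosOfDay) {
        return Math.toIntExact(nanosOfDay / 1_000_000L);
    }

    // date -> DATE: number of days since 1970-01-01.
    static int dateToEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // blob -> VARBINARY: copy the remaining bytes without moving the
    // caller's buffer position (duplicate() shares content, not position).
    static byte[] blobToBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // set<T> -> MULTISET<T>: a map from element to occurrence count.
    // Every element of a set occurs exactly once, so each count is 1.
    static <T> Map<T, Integer> setToMultiset(Set<T> elements) {
        Map<T, Integer> counts = new LinkedHashMap<>();
        for (T element : elements) {
            counts.put(element, 1);
        }
        return counts;
    }
}
```

The `Integer` count type matters here: Flink's multiset representation expects integer counts, which is presumably what the `SetMapper` count typing fix refers to.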
   
     ## Testing
   
     ### Unit Tests
     - [x] `CassandraFieldMapperFactoryTest`: Tests all mapper creation 
scenarios
     - [x] `PrimitiveFieldMappersTest`: Tests all primitive type conversions 
with edge cases
     - [x] `CollectionFieldMappersTest`: Tests nested collection handling
     - [x] `CassandraDynamicTableSourceTest`: Tests table source configuration 
and split generation
   
     ### Integration Tests
     - [x] `CassandraTableE2ETest`: Comprehensive end-to-end test that verifies:
       - Correct type mapping for all primitive types
       - Nested structure handling (arrays of UDTs, maps with Row values)
       - Null value handling in collections
       - Empty collection behavior
       - Large binary data handling
       - High precision numeric types
       - UUID/TimeUUID string conversion
       - Duration and Inet address handling
   
     ### Build Verification
     - [x] All tests pass: `mvn clean package`
     - [x] Code formatting: `mvn spotless:check`
     - [x] Checkstyle compliance: `mvn checkstyle:check`
   
     ## Breaking Changes
   None. This is a new feature that adds Table API support while maintaining 
full backward compatibility with existing DataStream API usage.

