echauchot commented on PR #35:
URL: https://github.com/apache/flink-connector-cassandra/pull/35#issuecomment-3088159776

   > ## Summary
   > This PR implements complete **Apache Flink Table/SQL API support for reading from Apache Cassandra** through a new `CassandraDynamicTableSource`. Previously, users could only read from Cassandra using the DataStream API with the existing `CassandraSource`. This change enables direct SQL queries against Cassandra tables using Flink's Table/SQL API.
   > 
   > **Key capabilities added:**
   > 
   > * Full SQL query support: `SELECT * FROM cassandra_table WHERE conditions`
   > * Complete type mapping between all Cassandra data types and Flink logical types
   > * Support for complex nested structures (UDTs, collections, arrays)
   > 
   > ## Changes Made
   > ### Core Table API Implementation
   > * **`CassandraDynamicTableSource`**: New `DynamicTableSource` implementation
   > 
   > ### Comprehensive Type Mapping System
   > * **`RowToRowDataMapper`**: Main orchestrator for field-level conversions
   > * **`CassandraFieldMapperFactory`**: Factory for creating appropriate mappers based on Flink logical types
   > * **`PrimitiveFieldMappers`**:
   >   
   >   * Added `DynamicDecimalMapper` for runtime type detection (handles both varint and decimal columns)
   >   * Added `TimeMapper` for TIME_WITHOUT_TIME_ZONE with precision conversion
   >   * Enhanced `StringMapper` for UUID/TimeUUID/Duration/Inet types
   >   * Added `convertValue()` methods to `IntegerMapper` and `LongMapper` to prevent `ClassCastException`
   > * **`CollectionFieldMappers`**:
   >   
   >   * Fixed `SetMapper` count typing for MULTISET support
   >   * Support for deeply nested structures (`List<Row<...>>`, `Map<String, Row<...>>`)
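
The following is a minimal sketch of how the per-field mappers described above could compose. It rests on two assumptions. First, it uses a hypothetical `FieldMapper` functional interface rather than the PR's actual classes. Second, plain JDK types (`Integer`, `BigDecimal`, `Object[]`) stand in for Flink's internal `DecimalData`/`ArrayData`. The sketch shows three things:
* a `convertValue()`-style numeric conversion that avoids a `ClassCastException`;
* a decimal mapper that accepts both varint (`BigInteger`) and decimal (`BigDecimal`) values;
* recursive list mapping for nested structures.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: names echo the PR's mappers, but JDK types replace
// Flink's internal data structures (DecimalData, ArrayData, ...).
public final class FieldMapperSketch {

    /** Converts one driver-level value into its Flink-side representation. */
    @FunctionalInterface
    interface FieldMapper {
        Object convert(Object value);
    }

    /** In the spirit of IntegerMapper.convertValue(): go through Number instead of casting. */
    static final FieldMapper INTEGER =
            value -> value == null ? null : ((Number) value).intValue();

    /** Same idea for LongMapper. */
    static final FieldMapper LONG =
            value -> value == null ? null : ((Number) value).longValue();

    /** In the spirit of DynamicDecimalMapper: varint and decimal both become BigDecimal. */
    static final FieldMapper DECIMAL = value -> {
        if (value == null) {
            return null;
        }
        if (value instanceof BigDecimal) {
            return value;
        }
        if (value instanceof BigInteger) {
            return new BigDecimal((BigInteger) value);
        }
        throw new IllegalArgumentException("Unsupported numeric type: " + value.getClass());
    };

    /** Recursive mapping: a list mapper delegates every element to an element mapper. */
    static FieldMapper listOf(FieldMapper elementMapper) {
        return value -> {
            if (value == null) {
                return null;
            }
            List<?> input = (List<?>) value;
            Object[] out = new Object[input.size()];
            for (int i = 0; i < out.length; i++) {
                out[i] = elementMapper.convert(input.get(i)); // null elements stay null
            }
            return out;
        };
    }

    public static void main(String[] args) {
        // A Long coming from the driver is narrowed instead of failing a cast.
        System.out.println(INTEGER.convert(42L)); // 42
        // A varint (BigInteger) goes through the same mapper as a decimal.
        System.out.println(DECIMAL.convert(new BigInteger("12345678901234567890")));
        // list<list<int>> -> ARRAY<ARRAY<INT>>, built by nesting list mappers.
        FieldMapper nested = listOf(listOf(INTEGER));
        Object[] result = (Object[]) nested.convert(
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, null)));
        System.out.println(Arrays.deepToString(result)); // [[1, 2], [3, null]]
    }
}
```

With this composition, a nested type only needs its inner mapper plugged in as the element mapper. The PR's real mappers may be structured differently.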
   > 
   > ### Build System Enhancements
   > * **Enhanced `pom.xml`**:
   >   
   >   * Added comprehensive shaded jar configuration with detailed documentation
   >   * Added filters to exclude signature files and prevent conflicts
   >   * Ensures proper DataStax driver dependency resolution in SQL client
   > 
   > ### Testing
   > * **`CassandraTableE2ETest`**: End-to-end tests covering all edge cases
   > * **Unit test coverage** for all mapper components with mock-based testing
   > 
   > ## Type Mapping Details
   > | Cassandra Type | Flink LogicalType | Mapper Used | Notes |
   > |---|---|---|---|
   > | `boolean` | `BOOLEAN` | `BooleanMapper` | Direct mapping |
   > | `tinyint` | `TINYINT` | `ByteMapper` | Direct mapping |
   > | `smallint` | `SMALLINT` | `ShortMapper` | Direct mapping |
   > | `int` | `INTEGER` | `IntegerMapper` | With `convertValue()` for type safety |
   > | `bigint` | `BIGINT` | `LongMapper` | With `convertValue()` for type safety |
   > | `float` | `FLOAT` | `FloatMapper` | Direct mapping |
   > | `double` | `DOUBLE` | `DoubleMapper` | Direct mapping |
   > | `text`/`ascii` | `VARCHAR` | `StringMapper` | Direct mapping |
   > | `uuid`/`timeuuid` | `VARCHAR` | `StringMapper` | Converted to string representation |
   > | `inet` | `VARCHAR` | `StringMapper` | Converted to IP address string |
   > | `duration` | `VARCHAR` | `StringMapper` | Converted to ISO-8601 duration string |
   > | `decimal` | `DECIMAL` | `DynamicDecimalMapper` | Runtime type detection |
   > | `varint` | `DECIMAL` | `DynamicDecimalMapper` | Runtime type detection |
   > | `date` | `DATE` | `DateMapper` | Converted to epoch days |
   > | `time` | `TIME_WITHOUT_TIME_ZONE` | `TimeMapper` | Nanoseconds → milliseconds |
   > | `timestamp` | `TIMESTAMP_WITHOUT_TIME_ZONE` | `TimestampMapper` | Direct mapping |
   > | `blob` | `VARBINARY` | `BinaryMapper` | `ByteBuffer` → `byte[]` |
   > | `list` | `ARRAY` | `ArrayMapper` | Recursive mapping |
   > | `map<K,V>` | `MAP<K,V>` | `MapMapper` | Recursive mapping |
   > | `set` | `MULTISET` | `SetMapper` | Converted to `Map<element, count>` |
   > | UDT | `ROW<...>` | `RowMapper` | Field-by-field recursive mapping |
   > 
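
The non-trivial rows of the table can be illustrated with a small JDK-only sketch. The helper names are hypothetical. `java.time.LocalDate`, `ByteBuffer`, `UUID` and `InetAddress` stand in for the values the DataStax driver returns; for example, the driver's `date` value may be its own `LocalDate` class, depending on the driver version.

Flink represents `DATE` internally as days since the epoch and `TIME` as milliseconds of the day. The time conversion therefore truncates sub-millisecond precision. A Cassandra set has no duplicates, so every multiset count comes out as 1.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helpers, one per interesting table row; JDK types stand in for
// the driver's values and for Flink's internal representations.
public final class CassandraConversions {

    /** date -> DATE: Flink stores DATE as an int number of days since 1970-01-01. */
    static int toEpochDays(LocalDate date) {
        return (int) date.toEpochDay();
    }

    /** time -> TIME: Cassandra time is nanos of day; Flink uses millis of day (truncating). */
    static int nanosOfDayToMillis(long nanosOfDay) {
        return (int) (nanosOfDay / 1_000_000L);
    }

    /** blob -> VARBINARY: copy the remaining bytes without moving the caller's buffer position. */
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** uuid/timeuuid -> VARCHAR. */
    static String uuidToString(UUID uuid) {
        return uuid.toString();
    }

    /** inet -> VARCHAR: textual IP address, no reverse DNS lookup. */
    static String inetToString(InetAddress address) {
        return address.getHostAddress();
    }

    /** set -> MULTISET: element -> count map. */
    static <T> Map<T, Integer> toMultiset(Set<T> set) {
        Map<T, Integer> counts = new HashMap<>();
        for (T element : set) {
            counts.merge(element, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(toEpochDays(LocalDate.of(1970, 1, 2)));               // 1
        System.out.println(nanosOfDayToMillis(3_723_000_000_000L));              // 3723000 (01:02:03)
        System.out.println(toBytes(ByteBuffer.wrap(new byte[] {1, 2, 3})).length); // 3
        System.out.println(uuidToString(new UUID(0L, 1L)));  // 00000000-0000-0000-0000-000000000001
        System.out.println(inetToString(InetAddress.getByName("127.0.0.1")));    // 127.0.0.1
        System.out.println(toMultiset(new HashSet<>(Arrays.asList("a", "b"))));
    }
}
```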
   > ## Testing
   > ### Unit Tests
   > * [x]  `CassandraFieldMapperFactoryTest`: Tests all mapper creation scenarios
   > * [x]  `PrimitiveFieldMappersTest`: Tests all primitive type conversions with edge cases
   > * [x]  `CollectionFieldMappersTest`: Tests nested collection handling
   > * [x]  `CassandraDynamicTableSourceTest`: Tests table source configuration and split generation
   > 
   > ### Integration Tests
   > * [x]  `CassandraDynamicTableSourceITCase`: Comprehensive end-to-end test that verifies:
   >   
   >   * Correct type mapping for all primitive types
   >   * Nested structure handling (arrays of UDTs, maps with Row values)
   >   * Null value handling in collections
   >   * Empty collection behavior
   >   * Large binary data handling
   >   * High precision numeric types
   >   * UUID/TimeUUID string conversion
   >   * Duration and Inet address handling
   > 
   > ### Build Verification
   > * [x]  All tests pass: `mvn clean package`
   > * [x]  Code formatting: `mvn spotless:check`
   > * [x]  Checkstyle compliance: `mvn checkstyle:check`
   > 
   > ## Breaking Changes
   > None. This is a new feature that adds Table API support while maintaining full backward compatibility with existing DataStream API usage.
   
   There is actually a breaking change in `CassandraSource` creation (the constructor was replaced with a builder). But I agree with this change, and it is OK because this class is annotated `@PublicEvolving`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
