fightBoxing opened a new pull request, #15056: URL: https://github.com/apache/iceberg/pull/15056
## [FLINK] Implement Iceberg lookup join functionality

### Problem

In production environments, there is a common need to join streaming data with dimension data stored in Iceberg tables. The dimension data needs to be periodically refreshed to ensure join accuracy. Currently, Flink lacks native support for Iceberg lookup joins, forcing users to work around this limitation or use alternative solutions.

### Solution

This PR implements Iceberg lookup join functionality for Flink, enabling efficient joins between streaming data and Iceberg dimension tables. The implementation includes:

- **IcebergLookupCache**: A cache mechanism for storing and managing lookup data with TTL support
- **IcebergLookupReader**: A reader component for loading and refreshing lookup data from Iceberg tables
- **IcebergTableSource enhancement**: Updated to support lookup join operations
- **Configuration options**: New config options for customizing lookup join behavior (cache size, refresh interval, etc.)
- **Integration tests**: Comprehensive test coverage (IcebergLookupJoinITCase)

### Changes

- Added `IcebergLookupCache` for efficient caching of lookup data
- Added `IcebergLookupReader` for reading lookup data from Iceberg tables
- Added `IcebergLookupJoinITCase` for integration testing
- Updated `IcebergTableSource` to support lookup join operations
- Added configuration options in `FlinkConfigOptions` for lookup join settings
- Updated build.gradle files for v1.16, v1.17, and v1.18

### Benefits

- Enables real-time joins with Iceberg dimension tables
- Reduces data latency by avoiding frequent full table scans
- Improves performance through intelligent caching strategies
- Seamlessly integrates with existing Flink lookup join framework
- Supports periodic data refresh to ensure data freshness

### Testing

- Added integration tests to validate lookup join functionality
- Tested cache refresh mechanisms
- Verified correctness of join results
- Ensures backward compatibility

### Versions

This implementation is backported to Flink 1.16, 1.17, and 1.18 to support multiple Flink versions in production environments.
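
For readers unfamiliar with the caching idea described under Solution, the following is a minimal sketch of a size-bounded lookup cache with TTL-based refresh. It is not the PR's `IcebergLookupCache`: the class name `TtlLookupCacheSketch`, its constructor parameters, and the `loader` function (standing in for a read against the dimension table) are all hypothetical and chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a TTL + size-bounded lookup cache; not the PR's actual class. */
public class TtlLookupCacheSketch<K, V> {

  /** A cached value together with the time it was loaded. */
  private static final class Cached<V> {
    final V value;
    final long loadedAtMillis;

    Cached(V value, long loadedAtMillis) {
      this.value = value;
      this.loadedAtMillis = loadedAtMillis;
    }
  }

  private final long ttlMillis;
  private final LongSupplier clock;    // injected so expiry can be tested deterministically
  private final Function<K, V> loader; // assumption: stands in for a dimension-table read
  private final LinkedHashMap<K, Cached<V>> entries;
  private int loads;

  public TtlLookupCacheSketch(
      int maxEntries, long ttlMillis, LongSupplier clock, Function<K, V> loader) {
    this.ttlMillis = ttlMillis;
    this.clock = clock;
    this.loader = loader;
    // Access-ordered map: the least recently used entry is evicted once maxEntries is exceeded.
    this.entries =
        new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
            return size() > maxEntries;
          }
        };
  }

  /** Returns the cached value, reloading it when it is missing or older than the TTL. */
  public synchronized V get(K key) {
    long now = clock.getAsLong();
    Cached<V> cached = entries.get(key);
    if (cached == null || now - cached.loadedAtMillis >= ttlMillis) {
      cached = new Cached<>(loader.apply(key), now);
      entries.put(key, cached);
      loads++;
    }
    return cached.value;
  }

  /** Number of times the loader has been invoked. */
  public synchronized int loadCount() {
    return loads;
  }

  /** Number of entries currently held. */
  public synchronized int size() {
    return entries.size();
  }

  public static void main(String[] args) {
    TtlLookupCacheSketch<Integer, String> cache =
        new TtlLookupCacheSketch<>(1000, 60_000L, System::currentTimeMillis, id -> "dim-" + id);
    System.out.println(cache.get(42));      // first call invokes the loader
    System.out.println(cache.get(42));      // second call is served from the cache
    System.out.println(cache.loadCount());
  }
}
```

The two knobs in the sketch (`maxEntries` and `ttlMillis`) loosely correspond to the cache size and refresh interval settings the PR mentions; the actual option names in `FlinkConfigOptions` are not shown in this message, so none are assumed here.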
