[jira] [Commented] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator

2024-06-13 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854655#comment-17854655
 ] 

LvYanquan commented on FLINK-35237:
---

I have run into the same need in the Paimon sink. I agree that this is a
reasonable requirement, and the proposed change looks acceptable to me.

> Allow Sink to Choose HashFunction in PrePartitionOperator
> -
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> The {{PrePartitionOperator}} currently supports only a fixed {{HashFunction}}
> ({{org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction}}).
> This prevents Sink implementations from customizing the partitioning logic for
> {{DataChangeEvent}}s. For example, for partitioned tables it would be useful to
> hash by partition keys, hash by table name, or use the target database
> engine's own primary-key hash function (as the MaxCompute DataSink would need).
> When users need such custom partitioning logic, they have to implement their
> own PartitionOperator, which undermines the usefulness of
> {{PrePartitionOperator}}.
> To address this limitation, {{PrePartitionOperator}} should support
> user-specified custom {{HashFunction}}s ({{Function<DataChangeEvent, Integer>}}).
> One possible solution is a mechanism analogous to the {{DataSink}} interface
> that allows a {{HashFunctionProvider}} class path to be specified in the
> configuration file. This would make it much easier for users to tailor
> partitioning strategies to their specific application needs.
> To do this, I propose creating two new interfaces, {{HashFunctionProvider}}
> and {{HashFunction}}:
> {code:java}
> import java.util.function.Function;
>
> public interface HashFunctionProvider {
>     HashFunction getHashFunction(Schema schema);
> }
>
> public interface HashFunction extends Function<DataChangeEvent, Integer> {
>     Integer apply(DataChangeEvent event);
> } {code}
> Next, add a {{getHashFunctionProvider}} method to {{DataSink}}:
> {code:java}
> public interface DataSink {
>     /** Get the {@link EventSinkProvider} for writing changed data to external systems. */
>     EventSinkProvider getEventSinkProvider();
>
>     /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
>     MetadataApplier getMetadataApplier();
>
>     /** Get the {@link HashFunctionProvider} used to partition {@link DataChangeEvent}s. */
>     default HashFunctionProvider getHashFunctionProvider() {
>         return new DefaultHashFunctionProvider();
>     }
> } {code}
> Finally, re-implement the {{recreateHashFunction}} method of {{PrePartitionOperator}}:
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
>     return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> } {code}
>  
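Below is a minimal, self-contained sketch of how the proposed interfaces could fit together. It is written so that it runs outside Flink CDC. {{TableId}}, {{Schema}} and {{DataChangeEvent}} are simplified stand-ins and are assumptions, not the real {{org.apache.flink.cdc}} classes. {{DefaultHashFunctionProvider}}, {{TableNameHashFunctionProvider}} and {{Partitioning.channelFor}} are hypothetical names used only for illustration. The default provider hashes the table id plus the primary-key values. The custom provider routes every event of a table to the same subtask, which is one of the strategies mentioned in the description.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Simplified stand-ins for the Flink CDC types (assumption, NOT the real classes).
record TableId(String namespace, String tableName) {}

// Hypothetical schema that only lists primary-key column names.
record Schema(List<String> primaryKeys) {}

// Hypothetical change event: its table plus the column values after the change.
record DataChangeEvent(TableId tableId, Map<String, Object> after) {}

// The proposed interfaces, with the generics assumed to be <DataChangeEvent, Integer>.
interface HashFunction extends Function<DataChangeEvent, Integer> {
    @Override
    Integer apply(DataChangeEvent event);
}

interface HashFunctionProvider {
    HashFunction getHashFunction(Schema schema);
}

// Hypothetical default: hash the table id together with the primary-key values.
class DefaultHashFunctionProvider implements HashFunctionProvider {
    @Override
    public HashFunction getHashFunction(Schema schema) {
        List<String> keys = schema.primaryKeys();
        return event -> {
            Object[] parts = new Object[keys.size() + 1];
            parts[0] = event.tableId();
            for (int i = 0; i < keys.size(); i++) {
                parts[i + 1] = event.after().get(keys.get(i));
            }
            return Objects.hash(parts);
        };
    }
}

// Hypothetical sink-specific provider: all events of one table share a hash.
class TableNameHashFunctionProvider implements HashFunctionProvider {
    @Override
    public HashFunction getHashFunction(Schema schema) {
        return event -> event.tableId().hashCode();
    }
}

final class Partitioning {
    // Map a possibly negative hash code to a channel in [0, parallelism).
    static int channelFor(HashFunction fn, DataChangeEvent event, int parallelism) {
        return Math.floorMod(fn.apply(event), parallelism);
    }
}
```

A sink that needs its own scheme, such as a MaxCompute or Paimon sink, could then return its own provider from {{getHashFunctionProvider()}}, while other sinks keep the default. {{Math.floorMod}} is used instead of {{%}} because {{hashCode()}} may be negative.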

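The operator side of the proposal can be sketched in a similar way. This is a hypothetical simplification, not the actual {{PrePartitionOperator}}. It rests on these assumptions:
* one {{HashFunction}} is cached per table;
* a schema change for a table only invalidates that table's cached entry, and {{recreateHashFunction}} rebuilds it from the latest schema on the next event;
* {{schemaLookup}} stands in for {{loadLatestSchemaFromRegistry}}.

The stand-in types are simplified assumptions here too.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-ins for the Flink CDC types (assumption, NOT the real classes).
record TableId(String namespace, String tableName) {}

record Schema(List<String> primaryKeys) {}

record DataChangeEvent(TableId tableId, Map<String, Object> after) {}

// Proposed interfaces, reduced to what this sketch needs.
interface HashFunction extends Function<DataChangeEvent, Integer> {}

interface HashFunctionProvider {
    HashFunction getHashFunction(Schema schema);
}

// Hypothetical core of a pre-partitioning operator: one cached HashFunction per
// table, rebuilt from the latest schema after that table's schema changes.
class PrePartitionSketch {
    private final HashFunctionProvider hashFunctionProvider;
    // Stands in for loadLatestSchemaFromRegistry(tableId).
    private final Function<TableId, Schema> schemaLookup;
    private final int downstreamParallelism;
    private final Map<TableId, HashFunction> cachedHashFunctions = new HashMap<>();

    PrePartitionSketch(
            HashFunctionProvider hashFunctionProvider,
            Function<TableId, Schema> schemaLookup,
            int downstreamParallelism) {
        this.hashFunctionProvider = hashFunctionProvider;
        this.schemaLookup = schemaLookup;
        this.downstreamParallelism = downstreamParallelism;
    }

    // Called when a schema change for tableId is seen: drop the stale function.
    void onSchemaChange(TableId tableId) {
        cachedHashFunctions.remove(tableId);
    }

    // Returns the downstream subtask index for a data change event.
    int partitionOf(DataChangeEvent event) {
        HashFunction hashFunction =
                cachedHashFunctions.computeIfAbsent(event.tableId(), this::recreateHashFunction);
        // floorMod keeps the index in [0, parallelism) even for negative hashes.
        return Math.floorMod(hashFunction.apply(event), downstreamParallelism);
    }

    private HashFunction recreateHashFunction(TableId tableId) {
        return hashFunctionProvider.getHashFunction(schemaLookup.apply(tableId));
    }
}
```

Caching per table avoids asking the provider for a new function on every record. Invalidating on schema change makes sure a provider that depends on the schema, for example on the primary-key columns, sees the new schema.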


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator

2024-05-10 Thread zhangdingxin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845267#comment-17845267
 ] 

zhangdingxin commented on FLINK-35237:
--

If the maintainers agree that this improvement is worthwhile, I would be happy 
to take it on. Please feel free to assign the issue to me.



