[ 
https://issues.apache.org/jira/browse/HUDI-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-4714:
----------------------------
    Component/s: flink

> HoodieFlinkWriteClient can't load callback config to HoodieWriteConfig
> ----------------------------------------------------------------------
>
>                 Key: HUDI-4714
>                 URL: https://issues.apache.org/jira/browse/HUDI-4714
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: yuemeng
>            Assignee: yuemeng
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the callback config is not loaded into the write config when calling 
> StreamerUtil's getHoodieClientConfig method.
> So in the Hoodie Flink write client, the callback never works.
> {code}
> HoodieWriteConfig.Builder builder =
> HoodieWriteConfig.newBuilder()
> .withEngineType(EngineType.FLINK)
> .withPath(conf.getString(FlinkOptions.PATH))
> .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
> .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
> .withClusteringConfig(
> HoodieClusteringConfig.newBuilder()
> .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
> .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
> .withClusteringPlanPartitionFilterMode(
> ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
> .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
> .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
> .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
> .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)
>  * 1024 * 1024L)
> .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
> .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
> .build())
> .withCleanConfig(HoodieCleanConfig.newBuilder()
> .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
> .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
> .cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS))
> .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
> // override and hardcode to 20,
> // actually Flink cleaning is always with parallelism 1 now
> .withCleanerParallelism(20)
> .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
> .build())
> .withArchivalConfig(HoodieArchivalConfig.newBuilder()
> .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), 
> conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
> .build())
> .withCompactionConfig(HoodieCompactionConfig.newBuilder()
> .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
> .withInlineCompactionTriggerStrategy(
> CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
> .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
> .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
> .build())
> .withMemoryConfig(
> HoodieMemoryConfig.newBuilder()
> .withMaxMemoryMaxSize(
> conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L,
> conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
> ).build())
> .forTable(conf.getString(FlinkOptions.TABLE_NAME))
> .withStorageConfig(HoodieStorageConfig.newBuilder()
> .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 
> 1024 * 1024)
> .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
> .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 
> 1024 * 1024)
> .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 
> * 1024)
> .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) 
> * 1024 * 1024L)
> .build())
> .withMetadataConfig(HoodieMetadataConfig.newBuilder()
> .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
> .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
> .build())
> .withLockConfig(HoodieLockConfig.newBuilder()
> .withLockProvider(FileSystemBasedLockProvider.class)
> .withLockWaitTimeInMillis(2000L) // 2s
> .withFileSystemLockExpire(1) // 1 minute
> .withClientNumRetries(30)
> .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
> .build())
> .withPayloadConfig(getPayloadConfig(conf))
> .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
> .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded 
> timeline service singleton
> .withAutoCommit(false)
> .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
> .withProps(flinkConf2TypedProperties(conf))
> .withSchema(getSourceSchema(conf).toString());
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to