yuemeng created HUDI-4714:
-----------------------------

             Summary: HoodieFlinkWriteClient can't load callback config to 
HoodieWriteConfig
                 Key: HUDI-4714
                 URL: https://issues.apache.org/jira/browse/HUDI-4714
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: yuemeng


Currently, the callback config is not loaded into the write config when
StreamerUtil's getHoodieClientConfig method is called.

As a result, the callback never works in the Hudi Flink write client.

{code}

HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED))
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
.withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
.withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
.withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT)
 * 1024 * 1024L)
.withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
.withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
.build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
.retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
.cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS))
.retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
// override and hardcode to 20,
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
.build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), 
conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
.withInlineCompactionTriggerStrategy(
CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
.withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(
conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L,
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
).build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withStorageConfig(HoodieStorageConfig.newBuilder()
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 
1024 * 1024)
.logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
.parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 
* 1024)
.parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 
1024)
.parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 
1024 * 1024L)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProvider.class)
.withLockWaitTimeInMillis(2000L) // 2s
.withFileSystemLockExpire(1) // 1 minute
.withClientNumRetries(30)
.withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf))
.build())
.withPayloadConfig(getPayloadConfig(conf))
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());

{code}

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to