yuemeng created HUDI-4714: ----------------------------- Summary: HoodieFlinkWriteClient can't load callback config to HoodieWriteConfig Key: HUDI-4714 URL: https://issues.apache.org/jira/browse/HUDI-4714 Project: Apache Hudi Issue Type: Bug Reporter: yuemeng
Currently, the callback config is not loaded into the write config when StreamerUtil's getHoodieClientConfig method is called, so in the Hudi Flink write client the callback never works. {code} HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) .withPath(conf.getString(FlinkOptions.PATH)) .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) .withClusteringConfig( HoodieClusteringConfig.newBuilder() .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED)) .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) .withClusteringPlanPartitionFilterMode( ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L) .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST)) .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS)) .build()) .withCleanConfig(HoodieCleanConfig.newBuilder() .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) .cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS)) .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) // override and hardcode to 20, // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) 
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY))) .build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder() .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) .withInlineCompactionTriggerStrategy( CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) .build()) .withMemoryConfig( HoodieMemoryConfig.newBuilder() .withMaxMemoryMaxSize( conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L ).build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withStorageConfig(HoodieStorageConfig.newBuilder() .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024) .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024) .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) .build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) .build()) .withLockConfig(HoodieLockConfig.newBuilder() .withLockProvider(FileSystemBasedLockProvider.class) .withLockWaitTimeInMillis(2000L) // 2s .withFileSystemLockExpire(1) // 1 minute .withClientNumRetries(30) 
.withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) .build()) .withPayloadConfig(getPayloadConfig(conf)) .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withProps(flinkConf2TypedProperties(conf)) .withSchema(getSourceSchema(conf).toString()); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)