[ https://issues.apache.org/jira/browse/HUDI-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ethan Guo updated HUDI-4714: ---------------------------- Component/s: flink > HoodieFlinkWriteClient can't load callback config to HoodieWriteConfig > ---------------------------------------------------------------------- > > Key: HUDI-4714 > URL: https://issues.apache.org/jira/browse/HUDI-4714 > Project: Apache Hudi > Issue Type: Bug > Components: flink > Reporter: yuemeng > Assignee: yuemeng > Priority: Major > Labels: pull-request-available > > Currently, the callback config is not loaded into the write config when calling > StreamerUtil's getHoodieClientConfig method. > So in the Hoodie Flink write client, the callback never worked. > {code} > HoodieWriteConfig.Builder builder = > HoodieWriteConfig.newBuilder() > .withEngineType(EngineType.FLINK) > .withPath(conf.getString(FlinkOptions.PATH)) > .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) > .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) > .withClusteringConfig( > HoodieClusteringConfig.newBuilder() > .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED)) > .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) > .withClusteringPlanPartitionFilterMode( > ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) > .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) > .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) > .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) > .withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) > * 1024 * 1024L) > .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST)) > .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS)) > .build()) > 
.withCleanConfig(HoodieCleanConfig.newBuilder() > .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) > .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) > .cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS)) > .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) > // override and hardcode to 20, > // actually Flink cleaning is always with parallelism 1 now > .withCleanerParallelism(20) > .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY))) > .build()) > .withArchivalConfig(HoodieArchivalConfig.newBuilder() > .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), > conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) > .build()) > .withCompactionConfig(HoodieCompactionConfig.newBuilder() > .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) > .withInlineCompactionTriggerStrategy( > CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) > .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) > .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) > .build()) > .withMemoryConfig( > HoodieMemoryConfig.newBuilder() > .withMaxMemoryMaxSize( > conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, > conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L > ).build()) > .forTable(conf.getString(FlinkOptions.TABLE_NAME)) > .withStorageConfig(HoodieStorageConfig.newBuilder() > .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * > 1024 * 1024) > .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) > .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * > 1024 * 1024) > .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 > * 1024) > 
.parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) > * 1024 * 1024L) > .build()) > .withMetadataConfig(HoodieMetadataConfig.newBuilder() > .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) > .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) > .build()) > .withLockConfig(HoodieLockConfig.newBuilder() > .withLockProvider(FileSystemBasedLockProvider.class) > .withLockWaitTimeInMillis(2000L) // 2s > .withFileSystemLockExpire(1) // 1 minute > .withClientNumRetries(30) > .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) > .build()) > .withPayloadConfig(getPayloadConfig(conf)) > .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) > .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded > timeline service singleton > .withAutoCommit(false) > .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) > .withProps(flinkConf2TypedProperties(conf)) > .withSchema(getSourceSchema(conf).toString()); > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)