Toroidals commented on issue #10608: URL: https://github.com/apache/hudi/issues/10608#issuecomment-1925610466
> Only 1.0 release supports concurrent streaming writers. If the Flink checkpoint is closed, the writing can be done normally, but when the checkpoint is on, there will be an error. The requirement of writing to mor table by Flink is that the checkpoint must be on.
// Reproduction snippet from the issue comment: configures a Flink streaming
// environment (checkpointing + RocksDB state backend), declares a Hudi
// MERGE_ON_READ table through HoodiePipeline.Builder, attaches it as a sink and
// runs the job under the name "kafka-to-hudi".
//
// NOTE(review): confInfo, infoMap, connectInfo, bucketAssignTasks and
// compactionTasks are not declared anywhere in this snippet; presumably they are
// fields or locals that were trimmed from the paste.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // --- Checkpointing -------------------------------------------------------
    // "checkpoint_interval" and "checkpoint_timeout" are parsed as whole seconds
    // and converted to milliseconds before being handed to Flink.
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_interval"))), CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_timeout"))));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.parseInt(confInfo.get("checkpoint_max_concurrent")));
    env.getCheckpointConfig().setCheckpointStorage(confInfo.get("checkpoint_path"));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.parseInt(confInfo.get("checkpoint_failure_number")));
    // The restart strategy is explicitly set to noRestart().
    env.setRestartStrategy(RestartStrategies.noRestart());

    // --- State backend -------------------------------------------------------
    // NOTE(review): the boolean constructor argument presumably turns on
    // incremental checkpointing -- confirm against the Flink Javadoc.
    EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
    embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    env.setStateBackend(embeddedRocksDBStateBackend);

    // --- Source --------------------------------------------------------------
    // NOTE(review): addSource() is called without a source function as pasted;
    // presumably the actual (Kafka?) source was removed before posting.
    DataStreamSource<String> dataStreamSource = env.addSource();

    // --- Hudi table schema ---------------------------------------------------
    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
    Map<String, String> options = new HashMap<>();
    // The whole "hudi_field_map" JSON text is lower-cased (Locale.ROOT) before it
    // is parsed, so everything inside it is lower-cased, not only the names.
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() { });
    // Each inner list contributes one column: element 0 is wrapped in backticks,
    // element 1 is appended after a space (presumably the column type).
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    // Fixed extra columns appended after the configured ones.
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");
    builder.pk(infoMap.get("hudi_primary_key"));

    // --- Hudi write options --------------------------------------------------
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));

    // Compaction settings.
    options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));

    // Hive sync settings.
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    // NOTE(review): these two keys are put again further down with the value of
    // "hudi_hive_sync_partition_fields". options is a HashMap, so the later put
    // replaces "_flink_cdc_table" and these two statements have no lasting effect.
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
    // Second assignment of the partition keys; these are the values that remain
    // in the map when it is handed to the builder.
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), infoMap.get("hudi_write_rate_limit"));
    // The write client id is taken from the wall clock at submission time, so it
    // differs on every launch of this program.
    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));

    // --- Wire up the sink and run ---------------------------------------------
    builder.options(options);
    // NOTE(review): the meaning of the second argument is not visible here;
    // presumably it is the "bounded" flag -- confirm against HoodiePipeline.
    builder.sink(dataStreamSource, false);
    env.execute("kafka-to-hudi");
}
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org