Toroidals commented on issue #10608:
URL: https://github.com/apache/hudi/issues/10608#issuecomment-1925610466

   > Only 1.0 release supports concurrent streaming writers.
   
   If Flink checkpointing is disabled, the write works fine, but as soon as checkpointing is enabled an error is thrown. Writing to a MOR table with Flink, however, requires checkpointing to be enabled.
   
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing configuration: the write only fails once this is enabled.
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_interval"))), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_timeout"))));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.parseInt(confInfo.get("checkpoint_max_concurrent")));
        env.getCheckpointConfig().setCheckpointStorage(confInfo.get("checkpoint_path"));
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.parseInt(confInfo.get("checkpoint_failure_number")));
        env.setRestartStrategy(RestartStrategies.noRestart());

        // RocksDB state backend with incremental checkpoints.
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(embeddedRocksDBStateBackend);

        DataStreamSource<String> dataStreamSource = env.addSource(); // source function omitted from the snippet

        HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

        // Table schema: configured business columns plus the CDC metadata columns.
        Map<String, String> options = new HashMap<>();
        String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
        ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
        });
        for (ArrayList<String> columnList : fieldList) {
            builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
        }
        builder.column("_flink_cdc_connector string");
        builder.column("_flink_cdc_db string");
        builder.column("_flink_cdc_table string");
        builder.column("_flink_cdc_op string");
        builder.column("_flink_cdc_ts_ms timestamp");

        builder.pk(infoMap.get("hudi_primary_key"));

        // MOR table with bucket index.
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
        options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

        options.put(FlinkOptions.PRE_COMBINE.key(), "true");
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));

        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());

        options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));

        options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);

        options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
        options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));

        // Compaction settings.
        options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
        options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
        options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
        options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));

        // Hive sync settings.
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));

        options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
        options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");

        options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
        options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
        options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
        options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
        options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

        // Note: these overwrite the two partition options set to "_flink_cdc_table" above.
        options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), infoMap.get("hudi_hive_sync_partition_fields"));
        options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), infoMap.get("hudi_hive_sync_partition_fields"));

        options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), infoMap.get("hudi_write_rate_limit"));

        options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));

        builder.options(options);

        builder.sink(dataStreamSource, false);
        env.execute("kafka-to-hudi");
    }
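
   For reference, below is a stripped-down, self-contained sketch of the same write path (checkpointing enabled, MOR table, bucket index) that does not depend on the external confInfo/infoMap/connectInfo maps. The table name, base path, schema, and the single in-memory record standing in for the Kafka/CDC source are placeholders, not values from the job above:

       import org.apache.flink.streaming.api.CheckpointingMode;
       import org.apache.flink.streaming.api.datastream.DataStream;
       import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
       import org.apache.flink.table.data.GenericRowData;
       import org.apache.flink.table.data.RowData;
       import org.apache.flink.table.data.StringData;
       import org.apache.flink.table.data.TimestampData;
       import org.apache.hudi.common.model.HoodieTableType;
       import org.apache.hudi.configuration.FlinkOptions;
       import org.apache.hudi.index.HoodieIndex;
       import org.apache.hudi.util.HoodiePipeline;

       import java.util.HashMap;
       import java.util.Map;

       public class MorCheckpointRepro {
           public static void main(String[] args) throws Exception {
               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               // The failure reportedly shows up only once checkpointing is enabled.
               env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

               // One in-memory row standing in for the Kafka/CDC source of the real job.
               DataStream<RowData> source = env.fromElements(
                       (RowData) GenericRowData.of(
                               StringData.fromString("1"),
                               StringData.fromString("tbl_a"),
                               TimestampData.fromEpochMillis(System.currentTimeMillis())));

               Map<String, String> options = new HashMap<>();
               options.put(FlinkOptions.PATH.key(), "file:///tmp/hudi/demo_mor"); // placeholder base path
               options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
               options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
               options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "_flink_cdc_ts_ms");

               HoodiePipeline.Builder builder = HoodiePipeline.builder("demo_mor")
                       .column("id string")
                       .column("_flink_cdc_table string")
                       .column("_flink_cdc_ts_ms timestamp(3)")
                       .pk("id")
                       .partition("_flink_cdc_table")
                       .options(options);

               builder.sink(source, false);
               env.execute("hudi-mor-checkpoint-repro");
           }
       }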
   

