[GitHub] [hudi] SteNicholas commented on a diff in pull request #8680: [HUDI-6192] Make HoodieFlinkCompactor and HoodieFlinkClusteringJob service mode as long running streaming job
SteNicholas commented on code in PR #8680:
URL: https://github.com/apache/hudi/pull/8680#discussion_r1197783334

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:

@@ -259,93 +257,105 @@ private void cluster() throws Exception {
       table.getMetaClient().reloadActiveTimeline();
     }

-    // fetch the instant based on the configured execution sequence
-    List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
-    if (instants.isEmpty()) {
-      // do nothing.
-      LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
-      return;
-    }
-
-    final HoodieInstant clusteringInstant;
-    if (cfg.clusteringInstantTime != null) {
-      clusteringInstant = instants.stream()
-          .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
-          .findFirst()
-          .orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+    int clusteringParallelism;
+    DataStream<ClusteringPlanEvent> planStream;
+    HoodieInstant clusteringInstant = null;
+    if (serviceMode) {
+      clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+      planStream = env.addSource(new ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+          .name("clustering_service_source")
+          .uid("uid_clustering_service_source")
+          .setParallelism(1)
+          .transform("cluster_plan_generate", TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf))
+          .setParallelism(1);
     } else {
-      // check for inflight clustering plans and roll them back if required
-      clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq)
-          ? instants.get(instants.size() - 1) : instants.get(0);
-    }
+      // fetch the instant based on the configured execution sequence
+      List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
+      if (instants.isEmpty()) {

Review Comment:
@danny0405, I have got the point about sharing `#cluster` in the streaming source function and starting a timeline service. I will update the service source function.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
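The service-mode branch above puts a long-running source in front of `ClusteringPlanOperator`, emitting one trigger per checkpoint interval so the plan operator keeps re-scanning the timeline. As a rough, Flink-free illustration of that idea (not Hudi's actual `ServiceSourceFunction`, whose internals are not shown in this diff; the class and method names below are invented), a stdlib-only sketch might look like:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a "service source": a long-running loop that emits
// one trigger event per interval so a downstream plan operator can re-scan
// the timeline. Names and structure are illustrative only.
class ServiceSourceSketch {

  // Emits `rounds` trigger timestamps, pausing `intervalMillis` between them.
  // The real source would loop until job cancellation, with the interval taken
  // from the checkpoint interval configuration, and emit via ctx.collect(...).
  static List<Long> runTriggers(int rounds, long intervalMillis) {
    List<Long> triggers = new ArrayList<>();
    for (int i = 0; i < rounds; i++) {
      triggers.add(System.currentTimeMillis()); // stand-in for ctx.collect(trigger)
      try {
        Thread.sleep(intervalMillis); // wait out one interval before the next trigger
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // stand-in for the cancel() path
        break;
      }
    }
    return triggers;
  }
}
```

The single-parallelism `.setParallelism(1)` on the source in the diff matches this model: one loop drives the whole pipeline, and downstream operators fan out the actual clustering work.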
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8680: [HUDI-6192] Make HoodieFlinkCompactor and HoodieFlinkClusteringJob service mode as long running streaming job
SteNicholas commented on code in PR #8680:
URL: https://github.com/apache/hudi/pull/8680#discussion_r1191890148

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:

@@ -259,93 +257,105 @@ private void cluster() throws Exception {
       table.getMetaClient().reloadActiveTimeline();
     }

-    // fetch the instant based on the configured execution sequence
-    List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
-    if (instants.isEmpty()) {
-      // do nothing.
-      LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
-      return;
-    }
-
-    final HoodieInstant clusteringInstant;
-    if (cfg.clusteringInstantTime != null) {
-      clusteringInstant = instants.stream()
-          .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
-          .findFirst()
-          .orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+    int clusteringParallelism;
+    DataStream<ClusteringPlanEvent> planStream;
+    HoodieInstant clusteringInstant = null;
+    if (serviceMode) {
+      clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+      planStream = env.addSource(new ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+          .name("clustering_service_source")
+          .uid("uid_clustering_service_source")
+          .setParallelism(1)
+          .transform("cluster_plan_generate", TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf))
+          .setParallelism(1);
     } else {
-      // check for inflight clustering plans and roll them back if required
-      clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq)
-          ? instants.get(instants.size() - 1) : instants.get(0);
-    }
+      // fetch the instant based on the configured execution sequence
+      List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
+      if (instants.isEmpty()) {

Review Comment:
@danny0405, my idea is to keep the same clustering execution pipeline as inline clustering, which at least ensures that any problem this streaming job hits is the same one inline clustering would hit. WDYT?
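The non-service (batch) branch in the diff selects one pending instant according to the configured execution sequence: `CompactionUtil.isLIFO(cfg.clusteringSeq)` picks the newest pending instant, otherwise the oldest. A minimal sketch of that selection rule (the `select` helper and its `String` instant representation are illustrative, not Hudi APIs):

```java
import java.util.List;

// Hedged sketch of the instant-selection rule in the batch branch:
// LIFO takes the most recent pending clustering instant, FIFO the oldest.
class InstantSelectionSketch {

  static String select(List<String> pendingInstants, boolean lifo) {
    if (pendingInstants.isEmpty()) {
      // mirrors the "No clustering plan scheduled" early return in the job
      throw new IllegalStateException("No clustering plan scheduled");
    }
    return lifo
        ? pendingInstants.get(pendingInstants.size() - 1) // newest instant
        : pendingInstants.get(0);                         // oldest instant
  }
}
```

Keeping this branch unchanged is what the comment above argues for: service mode only replaces the driver (a periodic source instead of a one-shot selection), not the clustering execution pipeline itself.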
[GitHub] [hudi] SteNicholas commented on a diff in pull request #8680: [HUDI-6192] Make HoodieFlinkCompactor and HoodieFlinkClusteringJob service mode as long running streaming job
SteNicholas commented on code in PR #8680:
URL: https://github.com/apache/hudi/pull/8680#discussion_r1191888621

## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:

@@ -259,93 +257,105 @@ private void cluster() throws Exception {
       table.getMetaClient().reloadActiveTimeline();
     }

-    // fetch the instant based on the configured execution sequence
-    List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
-    if (instants.isEmpty()) {
-      // do nothing.
-      LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
-      return;
-    }
-
-    final HoodieInstant clusteringInstant;
-    if (cfg.clusteringInstantTime != null) {
-      clusteringInstant = instants.stream()
-          .filter(i -> i.getTimestamp().equals(cfg.clusteringInstantTime))
-          .findFirst()
-          .orElseThrow(() -> new HoodieException("Clustering instant [" + cfg.clusteringInstantTime + "] not found"));
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+
+    int clusteringParallelism;
+    DataStream<ClusteringPlanEvent> planStream;
+    HoodieInstant clusteringInstant = null;
+    if (serviceMode) {
+      clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+      planStream = env.addSource(new ServiceSourceFunction(conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)))
+          .name("clustering_service_source")

Review Comment:
@danny0405, I don't think we need exactly-once semantics here, because this only requires adding a long-running source to create the clustering execution pipeline. The semantics are guaranteed via `ClusteringPlanOperator`, `ClusteringOperator`, and `ClusteringCommitSink`. Meanwhile, I have tested the streaming job internally.