hudi-agent commented on code in PR #18897:
URL: https://github.com/apache/hudi/pull/18897#discussion_r3346919808


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.RemotePartitionHelper;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
+import org.apache.hudi.util.ViewStorageProperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Bucket index input partitioner backed by the embedded timeline service.
+ *
+ * @param <T> The type of object to hash
+ */
+public class BucketIndexRemotePartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  private final Configuration conf;
+  private final String indexKeyFields;
+  private final NumBucketsFunction numBucketsFunction;
+
+  private transient RemotePartitionHelper remotePartitionHelper;
+
+  public BucketIndexRemotePartitioner(Configuration conf, String indexKeyFields) {
+    this.conf = conf;
+    this.indexKeyFields = indexKeyFields;
+    this.numBucketsFunction = new NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+        conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+  }
+
+  @Override
+  public int partition(T key, int numPartitions) {
+    String partitionPath = normalizePartitionPath(key.getPartitionPath());
+    int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets);
+    return getRemotePartition(getRemotePartitionHelper(), numBucketsFunction, partitionPath, curBucket, numPartitions);
+  }
+
+  public static int getRemotePartition(
+      RemotePartitionHelper remotePartitionHelper,
+      NumBucketsFunction numBucketsFunction,
+      String partitionPath,
+      int curBucket,
+      int numPartitions) {
+    String normalizedPartitionPath = normalizePartitionPath(partitionPath);
+    try {
+      int partition = remotePartitionHelper.getPartition(
+          numBucketsFunction.getNumBuckets(normalizedPartitionPath),
+          normalizedPartitionPath,
+          curBucket,
+          numPartitions);
+      if (partition < 0) {
+        throw new RuntimeException(
+            "Get remote partition succeeded, but the subtask id is negative: " + partition);
+      }
+      return partition;
+    } catch (Exception e) {

Review Comment:
   🤖 nit: the broad `catch (Exception e)` also catches the `RuntimeException`
thrown a few lines above for the negative-partition case, so the resulting stack
trace reads as "Get remote partition failed" caused by "Get remote partition
succeeded, but the subtask id is negative", which is contradictory and confusing.
Could you move the negative check (and its throw) outside the `try`, or catch a
narrower exception? Also, consider throwing `HoodieException` instead of a bare
`RuntimeException`.
   
   - AI-generated; verify before applying.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -242,10 +242,10 @@ public static HoodieWriteConfig getHoodieClientConfig(
                 .build())
             .withIndexConfig(StreamerUtil.getIndexConfig(conf))
             .withPayloadConfig(getPayloadConfig(conf))
+            .withProps(flinkConf2TypedProperties(conf))

Review Comment:
   🤖 Moving `withProps(...)` before `withEmbeddedTimelineServerEnabled`, 
`withEmbeddedTimelineServerReuseEnabled`, and `withAllowOperationMetadataField` 
silently flips precedence: call-site flags now always override user-supplied 
props for all three keys. Forcing the timeline-server-enable flag is what makes 
the new integration test work even with `hoodie.embed.timeline.server=false`, 
but is the same implicit override intended for 
`hoodie.embed.timeline.server.reuse.enabled` and 
`hoodie.allow.operation.metadata.field` (via `CHANGELOG_ENABLED`)? Worth 
calling out as a behavior change in the PR description since it affects callers 
beyond just this feature.
   
   - AI-generated; verify before applying.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java:
##########
@@ -0,0 +1,95 @@
+  @Override
+  public int partition(T key, int numPartitions) {
+    String partitionPath = normalizePartitionPath(key.getPartitionPath());
+    int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets);
+    return getRemotePartition(getRemotePartitionHelper(), numBucketsFunction, partitionPath, curBucket, numPartitions);
+  }
+
+  public static int getRemotePartition(
+      RemotePartitionHelper remotePartitionHelper,
+      NumBucketsFunction numBucketsFunction,
+      String partitionPath,
+      int curBucket,
+      int numPartitions) {
+    String normalizedPartitionPath = normalizePartitionPath(partitionPath);

Review Comment:
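   🤖 nit: `partition()` already normalizes the path on line 54 before calling
`getRemotePartition`, so this second normalization is redundant for calls that
come through `partition()`. Could you normalize only once, either in the static
helper (the single shared entry point) or in `partition()`? Note that
`partition()` also passes the normalized path to `getNumBuckets` on line 55, so
simply dropping the call from `partition()` would change that lookup.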
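One possible shape for "normalize once", sketched with simplified signatures: the public static entry point normalizes and delegates to an internal overload that assumes an already-normalized path, while `partition()` normalizes once and calls that overload directly, so it can still reuse the value for its own bucket count. Everything here is a hypothetical stand-in: the real `normalizePartitionPath` body is not shown in the diff (trimming is a placeholder), and `getNumBuckets`, `lookupSubtask`, and `getRemotePartitionNormalized` are illustrative only:

```java
public class NormalizeOnceSketch {

  /** Counts normalization calls so the "normalize once" property is observable. */
  static int normalizeCalls = 0;

  /** Hypothetical stand-in: the real normalization rule is not shown in the diff. */
  static String normalizePartitionPath(String partitionPath) {
    normalizeCalls++;
    return partitionPath == null ? "" : partitionPath.trim();
  }

  /** Hypothetical stand-in for NumBucketsFunction#getNumBuckets. */
  static int getNumBuckets(String normalizedPath) {
    return normalizedPath.isEmpty() ? 1 : 4;
  }

  /** Hypothetical placeholder for the remote lookup; it only ever sees normalized paths. */
  static int lookupSubtask(String normalizedPath, int numBuckets, int curBucket, int numPartitions) {
    return Math.floorMod(normalizedPath.hashCode() * numBuckets + curBucket, numPartitions);
  }

  /** Public entry point for external callers: normalizes exactly once, then delegates. */
  public static int getRemotePartition(String partitionPath, int curBucket, int numPartitions) {
    return getRemotePartitionNormalized(normalizePartitionPath(partitionPath), curBucket, numPartitions);
  }

  /** Shared core: the caller guarantees the path is already normalized. */
  private static int getRemotePartitionNormalized(String normalizedPath, int curBucket, int numPartitions) {
    return lookupSubtask(normalizedPath, getNumBuckets(normalizedPath), curBucket, numPartitions);
  }

  /** Mirrors partition(): normalizes once and reuses the result for bucketing and the lookup. */
  static int partition(String rawPartitionPath, String recordKey, int numPartitions) {
    String path = normalizePartitionPath(rawPartitionPath);
    int curBucket = Math.floorMod(recordKey.hashCode(), getNumBuckets(path));
    return getRemotePartitionNormalized(path, curBucket, numPartitions);
  }

  public static void main(String[] args) {
    int subtask = partition(" 2024/01/01 ", "key-1", 8);
    System.out.println("subtask=" + subtask + ", normalize calls=" + normalizeCalls);
  }
}
```

Both entry points normalize exactly once and agree on the target subtask for the same record.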
   
   - AI-generated; verify before applying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
