hudi-agent commented on code in PR #18897:
URL: https://github.com/apache/hudi/pull/18897#discussion_r3346919808
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java: ##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.RemotePartitionHelper;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
+import org.apache.hudi.util.ViewStorageProperties;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Bucket index input partitioner backed by the embedded timeline service.
+ *
+ * @param <T> The type of object to hash
+ */
+public class BucketIndexRemotePartitioner<T extends HoodieKey> implements Partitioner<T> {
+
+  private final Configuration conf;
+  private final String indexKeyFields;
+  private final NumBucketsFunction numBucketsFunction;
+
+  private transient RemotePartitionHelper remotePartitionHelper;
+
+  public BucketIndexRemotePartitioner(Configuration conf, String indexKeyFields) {
+    this.conf = conf;
+    this.indexKeyFields = indexKeyFields;
+    this.numBucketsFunction = new NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+        conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+  }
+
+  @Override
+  public int partition(T key, int numPartitions) {
+    String partitionPath = normalizePartitionPath(key.getPartitionPath());
+    int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), indexKeyFields, numBuckets);
+    return getRemotePartition(getRemotePartitionHelper(), numBucketsFunction, partitionPath, curBucket, numPartitions);
+  }
+
+  public static int getRemotePartition(
+      RemotePartitionHelper remotePartitionHelper,
+      NumBucketsFunction numBucketsFunction,
+      String partitionPath,
+      int curBucket,
+      int numPartitions) {
+    String normalizedPartitionPath = normalizePartitionPath(partitionPath);
+    try {
+      int partition = remotePartitionHelper.getPartition(
+          numBucketsFunction.getNumBuckets(normalizedPartitionPath),
+          normalizedPartitionPath,
+          curBucket,
+          numPartitions);
+      if (partition < 0) {
+        throw new RuntimeException(
+            "Get remote partition succeeded, but the subtask id is negative: " + partition);
+      }
+      return partition;
+    } catch (Exception e) {

Review Comment:
🤖 nit: the broad `catch (Exception e)` also catches the `RuntimeException` thrown a few lines above for the negative-partition case, so the resulting stack trace reads as "Get remote partition failed" caused by "Get remote
partition succeeded, but the subtask id is negative", which is contradictory and confusing. Could you move the negative-partition check and its throw outside the try block, or catch a narrower exception type? Also, consider throwing `HoodieException` rather than a bare `RuntimeException`.
- AI-generated; verify before applying.

########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java: ##########
@@ -242,10 +242,10 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .build())
         .withIndexConfig(StreamerUtil.getIndexConfig(conf))
         .withPayloadConfig(getPayloadConfig(conf))
+        .withProps(flinkConf2TypedProperties(conf))

Review Comment:
🤖 Moving `withProps(...)` before `withEmbeddedTimelineServerEnabled`, `withEmbeddedTimelineServerReuseEnabled`, and `withAllowOperationMetadataField` silently flips precedence: the call-site flags now always override user-supplied props for all three keys. Forcing the timeline-server-enabled flag is what makes the new integration test pass even with `hoodie.embed.timeline.server=false`, but is the same implicit override intended for `hoodie.embed.timeline.server.reuse.enabled` and for `hoodie.allow.operation.metadata.field` (set via `CHANGELOG_ENABLED`)? Worth calling this out as a behavior change in the PR description, since it affects callers beyond this feature.
- AI-generated; verify before applying.

########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java: ##########
@@ -0,0 +1,95 @@
...
+    String normalizedPartitionPath = normalizePartitionPath(partitionPath);

Review Comment:
🤖 nit: `partition()` already normalizes the path on line 54 before calling `getRemotePartition`, so this second normalization is redundant on that call path. Could you normalize once at a single entry point (the static helper) and drop it from `partition()`, or vice versa?
- AI-generated; verify before applying.

--
This is an automated message from the Apache Git Service.
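For the first review comment on `BucketIndexRemotePartitioner` (the broad `catch (Exception e)`), the suggested restructuring can be sketched as below. This is a minimal, dependency-free illustration, not the PR's code: `RemotePartitionLookup` and `RemotePartitionSketch` are hypothetical stand-ins for `RemotePartitionHelper` and the static `getRemotePartition`. The assumption that the remote call fails with `IOException` is ours, since the helper's real exception signature is not shown in the review, and `IllegalStateException` stands in for `HoodieException` so the example compiles without Hudi on the classpath.

```java
import java.io.IOException;

// Hypothetical stand-in for RemotePartitionHelper; assumes transport
// failures surface as IOException (not confirmed by the review).
interface RemotePartitionLookup {
  int getPartition(int numBuckets, String partitionPath, int curBucket, int numPartitions)
      throws IOException;
}

final class RemotePartitionSketch {

  private RemotePartitionSketch() {
  }

  static int getRemotePartition(
      RemotePartitionLookup lookup,
      int numBuckets,
      String partitionPath,
      int curBucket,
      int numPartitions) {
    final int partition;
    try {
      // Only the remote call is guarded, so the catch below cannot
      // swallow the validation error thrown further down.
      partition = lookup.getPartition(numBuckets, partitionPath, curBucket, numPartitions);
    } catch (IOException e) {
      // Narrow catch: wrap genuine lookup failures and keep the cause attached.
      // IllegalStateException stands in for HoodieException in this sketch.
      throw new IllegalStateException("Get remote partition failed for path: " + partitionPath, e);
    }
    // Validation runs after the try block, so its message is reported unwrapped.
    if (partition < 0) {
      throw new IllegalStateException(
          "Get remote partition succeeded, but the subtask id is negative: " + partition);
    }
    return partition;
  }
}
```

With this shape, a negative subtask id surfaces with its own message and no misleading "failed" wrapper, while real transport errors are still wrapped with their original cause.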
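The precedence flip described in the `FlinkWriteClients` review comment is last-write-wins on a shared property map. The sketch below models that with a hypothetical `PrecedenceSketchBuilder`; it assumes, as the comment implies, that `withProps(...)` bulk-copies user properties and that each flag setter writes a single key. It is an illustration of call order only, not `HoodieWriteConfig.Builder` itself.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical, simplified model of a properties-backed config builder;
// it only illustrates how call order decides which value wins.
final class PrecedenceSketchBuilder {
  static final String TIMELINE_SERVER_KEY = "hoodie.embed.timeline.server";

  private final Properties props = new Properties();

  // Bulk copy of user-supplied properties: overwrites any key set earlier.
  PrecedenceSketchBuilder withProps(Map<String, String> userProps) {
    props.putAll(userProps);
    return this;
  }

  // Call-site flag: overwrites whatever value the key held before this call.
  PrecedenceSketchBuilder withEmbeddedTimelineServerEnabled(boolean enabled) {
    props.setProperty(TIMELINE_SERVER_KEY, String.valueOf(enabled));
    return this;
  }

  // Returns a defensive copy so callers cannot mutate the builder state.
  Properties build() {
    Properties copy = new Properties();
    copy.putAll(props);
    return copy;
  }
}
```

Calling the flag setter before `withProps(...)` lets a user-supplied `hoodie.embed.timeline.server=false` win; calling it after forces `true`. That reordering is the behavior change the comment asks to document, and the same reasoning applies to every other setter that now runs after `withProps(...)`.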

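For the last review comment, note that `partition()` needs the normalized path before the remote lookup, because it also uses it to compute `numBuckets` and the bucket id. One way to normalize exactly once is to route everything through a single entry point, sketched below under stated assumptions: `NormalizeOnceSketch`, `BucketToSubtask`, the null-to-empty `normalize` rule, and the `hashCode`-based bucket id are all hypothetical stand-ins for `normalizePartitionPath`, `RemotePartitionHelper`, and `BucketIdentifier.getBucketId`, whose real behavior is not shown in the review.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for the remote bucket-to-subtask lookup.
interface BucketToSubtask {
  int subtaskFor(int numBuckets, String normalizedPath, int bucket, int numPartitions);
}

final class NormalizeOnceSketch {
  // Counts normalize() calls so the "normalize exactly once" property can be checked.
  static final AtomicInteger NORMALIZE_CALLS = new AtomicInteger();

  private NormalizeOnceSketch() {
  }

  // Hypothetical normalization rule (null becomes the empty path); the real
  // normalizePartitionPath is not shown in the review.
  static String normalize(String partitionPath) {
    NORMALIZE_CALLS.incrementAndGet();
    return partitionPath == null ? "" : partitionPath;
  }

  // Single entry point: the raw path is normalized once, and that one value
  // feeds the bucket count, the bucket id, and the remote lookup.
  static int partition(
      String rawPartitionPath,
      String recordKey,
      ToIntFunction<String> numBucketsFn,
      BucketToSubtask lookup,
      int numPartitions) {
    String path = normalize(rawPartitionPath);
    int numBuckets = numBucketsFn.applyAsInt(path);
    // Stand-in hashing, not BucketIdentifier.getBucketId.
    int bucket = Math.floorMod(recordKey.hashCode(), numBuckets);
    return lookup.subtaskFor(numBuckets, path, bucket, numPartitions);
  }
}
```

Keeping normalization in the entry point and letting downstream helpers assume an already-normalized path is the "vice versa" option from the comment; which side owns it matters less than having exactly one owner.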