This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8ba01dc70a718ddab3044b45f96a545f0aae4084 Author: XuQianJin-Stars <forwar...@apache.com> AuthorDate: Fri Oct 28 12:33:22 2022 +0800 [HUDI-2624] Implement Non Index type for HUDI --- .../org/apache/hudi/config/HoodieIndexConfig.java | 43 ++++ .../org/apache/hudi/config/HoodieWriteConfig.java | 12 + .../java/org/apache/hudi/index/HoodieIndex.java | 2 +- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 3 +- .../apache/hudi/keygen/EmptyAvroKeyGenerator.java | 65 +++++ .../hudi/table/action/commit/BucketInfo.java | 4 + .../hudi/table/action/commit/BucketType.java | 2 +- .../apache/hudi/index/FlinkHoodieIndexFactory.java | 2 + .../org/apache/hudi/index/FlinkHoodieNonIndex.java | 65 +++++ .../apache/hudi/index/SparkHoodieIndexFactory.java | 3 + .../hudi/index/nonindex/SparkHoodieNonIndex.java | 73 ++++++ .../hudi/io/storage/row/HoodieRowCreateHandle.java | 5 +- .../org/apache/hudi/keygen/EmptyKeyGenerator.java | 80 +++++++ .../commit/BaseSparkCommitActionExecutor.java | 17 ++ .../table/action/commit/UpsertPartitioner.java | 35 ++- .../org/apache/hudi/common/model/FileSlice.java | 13 + .../org/apache/hudi/common/model/HoodieKey.java | 2 + .../table/log/HoodieMergedLogRecordScanner.java | 3 +- .../apache/hudi/configuration/OptionsResolver.java | 4 + .../org/apache/hudi/sink/StreamWriteFunction.java | 32 ++- .../hudi/sink/StreamWriteOperatorCoordinator.java | 5 + .../sink/nonindex/NonIndexStreamWriteFunction.java | 265 +++++++++++++++++++++ .../sink/nonindex/NonIndexStreamWriteOperator.java | 25 +- .../java/org/apache/hudi/sink/utils/Pipelines.java | 7 + .../org/apache/hudi/table/HoodieTableFactory.java | 12 + .../org/apache/hudi/sink/TestWriteMergeOnRead.java | 54 +++++ .../hudi/sink/utils/InsertFunctionWrapper.java | 6 + .../sink/utils/StreamWriteFunctionWrapper.java | 23 +- .../hudi/sink/utils/TestFunctionWrapper.java | 6 + .../org/apache/hudi/sink/utils/TestWriteBase.java | 48 ++++ .../test/java/org/apache/hudi/utils/TestData.java | 34 +++ .../test/scala/org/apache/hudi/TestNonIndex.scala | 110 +++++++++ 32 files changed, 1030 insertions(+), 30 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index ee5b83a43a..b5edaf4abc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -270,6 +270,34 @@ public class HoodieIndexConfig extends HoodieConfig { .withDocumentation("Index key. It is used to index the record and find its file group. 
" + "If not set, use record key field as default"); + /** + * public static final String NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE = "hoodie.non.index.partition.file.group.cache.interval.minute"; //minutes + * public static final String DEFAULT_NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE = "1800"; + * + * public static final String NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE = "hoodie.non.index.partition.file.group.storage.type"; + * public static final String DEFAULT_NON_INDEX_PARTITION_FILE_GROUP_CACHE_STORAGE_TYPE = "IN_MEMORY"; + * + * public static final String NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE = "hoodie.non.index.partition.file.group.cache.size"; //byte + * public static final String DEFAULT_NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE = String.valueOf(1048576000); + */ + public static final ConfigProperty<Integer> NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE = ConfigProperty + .key("hoodie.non.index.partition.file.group.cache.interval.minute") + .defaultValue(1800) + .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<String> NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE = ConfigProperty + .key("hoodie.non.index.partition.file.group.storage.type") + .defaultValue("IN_MEMORY") + .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<Long> NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE = ConfigProperty + .key("hoodie.non.index.partition.file.group.cache.size") + .defaultValue(1024 * 1024 * 1024L) + .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + + "and each partition is divided to N buckets."); + /** * Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}. 
*/ @@ -606,6 +634,21 @@ public class HoodieIndexConfig extends HoodieConfig { return this; } + public Builder withPartitionFileGroupCacheInterval(int cacheInterval) { + hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE, String.valueOf(cacheInterval)); + return this; + } + + public Builder withPartitionFileGroupStorageType(String storageType) { + hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE, storageType); + return this; + } + + public Builder withPartitionFileGroupCacheSize(long size) { + hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE, String.valueOf(size)); + return this; + } + public HoodieIndexConfig build() { hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType)); hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9610ad382b..034519e64a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1673,6 +1673,18 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD); } + public int getNonIndexPartitionFileGroupCacheIntervalMinute() { + return getIntOrDefault(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE); + } + + public String getNonIndexPartitionFileGroupStorageType() { + return getString(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE); + } + + public long getNonIndexPartitionFileGroupCacheSize() { + return getLong(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE); + } + /** * storage properties. */
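The config options and accessors above work together with the new Builder methods; as a minimal sketch of how they could be wired up (the base path and values below are illustrative examples, not defaults taken from this patch):

    // Hypothetical usage of the new non-index options; path and values are illustrative.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/non_index_table")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.NON_INDEX)
            .withPartitionFileGroupCacheInterval(1800)            // minutes before a cached file group expires
            .withPartitionFileGroupStorageType("IN_MEMORY")       // or "FILE_SYSTEM"
            .withPartitionFileGroupCacheSize(1024 * 1024 * 1024L) // max bytes per file group
            .build())
        .build();
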
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java index 7ebd94748a..074a90fe5d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -141,7 +141,7 @@ public abstract class HoodieIndex<I, O> implements Serializable { } public enum IndexType { - HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE + HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE, NON_INDEX } public enum BucketIndexEngineType { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 82c6de5761..e629c6a51e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -333,8 +333,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H */ public void write(GenericRecord oldRecord) { String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt); + boolean isEmptyRecordKey = key.equals(HoodieKey.EMPTY_RECORD_KEY); boolean copyOldRecord = true; - if (keyToNewRecords.containsKey(key)) { + if (!isEmptyRecordKey && keyToNewRecords.containsKey(key)) { // If we have duplicate records that we are updating, then the hoodie record will be deflated after // writing the first record. So make a copy of the record to be merged HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java new file mode 100644 index 0000000000..01536f95e4 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Avro key generator for empty record key Hudi tables.
+ */ +public class EmptyAvroKeyGenerator extends BaseKeyGenerator { + + private static final Logger LOG = LogManager.getLogger(EmptyAvroKeyGenerator.class); + public static final String EMPTY_RECORD_KEY = HoodieKey.EMPTY_RECORD_KEY; + private static final List<String> EMPTY_RECORD_KEY_FIELD_LIST = Collections.emptyList(); + + public EmptyAvroKeyGenerator(TypedProperties props) { + super(props); + if (config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())) { + LOG.warn(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() + " will be ignored while using " + + this.getClass().getSimpleName()); + } + this.recordKeyFields = EMPTY_RECORD_KEY_FIELD_LIST; + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) + .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + } + + @Override + public String getRecordKey(GenericRecord record) { + return EMPTY_RECORD_KEY; + } + + @Override + public String getPartitionPath(GenericRecord record) { + return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled()); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java index 6547da6425..6db5b270b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java @@ -48,6 +48,10 @@ public class BucketInfo implements Serializable { return partitionPath; } + public void setBucketType(BucketType bucketType) { + this.bucketType = bucketType; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("BucketInfo {"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java index 70ee473d24..b32d32db40 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java @@ -19,5 +19,5 @@ package org.apache.hudi.table.action.commit; public enum BucketType { - UPDATE, INSERT + UPDATE, INSERT, APPEND } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java index b10014b918..ba2f5a39fd 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java @@ -60,6 +60,8 @@ public final class FlinkHoodieIndexFactory { return new HoodieGlobalSimpleIndex(config, Option.empty()); case BUCKET: return new HoodieSimpleBucketIndex(config); + case NON_INDEX: + return new FlinkHoodieNonIndex(config); default: throw new HoodieIndexException("Unsupported index type " + config.getIndexType()); }
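To make the key generator's contract concrete, a small usage sketch (assuming avroRecord is a GenericRecord in scope; the "dt" partition field is a hypothetical example, any configured partition path field behaves the same way):

    // Sketch: every record receives the same sentinel key, while the partition
    // path is still derived from the configured field.
    TypedProperties props = new TypedProperties();
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "dt");
    EmptyAvroKeyGenerator keyGen = new EmptyAvroKeyGenerator(props);
    String key = keyGen.getRecordKey(avroRecord);      // always "_hoodie_empty_record_key_"
    String path = keyGen.getPartitionPath(avroRecord); // value of the "dt" field, as usual
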
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java new file mode 100644 index 0000000000..a7eb2c2262 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import java.util.List; + +public class FlinkHoodieNonIndex<T extends HoodieRecordPayload> extends FlinkHoodieIndex<T> { + + public FlinkHoodieNonIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + @Override + public boolean isGlobal() { + throw new UnsupportedOperationException("Unsupported operation."); + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + public boolean isImplicitWithStorage() { + return true; + } + + @Override + public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return null; + } + + @Override + public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> hoodieRecords, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return null; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java index 4525490c8d..4f07a7f2aa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java @@ -32,6 +32,7 @@ import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex; import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; +import org.apache.hudi.index.nonindex.SparkHoodieNonIndex; import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; import org.apache.hudi.index.simple.HoodieSimpleIndex; import org.apache.hudi.keygen.BaseKeyGenerator; @@ -74,6 +75,8 @@ public final class SparkHoodieIndexFactory { default: throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType()); } + case NON_INDEX: + return new SparkHoodieNonIndex<>(config); default: throw new HoodieIndexException("Index type unspecified, set "
config.getIndexType()); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java new file mode 100644 index 0000000000..d239b7d6f7 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.nonindex; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.SparkHoodieIndex; +import org.apache.hudi.table.HoodieTable; + +import org.apache.spark.api.java.JavaRDD; + +public class SparkHoodieNonIndex<T extends HoodieRecordPayload<T>> extends SparkHoodieIndex<T> { + + public SparkHoodieNonIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + return writeStatusRDD; + } + + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + @Override + public boolean isGlobal() { + return false; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + public boolean isImplicitWithStorage() { + return true; + } + + @Override + public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> records, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + throw new UnsupportedOperationException("Unsupported operation."); + } +}
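On the Spark side the feature is driven entirely by write options; a hedged sketch of how a write might be issued (assuming a DataFrame df is in scope; the table name, path and "dt" field are illustrative, and EmptyKeyGenerator is the row-based key generator added below):

    // Hypothetical Spark datasource write using the non-index code path.
    df.write().format("hudi")
        .option("hoodie.table.name", "non_index_tbl")
        .option("hoodie.index.type", "NON_INDEX")
        .option("hoodie.datasource.write.operation", "insert")
        .option("hoodie.datasource.write.keygenerator.class",
            "org.apache.hudi.keygen.EmptyKeyGenerator")
        .option("hoodie.datasource.write.partitionpath.field", "dt")
        .mode("append")
        .save("/tmp/hudi/non_index_tbl");
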
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index 9da04f7260..da63a0f2c5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -169,7 +169,10 @@ public class HoodieRowCreateHandle implements Serializable { // and [[String]]) // - Repeated computations (for ex, converting file-path to [[UTF8String]] over and // over again) - UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD); + UTF8String recordKey = null; + if (!row.isNullAt(HoodieRecord.RECORD_KEY_META_FIELD_ORD)) { + recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD); + } UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD); // This is the only meta-field that is generated dynamically, hence conversion b/w // [[String]] and [[UTF8String]] is unavoidable if preserveHoodieMetadata is false diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java new file mode 100644 index 0000000000..9e4090a537 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Key generator for Hudi tables without record key.
+ */ +public class EmptyKeyGenerator extends BuiltinKeyGenerator { + + public static final String EMPTY_RECORD_KEY = EmptyAvroKeyGenerator.EMPTY_RECORD_KEY; + + private final EmptyAvroKeyGenerator emptyAvroKeyGenerator; + + public EmptyKeyGenerator(TypedProperties config) { + super(config); + this.emptyAvroKeyGenerator = new EmptyAvroKeyGenerator(config); + this.recordKeyFields = emptyAvroKeyGenerator.getRecordKeyFieldNames(); + this.partitionPathFields = Arrays.stream(config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) + .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + } + + @Override + public String getRecordKey(GenericRecord record) { + return EMPTY_RECORD_KEY; + } + + @Override + public String getRecordKey(Row row) { + return EMPTY_RECORD_KEY; + } + + @Override + public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { + return combineCompositeRecordKeyUnsafe(EMPTY_RECORD_KEY); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return emptyAvroKeyGenerator.getPartitionPath(record); + } + + @Override + public String getPartitionPath(Row row) { + tryInitRowAccessor(row.schema()); + return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row)); + } + + @Override + public UTF8String getPartitionPath(InternalRow row, StructType schema) { + tryInitRowAccessor(schema); + return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row)); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index f8e4b31ff6..bb9e00e74e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -45,6 +45,7 @@ import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieConcatHandle; +import org.apache.hudi.io.HoodieAppendHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.keygen.BaseKeyGenerator; @@ -320,6 +321,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa return handleInsert(binfo.fileIdPrefix, recordItr); } else if (btype.equals(BucketType.UPDATE)) { return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr); + } else if (btype.equals(BucketType.APPEND)) { + return handleAppend(binfo.partitionPath, binfo.fileIdPrefix, recordItr); } else { throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); } @@ -335,6 +338,20 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa return handleUpsertPartition(instantTime, partition, recordItr, partitioner); } + @SuppressWarnings("unchecked") + protected Iterator<List<WriteStatus>> handleAppend(String partitionPath, String fileId, + Iterator<HoodieRecord<T>> recordItr) throws IOException { + // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records + if (!recordItr.hasNext()) { + LOG.info("Empty partition with fileId => " + fileId); + return Collections.singletonList((List<WriteStatus>) 
Collections.EMPTY_LIST).iterator(); + } + HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle<>(config, instantTime, + table, partitionPath, fileId, recordItr, taskContextSupplier); + appendHandle.doAppend(); + return Collections.singletonList(appendHandle.close()).iterator(); + } + @Override public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index c2f5a43066..66cef31ff4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.NumericUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.nonindex.SparkHoodieNonIndex; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; @@ -84,6 +85,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo protected final HoodieWriteConfig config; + private int recordCnt; + public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config) { super(profile, table); @@ -106,7 +109,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat()); for (Map.Entry<String, Pair<String, Long>> updateLocEntry : partitionStat.getValue().getUpdateLocationToCount().entrySet()) { - addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); + addNewBucket(partitionStat.getKey(), updateLocEntry.getKey()); if (profile.hasOutputWorkLoadStats()) { HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey()); outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue()); @@ -118,10 +121,16 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo } } - private int addUpdateBucket(String partitionPath, String fileIdHint) { + private int addNewBucket(String partitionPath, String fileIdHint) { + BucketInfo bucketInfo; + if (table.getIndex() instanceof SparkHoodieNonIndex) { + bucketInfo = new BucketInfo(BucketType.APPEND, fileIdHint, partitionPath); + } else { + bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); + } + int bucket = totalBuckets; updateLocationToBucket.put(fileIdHint, bucket); - BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; return bucket; @@ -196,7 +205,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { - bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + bucket = addNewBucket(partitionPath, smallFile.location.getFileId()); 
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } if (profile.hasOutputWorkLoadStats()) { @@ -336,12 +345,18 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo } else { String partitionPath = keyLocation._1().getPartitionPath(); List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath); - // pick the target bucket to use based on the weights. - final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts()); - final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey()); - final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; - - int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r)); + int index; + if (keyLocation._1.getRecordKey().equals(HoodieKey.EMPTY_RECORD_KEY)) { + // round robing + index = (++recordCnt) % targetBuckets.size(); + } else { + // pick the target bucket to use based on the weights. + final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts()); + final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey()); + final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; + + index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r)); + } if (index >= 0) { return targetBuckets.get(index).getKey().bucketNumber; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java index 0fc580db0b..6cb1ed90a7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java @@ -116,6 +116,19 @@ public class FileSlice implements Serializable { return (baseFile == null) && (logFiles.isEmpty()); } + public long getFileGroupSize() { + long totalSize = 0; + if (baseFile != null) { + totalSize += baseFile.getFileSize(); + } + if (logFiles != null) { + for (HoodieLogFile logFile : logFiles) { + totalSize += logFile.getFileSize(); + } + } + return totalSize; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("FileSlice {"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java index 9030204099..68ae58e5f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java @@ -29,6 +29,8 @@ import java.util.Objects; */ public class HoodieKey implements Serializable { + public static final String EMPTY_RECORD_KEY = "_hoodie_empty_record_key_"; + private String recordKey; private String partitionPath; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e3d8554d00..5ef0a6821f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -77,6 +77,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private long maxMemorySizeInBytes; // Stores the total time taken to 
perform reading and merging of log blocks private long totalTimeTakenToReadAndMergeBlocks; + private int emptyKeySuffix; @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, @@ -158,7 +159,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader } } else { // Put the record as is - records.put(key, hoodieRecord); + records.put(key.equals(HoodieKey.EMPTY_RECORD_KEY) ? key + (emptyKeySuffix++) : key, hoodieRecord); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 00ebf09426..72f8f39011 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -119,6 +119,10 @@ public class OptionsResolver { return conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name()); } + public static boolean isNonIndexType(Configuration conf) { + return conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.NON_INDEX.name()); + } + /** * Returns whether the source should emit changelog. * diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2748af5290..a0f994f04a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -202,7 +202,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem} * for buffering, it then transforms back to the {@link HoodieRecord} before flushing. */ - private static class DataItem { + protected static class DataItem { private final String key; // record key private final String instant; // 'U' or 'I' private final HoodieRecordPayload<?> data; // record payload @@ -283,7 +283,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { * Tool to detect if to flush out the existing buffer. * Sampling the record to compute the size with 0.01 percentage. 
*/ - private static class BufferSizeDetector { + protected static class BufferSizeDetector { private final Random random = new Random(47); private static final int DENOMINATOR = 100; @@ -292,11 +292,11 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { private long lastRecordSize = -1L; private long totalSize = 0L; - BufferSizeDetector(double batchSizeMb) { + public BufferSizeDetector(double batchSizeMb) { this.batchSizeBytes = batchSizeMb * 1024 * 1024; } - boolean detect(Object record) { + public boolean detect(Object record) { if (lastRecordSize == -1 || sampling()) { lastRecordSize = ObjectSizeCalculator.getObjectSize(record); } @@ -304,15 +304,29 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { return totalSize > this.batchSizeBytes; } - boolean sampling() { + public boolean detect(long recordSize) { + lastRecordSize = recordSize; + totalSize += lastRecordSize; + return totalSize > this.batchSizeBytes; + } + + public boolean sampling() { // 0.01 sampling percentage return random.nextInt(DENOMINATOR) == 1; } - void reset() { + public void reset() { this.lastRecordSize = -1L; this.totalSize = 0L; } + + public void setTotalSize(long totalSize) { + this.totalSize = totalSize; + } + + public long getLastRecordSize() { + return lastRecordSize; + } } /** @@ -381,7 +395,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { bucket.records.add(item); - boolean flushBucket = bucket.detector.detect(item); + boolean flushBucket = shouldFlushBucket(bucket.detector, item, value.getPartitionPath()); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { if (flushBucket(bucket)) { @@ -484,4 +498,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { // blocks flushing until the coordinator starts a new instant this.confirming = true; } + + protected boolean shouldFlushBucket(BufferSizeDetector detector, DataItem item, String partitionPath) { + return detector.detect(item); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index c87d5b2443..670748b90f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -562,6 +562,11 @@ public class StreamWriteOperatorCoordinator return instant; } + @VisibleForTesting + public HoodieFlinkWriteClient getWriteClient() { + return writeClient; + } + @VisibleForTesting public Context getContext() { return context; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java new file mode 100644 index 0000000000..517234fc92 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.nonindex; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.sink.StreamWriteFunction; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.util.StreamerUtil; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Used for MOR tables when the index type is NON_INDEX. + */ +public class NonIndexStreamWriteFunction<I> + extends StreamWriteFunction<I> { + + private static final Logger LOG = LogManager.getLogger(NonIndexStreamWriteFunction.class); + + private transient Map<String, FileGroupInfo> partitionPathToFileId; + + private transient HoodieWriteConfig writeConfig; + + private transient PartitionFileGroupHandle partitionFileGroupHandle; + + /** + * Constructs a NonIndexStreamWriteFunction.
+ * + * @param config The config options + */ + public NonIndexStreamWriteFunction(Configuration config) { + super(config); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + super.initializeState(context); + partitionPathToFileId = new ConcurrentHashMap<>(); + writeConfig = StreamerUtil.getHoodieClientConfig(this.config); + partitionFileGroupHandle = getPartitionHandle(PartitionFileGroupStorageType.valueOf(writeConfig.getNonIndexPartitionFileGroupStorageType())); + partitionFileGroupHandle.init(); + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + super.snapshotState(functionSnapshotContext); + partitionFileGroupHandle.reset(); + } + + @Override + public void processElement(I value, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception { + HoodieRecord<?> record = (HoodieRecord<?>) value; + assignFileId(record); + bufferRecord(record); + } + + private void assignFileId(HoodieRecord<?> record) { + String partitionPath = record.getPartitionPath(); + FileGroupInfo fileGroupInfo = partitionPathToFileId.computeIfAbsent(partitionPath, + key -> getLatestFileGroupInfo(record.getPartitionPath())); + if (fileGroupInfo == null || !fileGroupInfo.canAssign(record)) { + fileGroupInfo = new FileGroupInfo(FSUtils.getFileId(FSUtils.createNewFileIdPfx())); + partitionPathToFileId.put(partitionPath, fileGroupInfo); + } + record.unseal(); + record.setCurrentLocation(new HoodieRecordLocation("U", fileGroupInfo.getFileId())); + record.seal(); + } + + private FileGroupInfo getLatestFileGroupInfo(String partitionPath) { + return partitionFileGroupHandle.getLatestFileGroupInfo(partitionPath); + } + + @Override + public void endInput() { + super.endInput(); + partitionPathToFileId.clear(); + } + + private abstract class PartitionFileGroupHandle { + + abstract void init(); + + abstract FileGroupInfo getLatestFileGroupInfo(String partitionPath); + + abstract void reset(); + + } + + private class InMemoryPartitionFileGroupHandle extends PartitionFileGroupHandle { + + private long cacheIntervalMillis; + + @Override + public void init() { + cacheIntervalMillis = writeConfig.getNonIndexPartitionFileGroupCacheIntervalMinute() * 60L * 1000; + } + + @Override + public FileGroupInfo getLatestFileGroupInfo(String partitionPath) { + return null; + } + + @Override + public void reset() { + long curTime = System.currentTimeMillis(); + for (Entry<String, FileGroupInfo> entry : partitionPathToFileId.entrySet()) { + if (curTime - entry.getValue().getCreateTime() >= cacheIntervalMillis) { + partitionPathToFileId.remove(entry.getKey()); + } + } + } + } + + @Override + protected boolean shouldFlushBucket(BufferSizeDetector detector, DataItem item, String partitionPath) { + FileGroupInfo fileGroupInfo = partitionPathToFileId.get(partitionPath); + return fileGroupInfo == null || fileGroupInfo.getLastRecordSize() == -1 ?
detector.detect(item) + : detector.detect(fileGroupInfo.getLastRecordSize()); + } + + private class FileSystemPartitionFileGroupHandle extends PartitionFileGroupHandle { + + private String writeToken; + + private HoodieTable<?, ?, ?, ?> table; + + private HoodieFlinkEngineContext engineContext; + + private SyncableFileSystemView fileSystemView; + + private boolean needSync = false; + + @Override + public void init() { + this.engineContext = new HoodieFlinkEngineContext( + new SerializableConfiguration(HadoopConfigurations.getHadoopConf(config)), + new FlinkTaskContextSupplier(getRuntimeContext())); + this.writeToken = FSUtils.makeWriteToken( + engineContext.getTaskContextSupplier().getPartitionIdSupplier().get(), + engineContext.getTaskContextSupplier().getStageIdSupplier().get(), + engineContext.getTaskContextSupplier().getAttemptIdSupplier().get()); + this.table = HoodieFlinkTable.create(writeConfig, engineContext); + this.fileSystemView = table.getHoodieView(); + } + + @Override + public FileGroupInfo getLatestFileGroupInfo(String partitionPath) { + if (needSync) { + fileSystemView.sync(); + needSync = false; + } + List<HoodieLogFile> hoodieLogFiles = fileSystemView.getAllFileSlices(partitionPath) + .map(FileSlice::getLatestLogFile) + .filter(Option::isPresent).map(Option::get) + .filter(logFile -> FSUtils.getWriteTokenFromLogPath(logFile.getPath()).equals(writeToken)) + .sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList()); + if (hoodieLogFiles.size() > 0) { + HoodieLogFile hoodieLogFile = hoodieLogFiles.get(0); + Option<FileSlice> fileSlice = fileSystemView.getLatestFileSlice(partitionPath, hoodieLogFile.getFileId()); + if (fileSlice.isPresent()) { + return new FileGroupInfo(FSUtils.getFileIdFromFilePath(hoodieLogFile.getPath()), + fileSlice.get().getFileGroupSize()); + } + LOG.warn("Cannot locate fileSlice, partitionPath: " + partitionPath + ", fileId: " + + hoodieLogFile.getFileId()); + } + return null; + } + + @Override + public void reset() { + partitionPathToFileId.clear(); + needSync = true; + } + } + + private PartitionFileGroupHandle getPartitionHandle(PartitionFileGroupStorageType storageType) { + switch (storageType) { + case IN_MEMORY: + return new InMemoryPartitionFileGroupHandle(); + case FILE_SYSTEM: + return new FileSystemPartitionFileGroupHandle(); + default: + throw new IllegalArgumentException("Unsupported storage type: " + storageType.name()); + } + } + + private enum PartitionFileGroupStorageType { + IN_MEMORY, FILE_SYSTEM + } + + private class FileGroupInfo { + + private final String fileId; + private final long createTime; + private final BufferSizeDetector detector; + + public FileGroupInfo(String fileId) { + this.fileId = fileId; + this.createTime = System.currentTimeMillis(); + this.detector = new BufferSizeDetector((double) writeConfig.getNonIndexPartitionFileGroupCacheSize() / 1024 / 1024); + } + + public FileGroupInfo(String fileId, long initFileSize) { + this(fileId); + detector.setTotalSize(initFileSize); + } + + public boolean canAssign(HoodieRecord<?> record) { + return !detector.detect(record); + } + + public String getFileId() { + return fileId; + } + + public long getCreateTime() { + return createTime; + } + + public long getLastRecordSize() { + return detector.getLastRecordSize(); + } + } + +}
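The heart of the function above is assignFileId: one active file group per partition, rolled over when the size detector crosses hoodie.non.index.partition.file.group.cache.size or the cached entry expires. Condensed from the code above (a paraphrase, not the literal method body):

    // One active file group per partition; the handle may consult the file system view.
    FileGroupInfo info = partitionPathToFileId.computeIfAbsent(partitionPath,
        p -> partitionFileGroupHandle.getLatestFileGroupInfo(p));
    if (info == null || !info.canAssign(record)) { // size threshold exceeded -> new file group
      info = new FileGroupInfo(FSUtils.getFileId(FSUtils.createNewFileIdPfx()));
      partitionPathToFileId.put(partitionPath, info);
    }
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation("U", info.getFileId()));
    record.seal();
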
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java similarity index 51% copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java copy to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java index 70ee473d24..560716aa9c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,8 +16,25 @@ * limitations under the License. */ -package org.apache.hudi.table.action.commit; +package org.apache.hudi.sink.nonindex; -public enum BucketType { - UPDATE, INSERT +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +import org.apache.flink.configuration.Configuration; + +/** + * Operator for {@link NonIndexStreamWriteFunction}. + * + * @param <I> The input type + */ +public class NonIndexStreamWriteOperator<I> extends AbstractWriteOperator<I> { + + public NonIndexStreamWriteOperator(Configuration conf) { + super(new NonIndexStreamWriteFunction<>(conf)); + } + + public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) { + return WriteOperatorFactory.instance(conf, new NonIndexStreamWriteOperator<>(conf)); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 82761adf73..55428610d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -44,6 +44,7 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; +import org.apache.hudi.sink.nonindex.NonIndexStreamWriteOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketIndexPartitioner; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; @@ -323,6 +324,12 @@ public class Pipelines { .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } else if (OptionsResolver.isNonIndexType(conf)) { + WriteOperatorFactory<HoodieRecord> operatorFactory = NonIndexStreamWriteOperator.getFactory(conf); + return dataStream.transform("non_index_write", + TypeInformation.of(Object.class), operatorFactory) + .uid("uid_non_index_write" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); } else { WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf); return dataStream
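For the Flink pipeline wiring above, everything hangs off the index.type option; a minimal sketch of a job configuration (the path is illustrative, and the HoodieTableFactory change that follows fills in the remaining non-index defaults, i.e. the empty key generator, INSERT operation and skip-merge reads):

    // Hypothetical Flink job configuration enabling the non-index write path.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/non_index_tbl");
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.NON_INDEX.name());
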
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 1718175240..b0380c5878 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,12 +19,14 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.EmptyAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -56,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.configuration.FlinkOptions.REALTIME_SKIP_MERGE; /** * Hoodie data source/sink factory. @@ -185,6 +188,15 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab setupWriteOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType()); + setupDefaultOptionsForNonIndex(conf); + } + + private static void setupDefaultOptionsForNonIndex(Configuration conf) { + if (OptionsResolver.isNonIndexType(conf)) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, EmptyAvroKeyGenerator.class.getName()); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + conf.setString(FlinkOptions.MERGE_TYPE, REALTIME_SKIP_MERGE); + } } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index df01fc9076..7efdc181e9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -20,7 +20,13 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.EmptyAvroKeyGenerator; import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; @@ -141,6 +147,54 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { .end(); } + @Test + public void testInsertNonIndexOnMemoryMode() throws Exception { + testInsertNonIndex("IN_MEMORY"); + } + + @Test + public void testInsertNonIndexOnFileSystemMode() throws Exception { + testInsertNonIndex("FILE_SYSTEM"); + } + + public void testInsertNonIndex(String storageType) throws Exception { + // open the function and ingest data + conf.setString(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.NON_INDEX.name()); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.name()); + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, EmptyAvroKeyGenerator.class.getName()); + conf.setLong(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE.key(), 2000L); + conf.setInteger(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE.key(), 0); + conf.setString(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE.key(), storageType); + conf.setString(FileSystemViewStorageConfig.VIEW_TYPE.key(), FileSystemViewStorageConfig.VIEW_TYPE.defaultValue().name()); + conf.setString(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(),"false"); + + TestHarness harness = TestHarness.instance().preparePipeline(tempFile, conf) + .consume(TestData.DATA_SET_INSERT_NOM_INDEX) + .emptyEventBuffer() + .checkpoint(1) + .assertPartitionFileGroups("par1", 2) + .assertPartitionFileGroups("par2", 1) + .assertNextEvent() + .checkpointComplete(1) + .consume(TestData.DATA_SET_INSERT_NOM_INDEX) + .emptyEventBuffer() + .checkpoint(2); + + if (storageType.equals("FILE_SYSTEM")) { + harness = harness.assertPartitionFileGroups("par2", 1); + } else if (storageType.equals("IN_MEMORY")) { + harness = harness + .assertPartitionFileGroups("par1", 4) + .assertPartitionFileGroups("par2", 2); + } + + harness + .assertNextEvent() + .checkpointComplete(2) + .checkWrittenData(EXPECTED6, 4) + .end(); + } + @Override public void testInsertClustering() { // insert clustering is only valid for cow table. diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java index 707fe45c47..3666940b4e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; @@ -144,6 +145,11 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> { return coordinator; } + @Override + public HoodieFlinkWriteClient getWriteClient() { + return coordinator.getWriteClient(); + } + @Override public void close() throws Exception { this.coordinator.close(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index a1a14456e3..b83f3cc478 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.utils; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; @@ -27,6 +28,7 @@ import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.bootstrap.BootstrapOperator; import 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 707fe45c47..3666940b4e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -144,6 +145,11 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public HoodieFlinkWriteClient getWriteClient() {
+    return coordinator.getWriteClient();
+  }
+
   @Override
   public void close() throws Exception {
     this.coordinator.close();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a1a14456e3..b83f3cc478 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -27,6 +28,7 @@ import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.nonindex.NonIndexStreamWriteFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.utils.TestConfigurations;
@@ -164,9 +166,13 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   public void invoke(I record) throws Exception {
     HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
     ScalaCollector<HoodieRecord<?>> collector = ScalaCollector.getInstance();
-    bucketAssignerFunction.processElement(hoodieRecord, null, collector);
-    bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
-    writeFunction.processElement(collector.getVal(), null, null);
+    if (OptionsResolver.isNonIndexType(conf)) {
+      writeFunction.processElement(hoodieRecord, null, null);
+    } else {
+      bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+      bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
+      writeFunction.processElement(collector.getVal(), null, null);
+    }
   }
 
   public WriteMetadataEvent[] getEventBuffer() {
@@ -233,6 +239,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public HoodieFlinkWriteClient getWriteClient() {
+    return coordinator.getWriteClient();
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }
@@ -254,7 +265,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   // -------------------------------------------------------------------------
 
   private void setupWriteFunction() throws Exception {
-    writeFunction = new StreamWriteFunction<>(conf);
+    if (OptionsResolver.isNonIndexType(conf)) {
+      writeFunction = new NonIndexStreamWriteFunction<>(conf);
+    } else {
+      writeFunction = new StreamWriteFunction<>(conf);
+    }
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
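
Both the table factory and the wrapper above branch on OptionsResolver.isNonIndexType(conf); the helper itself is added to OptionsResolver elsewhere in this commit. A plausible sketch of the check, not the verbatim implementation:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.index.HoodieIndex;

    public static boolean isNonIndexType(Configuration conf) {
      // true when the table declares index type NON_INDEX
      return HoodieIndex.IndexType.NON_INDEX.name()
          .equalsIgnoreCase(conf.getString(FlinkOptions.INDEX_TYPE));
    }

When the predicate holds, records skip the BucketAssignFunction entirely: there is no meaningful record key to look up, so the NonIndexStreamWriteFunction assigns file groups on its own.
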
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index d2fe819650..967fdd3b24 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -68,6 +69,11 @@ public interface TestFunctionWrapper<I> {
    */
   StreamWriteOperatorCoordinator getCoordinator();
 
+  /**
+   * Returns the write client.
+   */
+  HoodieFlinkWriteClient getWriteClient();
+
   /**
    * Returns the data buffer of the write task.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b6ae0767d6..05ce879bcb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -69,6 +72,8 @@ public class TestWriteBase {
 
   protected static final Map<String, List<String>> EXPECTED5 = new HashMap<>();
 
+  protected static final Map<String, String> EXPECTED6 = new HashMap<>();
+
   static {
     EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
     EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
@@ -102,6 +107,38 @@ public class TestWriteBase {
         "id1,par1,id1,Danny,23,3,par1",
         "id1,par1,id1,Danny,23,4,par1",
         "id1,par1,id1,Danny,23,4,par1"));
+
+    EXPECTED6.put("par1", "["
+        + "_hoodie_empty_record_key_,par1,id1,Danny,23,1,par1, "
+        + "_hoodie_empty_record_key_,par1,id1,Danny,23,1,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1]");
+
+    EXPECTED6.put("par2", "["
+        + "_hoodie_empty_record_key_,par2,id3,Julian,53,3,par2, "
+        + "_hoodie_empty_record_key_,par2,id3,Julian,53,3,par2]");
+
+    EXPECTED6.put("par3", "["
+        + "_hoodie_empty_record_key_,par3,id5,Sophia,18,5,par3, "
+        + "_hoodie_empty_record_key_,par3,id5,Sophia,18,5,par3]");
+
+    EXPECTED6.put("par4", "["
+        + "_hoodie_empty_record_key_,par4,id7,Bob,44,7,par4, "
+        + "_hoodie_empty_record_key_,par4,id7,Bob,44,7,par4]");
   }
 
   // -------------------------------------------------------------------------
@@ -224,6 +261,17 @@
       return this;
     }
 
+    public TestHarness assertPartitionFileGroups(String partitionPath, int numFileGroups) {
+      HoodieFlinkWriteClient writeClient = this.pipeline.getWriteClient();
+      SyncableFileSystemView fileSystemView = writeClient.getHoodieTable().getHoodieView();
+      fileSystemView.sync();
+      // check the number of file groups under the given partition
+      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(
+          Collectors.toList());
+      assertEquals(numFileGroups, fileGroups.size());
+      return this;
+    }
+
     /**
      * Checkpoints the pipeline, which triggers the data write and event send.
      */
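
Every entry in EXPECTED6 carries the same placeholder key, _hoodie_empty_record_key_, which is what the new empty key generators emit. The real EmptyAvroKeyGenerator is added elsewhere in this commit; a hedged sketch of its core behavior, with the constant taken from the expected output above and the partition field name assumed for illustration:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.keygen.BaseKeyGenerator;

    // Sketch only -- not the verbatim class from this patch.
    public class EmptyAvroKeyGenerator extends BaseKeyGenerator {
      public static final String EMPTY_RECORD_KEY = "_hoodie_empty_record_key_";

      public EmptyAvroKeyGenerator(TypedProperties props) {
        super(props);
      }

      @Override
      public String getRecordKey(GenericRecord record) {
        return EMPTY_RECORD_KEY;  // all records share one placeholder key
      }

      @Override
      public String getPartitionPath(GenericRecord record) {
        // partition handling stays field-based; "partition" is an assumed field name,
        // the real class reads the configured partition path field(s)
        return String.valueOf(record.get("partition"));
      }
    }
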
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8e1dd9964c..6d0183053f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -286,6 +286,40 @@
           TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_NON_INDEX = Arrays.asList(
+      //partition1
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+
+      //partition2
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+
+      //partition3
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+
+      //partition4
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7), StringData.fromString("par4"))
+  );
+
   public static List<RowData> DATA_SET_SINGLE_INSERT = Collections.singletonList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
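
DATA_SET_INSERT_NON_INDEX repeats the id2 row eight times on purpose: with a constant record key there is no deduplication or precombine by key, so every consumed row is appended verbatim. That is exactly the arithmetic behind EXPECTED6:

    // each consume() appends the full 12-row batch; after two checkpoints:
    int par1Rows = 2 * (1 + 8);  // 1 Danny + 8 Stephen duplicates -> 18 entries in EXPECTED6
    int par2Rows = 2 * 1;        // Julian twice; par3 (Sophia) and par4 (Bob) likewise
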
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala
new file mode 100644
index 0000000000..cfe9564a35
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.keygen.EmptyKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.junit.jupiter.api.Test
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters
+
+class TestNonIndex extends HoodieClientTestBase {
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
+    HoodieIndexConfig.INDEX_TYPE.key() -> HoodieIndex.IndexType.NON_INDEX.name(),
+    HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> classOf[EmptyKeyGenerator].getName
+  )
+
+  @Test
+  def testNonIndexMORInsert(): Unit = {
+    val spark = sqlContext.sparkSession
+
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    // first insert, parquet files
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 100)).toList
+    // second insert, log files
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    assert(fs.globStatus(new Path(basePath, "201*/*/*/.*.log*")).length > 0)
+    assert(fs.globStatus(new Path(basePath, "201*/*/*/*.parquet")).length > 0)
+
+    // check data
+    val result = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+      .selectExpr(inputDF2.schema.map(_.name): _*)
+    val inputAll = inputDF1.unionAll(inputDF2)
+    assert(result.except(inputAll).count() == 0)
+    assert(inputAll.except(result).count == 0)
+  }
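
With placeholder keys, a MOR snapshot read cannot combine log records into base files by key, which is why the Flink path pins MERGE_TYPE to REALTIME_SKIP_MERGE. On the Spark side this corresponds to the skip-merge read option; a hedged sketch (Java API, not exercised by the test above; `spark` and `basePath` assumed in scope):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> snapshot = spark.read().format("hudi")
        .option("hoodie.datasource.merge.type", "skip_merge")  // skip key-based merging
        .load(basePath + "/*/*/*/*");
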
+
+  @Test
+  def testBulkInsertDatasetWithOutIndex(): Unit = {
+    val spark = sqlContext.sparkSession
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+
+    // create a new table
+    val fooTableModifier = commonOpts
+      .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+      .updated(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.NON_INDEX.toString)
+      .updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key(), schema.toString)
+      .updated(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), "4")
+      .updated("path", basePath)
+    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    for (_ <- 0 to 0) {
+      // generate the inserts
+      val records = DataSourceTestUtils.generateRandomRows(200)
+      val recordsSeq = JavaConverters.asScalaIteratorConverter(records.iterator).asScala.toSeq
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams ++ Seq(), df)
+
+      // Fetch records from entire dataset
+      val actualDf = sqlContext.read.format("org.apache.hudi").load(basePath + "/*/*/*")
+
+      assert(actualDf.where("_hoodie_record_key = '_hoodie_empty_record_key_'").count() == 200)
+    }
+  }
+}
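
Taken together, the Spark entry point for the feature comes down to two options plus the usual write path; a hedged end-to-end sketch in the Java API, mirroring commonOpts above (`df` and `basePath` assumed in scope, field name taken from the test):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.keygen.EmptyKeyGenerator;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
    import org.apache.spark.sql.SaveMode;

    Map<String, String> opts = new HashMap<>();
    opts.put(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.NON_INDEX.name());
    opts.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), EmptyKeyGenerator.class.getName());
    opts.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
    df.write().format("hudi").options(opts).mode(SaveMode.Append).save(basePath);
    // every written row then carries _hoodie_record_key = '_hoodie_empty_record_key_'
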