This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new eacae1e0dc4 [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640) eacae1e0dc4 is described below commit eacae1e0dc44e29bbe5cfc29475666bb16ee478d Author: Tim Brown <t...@onehouse.ai> AuthorDate: Thu Jan 12 07:04:17 2023 -0800 [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640) - Adds a new KeyGenerator that does not require the user to specify any fields to use for the record key and instead deterministically generates a UUID based off a subset of fields in the incoming record. --- .../apache/hudi/keygen/KeylessKeyGenerator.java | 239 +++++++++++++++++++++ .../hudi/keygen/TestKeylessKeyGenerator.java | 119 ++++++++++ .../src/test/resources/keyless_schema.avsc | 44 ++++ .../hudi/keygen/constant/KeyGeneratorOptions.java | 6 + 4 files changed, 408 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java new file mode 100644 index 00000000000..d487e7e1ff9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to + * create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled. + * The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is: + * <ul> + * <li>timestamp</li> + * <li>numeric values</li> + * <li>string, byte arrays, other types not mentioned</li> + * <li>date, lists, maps, booleans</li> + * </ul> + * The number of fields is capped to create predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets. 
+ */ +public class KeylessKeyGenerator extends CustomAvroKeyGenerator { + private static final String HOODIE_PREFIX = "_hoodie"; + private static final String DOT = "."; + private final int maxFieldsToConsider; + private final int numFieldsForKey; + private final Set<String> partitionFieldNames; + private int[][] fieldOrdering; + + public KeylessKeyGenerator(TypedProperties props) { + super(props); + this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue()); + // cap the number of fields to order in case of large schemas + this.maxFieldsToConsider = numFieldsForKey * 3; + this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet()); + } + + @Override + public String getRecordKey(GenericRecord record) { + return buildKey(getFieldOrdering(record), record); + } + + int[][] getFieldOrdering(GenericRecord genericRecord) { + if (fieldOrdering == null) { + fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields()); + } + return fieldOrdering; + } + + /** + * Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to + * {@link UUID#nameUUIDFromBytes(byte[])}. + * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object. 
+ * @param input the input object that needs a key + * @return a deterministically generated {@link UUID} + * @param <T> the input object type + */ + private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) { + StringBuilder key = new StringBuilder(); + int nonNullFields = 0; + for (int[] index : fieldOrdering) { + Object value = getFieldForRecord(input, index); + if (value == null) { + continue; + } + nonNullFields++; + key.append(value.hashCode()); + if (nonNullFields >= numFieldsForKey) { + break; + } + } + return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString(); + } + + /** + * Gets the value of the field at the specified path within the record. + * @param record the input record + * @param fieldPath the path to the field as an array of integers representing the field position within the object + * @return value at the path + */ + private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) { + Object value = record; + for (Integer index : fieldPath) { + if (value == null) { + return null; + } + value = ((GenericRecord) value).get(index); + } + return value; + } + + private int[][] buildFieldOrdering(List<Schema.Field> initialFields) { + PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance()); + Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>(); + for (int j = 0; j < initialFields.size(); j++) { + fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name())); + } + while (!fieldsToProcess.isEmpty()) { + FieldToProcess fieldToProcess = fieldsToProcess.poll(); + int[] existingPath = fieldToProcess.getIndexPath(); + Schema fieldSchema = fieldToProcess.getField().schema(); + if (fieldSchema.getType() == Schema.Type.UNION) { + fieldSchema = fieldSchema.getTypes().get(1); + } + if (fieldSchema.getType() == Schema.Type.RECORD) { + List<Schema.Field> nestedFields = 
fieldSchema.getFields(); + for (int i = 0; i < nestedFields.size(); i++) { + int[] path = Arrays.copyOf(existingPath, existingPath.length + 1); + path[existingPath.length] = i; + Schema.Field nestedField = nestedFields.get(i); + fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name())); + } + } else { + // check that field is not used in partitioning + if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) { + queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField()))); + if (queue.size() > maxFieldsToConsider) { + queue.poll(); + } + } + } + } + Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]); + Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed()); + int[][] output = new int[sortedPairs.length][]; + for (int k = 0; k < sortedPairs.length; k++) { + output[k] = sortedPairs[k].getLeft(); + } + return output; + } + + private static class FieldToProcess { + final int[] indexPath; + final Schema.Field field; + final String namePath; + + public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) { + this.indexPath = indexPath; + this.field = field; + this.namePath = namePath; + } + + public int[] getIndexPath() { + return indexPath; + } + + public Schema.Field getField() { + return field; + } + + public String getNamePath() { + return namePath; + } + } + + /** + * Ranks the fields by their type. + * @param field input field + * @return a score of 0 to 4 + */ + private int getSchemaRanking(Schema.Field field) { + if (field.name().startsWith(HOODIE_PREFIX)) { + return 0; + } + Schema schema = field.schema(); + if (schema.getType() == Schema.Type.UNION) { + schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 
schema.getTypes().get(1) : schema.getTypes().get(0); + } + Schema.Type type = schema.getType(); + switch (type) { + case LONG: + // assumes long with logical type will be a timestamp + return schema.getLogicalType() != null ? 4 : 3; + case INT: + // assumes int with logical type will be a date which will have low variance in a batch + return schema.getLogicalType() != null ? 1 : 3; + case DOUBLE: + case FLOAT: + return 3; + case BOOLEAN: + case MAP: + case ARRAY: + return 1; + default: + return 2; + } + } + + private static class RankingComparator implements Comparator<Pair<int[], Integer>> { + private static final RankingComparator INSTANCE = new RankingComparator(); + + static RankingComparator getInstance() { + return INSTANCE; + } + + @Override + public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) { + int initialResult = o1.getRight().compareTo(o2.getRight()); + if (initialResult == 0) { + // favor the smaller list (less nested value) on ties + int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length); + if (sizeResult == 0) { + return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]); + } + return sizeResult; + } + return initialResult; + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java new file mode 100644 index 00000000000..af6b30e3f09 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class TestKeylessKeyGenerator { + private static final long TIME = 1672265446090L; + private static final Schema SCHEMA; + + static { + try { + SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc")); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Test + public void createKeyWithoutPartitionColumn() { + KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + String actualForRecord = keyGenerator.getRecordKey(record); + Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord); + } + + @Test + public void createKeyWithPartition() { + KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3)); + GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + String actualForRecord = keyGenerator.getRecordKey(record); + Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", 
actualForRecord); + } + + @Test + public void nullFieldsProperlyHandled() { + KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + GenericRecord record = createRecord("partition1", "value1", null, null, null, null); + String actualForRecord = keyGenerator.getRecordKey(record); + Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord); + } + + @Test + public void assertOnlySubsetOfFieldsUsed() { + KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null); + String actualForRecord1 = keyGenerator.getRecordKey(record1); + GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null); + String actualForRecord2 = keyGenerator.getRecordKey(record2); + Assertions.assertEquals(actualForRecord2, actualForRecord1); + } + + @Test + public void numFieldsImpactsKeyGen() { + KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3)); + KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10)); + GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null); + Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record)); + } + + @Test + public void nestedColumnsUsed() { + KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10)); + GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1); + String actualForRecord = keyGenerator.getRecordKey(record); + Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord); + } + + protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) { + GenericRecord nestedRecord = null; + if (nestedDouble != null) { + nestedRecord = new 
GenericRecordBuilder(SCHEMA.getField("nested_struct").schema().getTypes().get(1)) + .set("doubly_nested", nestedDouble) + .build(); + } + + return new GenericRecordBuilder(SCHEMA) + .set("partition_field", partitionField) + .set("string_field", stringValue) + .set("integer_field", integerValue) + .set("long_field", longValue) + .set("timestamp_field", timestampValue) + .set("nested_struct", nestedRecord) + .build(); + } + + protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField); + properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), ""); + return properties; + } +} diff --git a/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc new file mode 100644 index 00000000000..2966841eef6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc @@ -0,0 +1,44 @@ +{ + "namespace": "keyless", + "type": "record", + "name": "message", + "fields": [ + { + "name": "partition_field", + "type": "string" + }, + { + "name": "string_field", + "type": "string" + }, + { + "name": "integer_field", + "type": ["null", "int"], + "default": null + }, + { + "name": "long_field", + "type": ["null", "long"], + "default": null + }, + { + "name": "timestamp_field", + "type": ["null", {"type":"long","logicalType":"timestamp-millis"}], + "default": null + }, + { + "name": "nested_struct", + "type": ["null", { + "type": "record", + "name": "nested", + "fields": [ + { + "name": "doubly_nested", + "type": "double" + } + ] + }], + "default": null + } + ] +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java 
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index 99d40439b7c..b0a46ac0676 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -69,6 +69,12 @@ public class KeyGeneratorOptions extends HoodieConfig { + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. " + "If enabled, then the timestamp value will be written in both the cases."); + public static final ConfigProperty<Integer> NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty + .key("hoodie.datasource.write.recordkey.keyless.field.count") + .defaultValue(5) + .withDocumentation("When using the KeylessKeyGenerator, this sets the number of fields to use when computing the UUID for the record. " + + "Increasing the value will increase the randomness of the generated key but can impact performance."); + /** * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods. */