nsivabalan commented on a change in pull request #1834: URL: https://github.com/apache/hudi/pull/1834#discussion_r469299509
########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java ########## @@ -177,4 +191,26 @@ private long convertLongTimeToMillis(Long partitionVal) { } return MILLISECONDS.convert(partitionVal, timeUnit); } + + @Override + public String getRecordKey(Row row) { + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), getRecordKeyPositions(), false); + } + + @Override + public String getPartitionPath(Row row) { + Object fieldVal = null; + Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, getPartitionPathPositions().get(getPartitionPathFields().get(0))); + try { + if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) + || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { + fieldVal = 1L; + } else { + fieldVal = partitionPathFieldVal; + } + return getPartitionPath(fieldVal); + } catch (ParseException e) { Review comment: Can we switch this to Exception to be in sync with GenericRecord behavior. ########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; + +/** + * Base class for all the built-in key generators. Contains methods structured for + * code reuse amongst them. + */ +public abstract class BuiltinKeyGenerator extends KeyGenerator { + + protected List<String> recordKeyFields; + protected List<String> partitionPathFields; + + private Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); + private Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + + private transient Function1<Object, Object> converterFn = null; + protected StructType structType; + private String structName; + private String recordNamespace; + + protected BuiltinKeyGenerator(TypedProperties config) { + super(config); + } + + /** + * Generate a record Key out of provided generic record. + */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. 
+ */ + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List<String> getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + @Override + public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) { + // parse simple feilds + getRecordKeyFields().stream() + .filter(f -> !(f.contains("."))) + .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getRecordKeyFields().stream() + .filter(f -> f.contains(".")) + .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true))); + // parse simple fields + if (getPartitionPathFields() != null) { + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) + .forEach(f -> partitionPathPositions.put(f, Review comment: same here. ``` .forEach(f -> { Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); if (structType.getFieldIndex(f).isDefined()) { partitionPathPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))); } else { partitionPathPositions.put(f, Collections.singletonList(-1)); } }); ``` ########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; + +/** + * Base class for all the built-in key generators. Contains methods structured for + * code reuse amongst them. + */ +public abstract class BuiltinKeyGenerator extends KeyGenerator { + + protected List<String> recordKeyFields; + protected List<String> partitionPathFields; + + private Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); + private Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + + private transient Function1<Object, Object> converterFn = null; + protected StructType structType; + private String structName; + private String recordNamespace; + + protected BuiltinKeyGenerator(TypedProperties config) { + super(config); + } + + /** + * Generate a record Key out of provided generic record. 
+ */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. + */ + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List<String> getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + @Override + public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) { + // parse simple feilds + getRecordKeyFields().stream() + .filter(f -> !(f.contains("."))) + .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getRecordKeyFields().stream() + .filter(f -> f.contains(".")) + .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true))); + // parse simple fields + if (getPartitionPathFields() != null) { + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) + .forEach(f -> partitionPathPositions.put(f, + Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains(".")) + .forEach(f -> partitionPathPositions.put(f, + RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false))); + } + this.structName = structName; + this.structType = structType; + 
this.recordNamespace = recordNamespace; + } + + /** + * Fetch record key from {@link Row}. + * @param row instance of {@link Row} from which record key is requested. + * @return the record key of interest from {@link Row}. + */ + @Override + public String getRecordKey(Row row) { + if (null != converterFn) { + converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace); + } + GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); + return getKey(genericRecord).getRecordKey(); + } + + /** + * Fetch partition path from {@link Row}. + * @param row instance of {@link Row} from which partition path is requested + * @return the partition path of interest from {@link Row}. + */ + @Override + public String getPartitionPath(Row row) { + if (null != converterFn) { Review comment: would be good to add a precondition check here that init has been called. ########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; + +/** + * Base class for all the built-in key generators. Contains methods structured for + * code reuse amongst them. + */ +public abstract class BuiltinKeyGenerator extends KeyGenerator { + + protected List<String> recordKeyFields; + protected List<String> partitionPathFields; + + private Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); + private Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + + private transient Function1<Object, Object> converterFn = null; + protected StructType structType; + private String structName; + private String recordNamespace; + + protected BuiltinKeyGenerator(TypedProperties config) { + super(config); + } + + /** + * Generate a record Key out of provided generic record. + */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. 
+ */ + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List<String> getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + @Override + public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) { + // parse simple feilds + getRecordKeyFields().stream() + .filter(f -> !(f.contains("."))) + .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getRecordKeyFields().stream() + .filter(f -> f.contains(".")) + .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true))); + // parse simple fields + if (getPartitionPathFields() != null) { + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) + .forEach(f -> partitionPathPositions.put(f, + Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains(".")) + .forEach(f -> partitionPathPositions.put(f, + RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false))); + } + this.structName = structName; + this.structType = structType; + this.recordNamespace = recordNamespace; + } + + /** + * Fetch record key from {@link Row}. + * @param row instance of {@link Row} from which record key is requested. + * @return the record key of interest from {@link Row}. 
+ */ + @Override + public String getRecordKey(Row row) { + if (null != converterFn) { Review comment: would be good to add a precondition check here that init has been called. ########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; + +/** + * Base class for all the built-in key generators. Contains methods structured for + * code reuse amongst them. 
+ */ +public abstract class BuiltinKeyGenerator extends KeyGenerator { + + protected List<String> recordKeyFields; + protected List<String> partitionPathFields; + + private Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); + private Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + + private transient Function1<Object, Object> converterFn = null; + protected StructType structType; + private String structName; + private String recordNamespace; + + protected BuiltinKeyGenerator(TypedProperties config) { + super(config); + } + + /** + * Generate a record Key out of provided generic record. + */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. + */ + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List<String> getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + @Override + public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) { + // parse simple feilds + getRecordKeyFields().stream() + .filter(f -> !(f.contains("."))) + .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); Review comment: there could be some bug here. if field is not found in structType. I fixed it in RowKeyGeneratorHelper for nested fields, but missed it here. 
something like ``` .forEach(f -> { if (structType.getFieldIndex(f).isDefined()) { recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))); } else { throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\""); } }); ``` ########## File path: hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Function1; + +/** + * Base class for all the built-in key generators. Contains methods structured for + * code reuse amongst them. 
+ */ +public abstract class BuiltinKeyGenerator extends KeyGenerator { + + protected List<String> recordKeyFields; + protected List<String> partitionPathFields; + + private Map<String, List<Integer>> recordKeyPositions = new HashMap<>(); + private Map<String, List<Integer>> partitionPathPositions = new HashMap<>(); + + private transient Function1<Object, Object> converterFn = null; + protected StructType structType; + private String structName; + private String recordNamespace; + + protected BuiltinKeyGenerator(TypedProperties config) { + super(config); + } + + /** + * Generate a record Key out of provided generic record. + */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. + */ + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List<String> getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? 
k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + @Override + public void initializeRowKeyGenerator(StructType structType, String structName, String recordNamespace) { + // parse simple feilds + getRecordKeyFields().stream() + .filter(f -> !(f.contains("."))) + .forEach(f -> recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getRecordKeyFields().stream() + .filter(f -> f.contains(".")) + .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true))); + // parse simple fields + if (getPartitionPathFields() != null) { + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) + .forEach(f -> partitionPathPositions.put(f, + Collections.singletonList((Integer) (structType.getFieldIndex(f).get())))); + // parse nested fields + getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains(".")) + .forEach(f -> partitionPathPositions.put(f, + RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false))); + } + this.structName = structName; + this.structType = structType; + this.recordNamespace = recordNamespace; + } + + /** + * Fetch record key from {@link Row}. + * @param row instance of {@link Row} from which record key is requested. + * @return the record key of interest from {@link Row}. + */ + @Override + public String getRecordKey(Row row) { + if (null != converterFn) { + converterFn = AvroConversionHelper.createConverterToAvro(structType, structName, recordNamespace); + } + GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); + return getKey(genericRecord).getRecordKey(); + } + + /** + * Fetch partition path from {@link Row}. + * @param row instance of {@link Row} from which partition path is requested + * @return the partition path of interest from {@link Row}. 
+ */ + @Override + public String getPartitionPath(Row row) { + if (null != converterFn) { Review comment: if you plan to add such a precondition, ensure all built-in key generators call it since they might override getRecordKey(Row) and getPartitionPath(Row). ``` protected void preConditionCheckForRowInit(){ if(!isRowInitCalled()){ throw new IllegalStateException("KeyGenerator#initializeRowKeyGenerator should have been invoked before this method "); } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org