Lehel44 commented on a change in pull request #5038:
URL: https://github.com/apache/nifi/pull/5038#discussion_r644112482



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
##########
@@ -149,6 +152,241 @@ public void createRecordPaths(final ProcessContext context) {
         this.recordPaths = recordPaths;
     }
 
+    private static class RecordWrapper {
+        private Record originalRecord;
+        private Record currentRecord;
+        private Path path;
+        private Map<RecordField, FieldValue> actualFields = new HashMap<>(); 
// The record might have fewer fields than its schema
+
+        public RecordWrapper(Record record, String path) {
+            this.originalRecord = record;
+            this.path = new Path(path);
+
+            if (this.path.isEmpty()) {
+                this.currentRecord = originalRecord;
+            } else {
+                this.currentRecord = getRecordByPath(this.path.toString() + 
"/*");
+            }
+
+            if (this.currentRecord != null) {
+                List<FieldValue> listOfActualFields = 
getSelectedFields(this.path + "/*");
+                listOfActualFields.forEach(field -> 
actualFields.put(field.getField(), field));
+            }
+        }
+
+        public boolean recordExists() {
+            return currentRecord != null;
+        }
+
+        public Record getRecord() {
+            return currentRecord;
+        }
+
+        public boolean actuallyContainsField(RecordField field) {
+            return actualFields.containsKey(field);
+        }
+
+        public FieldValue getActualFieldValue(RecordField field) {
+            return actualFields.get(field);
+        }
+
+        public String getPath() {
+            return path.toString();
+        }
+
+        public Record getOriginalRecord() {
+            return originalRecord;
+        }
+
+        private Record getRecordByPath(String recordPath) {
+            Record resultRecord;
+            Optional<FieldValue> fieldValueOption = 
RecordPath.compile(recordPath).evaluate(originalRecord).getSelectedFields().findAny();
+            if (fieldValueOption.isPresent()) {
+                resultRecord = getParentRecordOfField(fieldValueOption.get());
+            } else {
+                Path newPath = new Path(recordPath); // Cutting the "/*" from 
the end.
+                if (newPath.isEmpty()) {
+                    resultRecord = originalRecord;
+                } else {
+                    resultRecord = getTargetAsRecord(newPath.toString());
+                }
+            }
+            return resultRecord;
+        }
+
+        private Record getParentRecordOfField(FieldValue fieldValue) {
+            Optional<Record> parentRecordOption = fieldValue.getParentRecord();
+            if (parentRecordOption.isPresent()) {
+                return parentRecordOption.get();
+            } else {
+                return originalRecord;
+            }
+        }
+
+        private Record getTargetAsRecord(String path) {
+            Optional<FieldValue> targetFieldOption = 
RecordPath.compile(path).evaluate(originalRecord).getSelectedFields().findAny();
+            //TODO: is it possible that there are two elements in a record 
with the same path ?
+            if (targetFieldOption.isPresent()) {
+                FieldValue targetField = targetFieldOption.get();
+                return getFieldValueAsRecord(targetField);
+            } else {
+                return null;
+            }
+        }
+
+        // Returns null if fieldValue is not of type Record
+        private Record getFieldValueAsRecord(FieldValue fieldValue) {
+            Object targetObject = fieldValue.getValue();
+            if (targetObject instanceof Record) {
+                return ((Record) targetObject);
+            } else {
+                return null;
+            }
+        }
+
+        private List<FieldValue> getSelectedFields(String path) {
+            RecordPath recordPath = RecordPath.compile(path);
+            RecordPathResult recordPathResult = 
recordPath.evaluate(originalRecord);
+            return 
recordPathResult.getSelectedFields().collect(Collectors.toList());
+        }
+    }
+
+    private static class Path {
+        private final String path;
+
+        public Path(String path) {
+            if (path.length() == 0 || path.equals("/") || path.equals("/*")) {
+                this.path = "";
+            }  else if (path.endsWith("/")) {
+                this.path = path.substring(0, path.length() - 1);
+            } else if (path.endsWith("/*")) {
+                this.path = path.substring(0, path.length() - 2);
+            } else {
+                this.path = path;
+            }
+        }
+
+        public boolean isEmpty() {
+            return "".equals(path);
+        }
+
+        @Override
+        public String toString() {
+            return path;
+        }
+    }
+
+
+    private Record remove(Record record, List<FieldValue> fieldsToRemove) {
+        if (fieldsToRemove.isEmpty()) {
+            return record;
+        } else {
+            RecordWrapper wrappedRecord = new RecordWrapper(record, "/");
+            List<String> fieldsToRemoveWithPath = 
mapFieldValueToPath(fieldsToRemove);
+            return remove(wrappedRecord, fieldsToRemove, 
fieldsToRemoveWithPath);
+        }
+    }
+
+    private List<String> mapFieldValueToPath(List<FieldValue> fieldValues) {
+        List<String> paths = new ArrayList<>();
+        fieldValues.forEach(field -> paths.add(getAbsolutePath(field)));
+        return paths;
+    }
+
+    private String getAbsolutePath(FieldValue fieldValue) {
+        if (!fieldValue.getParent().isPresent()) {
+            return "";
+        }
+        if 
("root".equals(fieldValue.getParent().get().getField().getFieldName())) {
+            return "/" + fieldValue.getField().getFieldName();
+        }
+        return getAbsolutePath(fieldValue.getParent().get()) + "/" + 
fieldValue.getField().getFieldName();
+    }
+
+    private Record remove(RecordWrapper wrappedRecord, List<FieldValue> 
fieldsToRemove, List<String> fieldsToRemoveWithPath) {
+        List<RecordField> newSchemaFieldList = new ArrayList<>();
+        Map<String, Object> newRecordMap = new LinkedHashMap<>();
+
+        for (RecordField schemaField : 
wrappedRecord.getRecord().getSchema().getFields()) { // Iterate the fields of 
the record's schema
+            if (wrappedRecord.actuallyContainsField(schemaField)) { // The 
record actually has data or null for that field
+                handleFieldInSchemaAndData(schemaField, wrappedRecord, 
fieldsToRemove, fieldsToRemoveWithPath, newSchemaFieldList, newRecordMap);
+            } else { // The record does not even contain the field (no data 
for it). We still need to check if it needs to be deleted from the schema.
+                handleFieldInSchema(schemaField, wrappedRecord, 
fieldsToRemove, newSchemaFieldList, fieldsToRemoveWithPath);
+            }
+        }
+
+        RecordSchema newSchema = new SimpleRecordSchema(newSchemaFieldList);
+        Record newRecord = new MapRecord(newSchema, newRecordMap);

Review comment:
       I think you can return the result of this call directly here, instead of assigning it to a local variable first.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to