pvillard31 commented on code in PR #11299:
URL: https://github.com/apache/nifi/pull/11299#discussion_r3335818956


##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -245,6 +249,67 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
             .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
             .build();
 
+    static final PropertyDescriptor RETAIN_IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Retain Identifier Field")
+            .description("""
+                    Whether to keep the Identifier Field in the document body 
after extracting it \
+                    for use as the Elasticsearch document ID. \
+                    When false (default), the field is removed from the 
document before indexing.\
+                    """)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")

Review Comment:
   Today the Identifier Field is kept in the indexed document. Should Retain 
Identifier Field default to true so existing flows keep the same document shape 
after upgrade?



##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -245,6 +249,67 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
             .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
             .build();
 
+    static final PropertyDescriptor RETAIN_IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Retain Identifier Field")
+            .description("""
+                    Whether to keep the Identifier Field in the document body 
after extracting it \
+                    for use as the Elasticsearch document ID. \
+                    When false (default), the field is removed from the 
document before indexing.\
+                    """)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, 
InputFormat.JSON_ARRAY)
+            .build();
+
+    static final PropertyDescriptor INDEX_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Index Field")
+            .description("""
+                    The name of the field within each document to use as the 
Elasticsearch index name. \
+                    If the field is not present in a document or this property 
is left blank, \
+                    the configured Index property value is used as the 
fallback.\
+                    """)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor RETAIN_INDEX_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Retain Index Field")
+            .description("""
+                    Whether to keep the Index Field in the document body after 
extracting it \
+                    for use as the Elasticsearch index name. \
+                    When false (default), the field is removed from the 
document before indexing.\
+                    """)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();

Review Comment:
   Should Retain Index Field use dependsOn(INDEX_FIELD), and Retain Timestamp 
Field use dependsOn(TIMESTAMP_FIELD), so the toggle is hidden when no source 
field is set?



##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -931,6 +1131,41 @@ private String resolveId(final Map<String, Object> 
contentMap, final String idAt
         return StringUtils.isNotBlank(flowFileIdAttribute) ? 
flowFileIdAttribute : null;
     }
 
+    /**
+     * Extracts the index name from a pre-parsed {@link JsonNode}.
+     * Used for JSON Array Index/Create operations where the node is already 
available.
+     * Falls back to {@code fallbackIndex} when the field is absent or blank.
+     */
+    private String extractIndex(final JsonNode node, final String indexField, 
final String fallbackIndex) {
+        if (StringUtils.isBlank(indexField)) {
+            return fallbackIndex;
+        }
+        final JsonNode indexNode = node.get(indexField);
+        if (indexNode != null && !indexNode.isNull()) {
+            final String value = indexNode.asText();
+            return StringUtils.isNotBlank(value) ? value : fallbackIndex;
+        }
+        return fallbackIndex;
+    }
+
+    /**
+     * Resolves the index name from an already-parsed content Map.
+     * Used for Update/Delete/Upsert operations and suppression-enabled 
Index/Create paths
+     * where the Map is already available. Falls back to {@code fallbackIndex} 
when the
+     * field is absent or blank.
+     */
+    private String resolveIndex(final Map<String, Object> contentMap, final 
String indexField, final String fallbackIndex) {
+        if (StringUtils.isBlank(indexField)) {
+            return fallbackIndex;
+        }
+        final Object indexObj = contentMap.get(indexField);
+        if (indexObj != null) {
+            final String value = indexObj.toString();

Review Comment:
   resolveIndex uses toString() while extractIndex uses asText(). Could a 
non-string index field value produce a different index name between the Map 
path and the JsonNode path?



##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -450,17 +526,36 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                             final IndexOperationRequest opRequest;
                             final long docBytes;
                             if (o == IndexOperationRequest.Operation.Index || 
o == IndexOperationRequest.Operation.Create) {
-                                final String id = extractId(trimmedLine, 
documentIdField, flowFileIdAttribute);
                                 final byte[] rawJsonBytes;
-                                if (suppressingWriter != null) {
-                                    // Parse to Map so NON_NULL/NON_EMPTY 
inclusion filters apply during serialization.
-                                    // JsonNode tree serialization bypasses 
JsonInclude filters.
-                                    rawJsonBytes = 
suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+                                final String id;
+                                final String docIndex;
+                                final boolean stripId = !retainIdentifierField 
&& StringUtils.isNotBlank(documentIdField);
+                                final boolean stripIdx = !retainIndexField && 
StringUtils.isNotBlank(documentIndexField);
+                                final boolean needsTimestamp = 
StringUtils.isNotBlank(documentTimestampField);
+                                if (suppressingWriter != null || stripId || 
stripIdx || needsTimestamp) {
+                                    // Map is needed anyway — extract both 
fields from the Map directly.
+                                    final Map<String, Object> contentMap = 
mapReader.readValue(trimmedLine);
+                                    id = resolveId(contentMap, 
documentIdField, flowFileIdAttribute);
+                                    docIndex = resolveIndex(contentMap, 
documentIndexField, index);
+                                    if (stripId) {
+                                        contentMap.remove(documentIdField);
+                                    }
+                                    if (stripIdx) {
+                                        contentMap.remove(documentIndexField);
+                                    }
+                                    applyTimestamp(contentMap, 
documentTimestampField, retainTimestampField);

Review Comment:
   Can we add PutElasticsearchJsonTest coverage for the new Index Field, 
Timestamp Field, and the three Retain toggles across NDJSON, JSON Array, and 
Single JSON, so the extraction and field stripping behavior is locked in?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to