pvillard31 commented on code in PR #11299:
URL: https://github.com/apache/nifi/pull/11299#discussion_r3335818956
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -245,6 +249,67 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.dependsOn(INPUT_FORMAT, InputFormat.NDJSON, InputFormat.JSON_ARRAY)
.build();
+ static final PropertyDescriptor RETAIN_IDENTIFIER_FIELD = new PropertyDescriptor.Builder()
+ .name("Retain Identifier Field")
+ .description("""
+ Whether to keep the Identifier Field in the document body after extracting it \
+ for use as the Elasticsearch document ID. \
+ When false (default), the field is removed from the document before indexing.\
+ """)
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
Review Comment:
Today the Identifier Field is kept in the indexed document. Should Retain
Identifier Field default to true so existing flows keep the same document shape
after upgrade?
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -245,6 +249,67 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.dependsOn(INPUT_FORMAT, InputFormat.NDJSON, InputFormat.JSON_ARRAY)
.build();
+ static final PropertyDescriptor RETAIN_IDENTIFIER_FIELD = new PropertyDescriptor.Builder()
+ .name("Retain Identifier Field")
+ .description("""
+ Whether to keep the Identifier Field in the document body after extracting it \
+ for use as the Elasticsearch document ID. \
+ When false (default), the field is removed from the document before indexing.\
+ """)
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .dependsOn(INPUT_FORMAT, InputFormat.NDJSON, InputFormat.JSON_ARRAY)
+ .build();
+
+ static final PropertyDescriptor INDEX_FIELD = new PropertyDescriptor.Builder()
+ .name("Index Field")
+ .description("""
+ The name of the field within each document to use as the Elasticsearch index name. \
+ If the field is not present in a document or this property is left blank, \
+ the configured Index property value is used as the fallback.\
+ """)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor RETAIN_INDEX_FIELD = new PropertyDescriptor.Builder()
+ .name("Retain Index Field")
+ .description("""
+ Whether to keep the Index Field in the document body after extracting it \
+ for use as the Elasticsearch index name. \
+ When false (default), the field is removed from the document before indexing.\
+ """)
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
Review Comment:
Should Retain Index Field use dependsOn(INDEX_FIELD), and Retain Timestamp
Field use dependsOn(TIMESTAMP_FIELD), so the toggle is hidden when no source
field is set?
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -931,6 +1131,41 @@ private String resolveId(final Map<String, Object> contentMap, final String idAt
return StringUtils.isNotBlank(flowFileIdAttribute) ? flowFileIdAttribute : null;
}
+ /**
+ * Extracts the index name from a pre-parsed {@link JsonNode}.
+ * Used for JSON Array Index/Create operations where the node is already available.
+ * Falls back to {@code fallbackIndex} when the field is absent or blank.
+ */
+ private String extractIndex(final JsonNode node, final String indexField, final String fallbackIndex) {
+ if (StringUtils.isBlank(indexField)) {
+ return fallbackIndex;
+ }
+ final JsonNode indexNode = node.get(indexField);
+ if (indexNode != null && !indexNode.isNull()) {
+ final String value = indexNode.asText();
+ return StringUtils.isNotBlank(value) ? value : fallbackIndex;
+ }
+ return fallbackIndex;
+ }
+
+ /**
+ * Resolves the index name from an already-parsed content Map.
+ * Used for Update/Delete/Upsert operations and suppression-enabled Index/Create paths
+ * where the Map is already available. Falls back to {@code fallbackIndex} when the
+ * field is absent or blank.
+ */
+ private String resolveIndex(final Map<String, Object> contentMap, final String indexField, final String fallbackIndex) {
+ if (StringUtils.isBlank(indexField)) {
+ return fallbackIndex;
+ }
+ final Object indexObj = contentMap.get(indexField);
+ if (indexObj != null) {
+ final String value = indexObj.toString();
Review Comment:
resolveIndex uses toString() while extractIndex uses asText(). Could a
non-string index field value produce a different index name between the Map
path and the JsonNode path?
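
To make the concern concrete, here is a minimal, dependency-free sketch of the Map path. `IndexNameSketch` and its `isBlank` helper are hypothetical stand-ins (the PR uses `StringUtils`), and the part of `resolveIndex` after `toString()` is not visible in the hunk above, so the sketch assumes it mirrors `extractIndex`. For comparison, Jackson 2.x documents `JsonNode.asText()` as returning an empty String for container nodes, so the JsonNode path would presumably fall back to the configured index where the Map path uses the container's `toString()` text.

```java
import java.util.List;
import java.util.Map;

public class IndexNameSketch {
    // Local stand-in for StringUtils.isBlank so the sketch needs no dependencies.
    public static boolean isBlank(final String s) {
        return s == null || s.trim().isEmpty();
    }

    // Assumed shape of the Map-based lookup: any non-null value is stringified.
    public static String resolveIndex(final Map<String, Object> contentMap,
                                      final String indexField,
                                      final String fallbackIndex) {
        if (isBlank(indexField)) {
            return fallbackIndex;
        }
        final Object indexObj = contentMap.get(indexField);
        if (indexObj != null) {
            final String value = indexObj.toString();
            return isBlank(value) ? fallbackIndex : value;
        }
        return fallbackIndex;
    }

    public static void main(final String[] args) {
        // A scalar value stringifies to something usable as an index name.
        final Map<String, Object> scalarDoc = Map.of("idx", 42);
        // A container value stringifies to Java's collection syntax instead.
        final Map<String, Object> arrayDoc = Map.of("idx", List.of("a", "b"));

        System.out.println(resolveIndex(scalarDoc, "idx", "fallback"));
        System.out.println(resolveIndex(arrayDoc, "idx", "fallback"));
    }
}
```

If that reading is right, an array or object in the index field is the case where the two paths diverge, so it may be worth either rejecting non-scalar values or normalizing both paths to the same conversion.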
##########
nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java:
##########
@@ -450,17 +526,36 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final IndexOperationRequest opRequest;
final long docBytes;
if (o == IndexOperationRequest.Operation.Index ||
o == IndexOperationRequest.Operation.Create) {
- final String id = extractId(trimmedLine, documentIdField, flowFileIdAttribute);
final byte[] rawJsonBytes;
- if (suppressingWriter != null) {
- // Parse to Map so NON_NULL/NON_EMPTY inclusion filters apply during serialization.
- // JsonNode tree serialization bypasses JsonInclude filters.
- rawJsonBytes = suppressingWriter.writeValueAsBytes(mapReader.readValue(trimmedLine));
+ final String id;
+ final String docIndex;
+ final boolean stripId = !retainIdentifierField && StringUtils.isNotBlank(documentIdField);
+ final boolean stripIdx = !retainIndexField && StringUtils.isNotBlank(documentIndexField);
+ final boolean needsTimestamp = StringUtils.isNotBlank(documentTimestampField);
+ if (suppressingWriter != null || stripId || stripIdx || needsTimestamp) {
+ // Map is needed anyway — extract both
fields from the Map directly.
+ final Map<String, Object> contentMap = mapReader.readValue(trimmedLine);
+ id = resolveId(contentMap, documentIdField, flowFileIdAttribute);
+ docIndex = resolveIndex(contentMap, documentIndexField, index);
+ if (stripId) {
+ contentMap.remove(documentIdField);
+ }
+ if (stripIdx) {
+ contentMap.remove(documentIndexField);
+ }
+ applyTimestamp(contentMap, documentTimestampField, retainTimestampField);
Review Comment:
Can we add PutElasticsearchJsonTest coverage for the new Index Field,
Timestamp Field, and the three Retain toggles across NDJSON, JSON Array, and
Single JSON, so the extraction and field stripping behavior is locked in?
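
As a rough illustration of the behavior such tests would pin down, here is a plain-Java sketch of the extract-then-strip step for the identifier and index fields. Everything in it (`RetainToggleSketch`, `Prepared`, `prepare`) is a hypothetical stand-in rather than the processor's code or NiFi's test harness, and the timestamp field is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RetainToggleSketch {
    // Hypothetical holder: the routing values plus the body that would be indexed.
    public static final class Prepared {
        public final String id;
        public final String index;
        public final Map<String, Object> body;

        Prepared(final String id, final String index, final Map<String, Object> body) {
            this.id = id;
            this.index = index;
            this.body = body;
        }
    }

    public static Prepared prepare(final Map<String, Object> source,
                                   final String idField, final String indexField,
                                   final boolean retainId, final boolean retainIndex,
                                   final String fallbackIndex) {
        // Copy first so the caller's map is left untouched.
        final Map<String, Object> body = new LinkedHashMap<>(source);
        final Object id = idField == null ? null : body.get(idField);
        final Object idx = indexField == null ? null : body.get(indexField);
        if (idField != null && !retainId) {
            body.remove(idField);
        }
        if (indexField != null && !retainIndex) {
            body.remove(indexField);
        }
        return new Prepared(id == null ? null : id.toString(),
                idx == null ? fallbackIndex : idx.toString(), body);
    }

    public static void main(final String[] args) {
        final Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", "42");
        doc.put("idx", "logs-a");
        doc.put("msg", "hello");

        final Prepared stripped = prepare(doc, "id", "idx", false, false, "default");
        final Prepared retained = prepare(doc, "id", "idx", true, true, "default");
        System.out.println(stripped.index + " " + stripped.body);
        System.out.println(retained.index + " " + retained.body);
    }
}
```

The real tests would presumably assert the same three things per input format: the extracted id, the resolved index (including the fallback), and the fields remaining in the indexed body for each toggle combination.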