[nifi-minifi-cpp] branch master updated: MINIFICPP-854: Capture RTSP Frame
This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git The following commit(s) were added to refs/heads/master by this push: new a140a72 MINIFICPP-854: Capture RTSP Frame a140a72 is described below commit a140a7291fed28f46af3c61edea6e8feb4d044a5 Author: Nghia Le AuthorDate: Tue Jul 16 11:03:18 2019 +0200 MINIFICPP-854: Capture RTSP Frame Signed-off-by: Arpad Boda This closes #612 --- CMakeLists.txt | 2 +- extensions/opencv/CaptureRTSPFrame.cpp | 33 ++--- extensions/opencv/CaptureRTSPFrame.h | 2 + extensions/opencv/OpenCVLoader.cpp | 31 extensions/opencv/OpenCVLoader.h | 85 ++ .../opencv/tests}/CMakeLists.txt | 23 -- .../opencv/tests}/CaptureRTSPFrameTest.cpp | 69 +++--- 7 files changed, 199 insertions(+), 46 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d37ebf4..4965605 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -530,7 +530,7 @@ endif() ## OpenCV Extesions option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF) if (ENABLE_OPENCV) - createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "${TEST_DIR}/opencv-tests") + createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "extensions/opencv/tests") endif() ## Bustache/template extensions diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp index 445e3fb..50fac73 100644 --- a/extensions/opencv/CaptureRTSPFrame.cpp +++ b/extensions/opencv/CaptureRTSPFrame.cpp @@ -116,17 +116,35 @@ void CaptureRTSPFrame::onSchedule(core::ProcessContext *context, core::ProcessSe rtspURI.append(rtsp_uri_); } - cv::VideoCapture capture(rtspURI.c_str()); - video_capture_ = capture; - video_backend_driver_ = video_capture_.getBackendName(); + rtsp_url_ = rtspURI; + } void CaptureRTSPFrame::onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { - auto flow_file = session->create(); - cv::Mat frame; + std::unique_lock lock(mutex_, std::try_to_lock); + if (!lock.owns_lock()) { +logger_->log_info("Cannot process due to an unfinished onTrigger"); +context->yield(); +return; + } + try { +video_capture_.open(rtsp_url_); +video_backend_driver_ = video_capture_.getBackendName(); + } catch (const cv::Exception &e) { +logger_->log_error("Unable to open RTSP stream: %s", e.what()); +context->yield(); +return; + } catch (...) { +logger_->log_error("Unable to open RTSP stream: unhandled exception"); +context->yield(); +return; + } + + auto flow_file = session->create(); + cv::Mat frame; // retrieve a frame of your source if (video_capture_.read(frame)) { if (!frame.empty()) { @@ -145,6 +163,7 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr &co session->write(flow_file, &write_cb); session->transfer(flow_file, Success); + logger_->log_info("A frame is captured"); } else { logger_->log_error("Empty Mat frame received from capture"); session->transfer(flow_file, Failure); @@ -154,13 +173,9 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr &co session->transfer(flow_file, Failure); } - frame.release(); - } void CaptureRTSPFrame::notifyStop() { - // Release the Capture reference and free up resources. - video_capture_.release(); } } /* namespace processors */ diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h index e3ac740..537d672 100644 --- a/extensions/opencv/CaptureRTSPFrame.h +++ b/extensions/opencv/CaptureRTSPFrame.h @@ -84,11 +84,13 @@ class CaptureRTSPFrame : public core::Processor { private: std::shared_ptr logger_; + std::mutex mutex_; std::string rtsp_username_; std::string rtsp_password_; std::string rtsp_host_; std::string rtsp_port_; std::string rtsp_uri_; + std::string rtsp_url_; cv::VideoCapture video_capture_; std::string image_encoding_; std::string video_backend_driver_; diff --git a/extensions/opencv/OpenCVLoader.cpp b/extensions/opencv/OpenCVLoader.cpp new file mode 100644 index 000..eb7cfba --- /dev/null +++ b/extensions/opencv/OpenCVLoader.cpp @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a
[nifi-minifi-cpp] branch master updated: MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git The following commit(s) were added to refs/heads/master by this push: new cd0be87 MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka cd0be87 is described below commit cd0be87f36cd348510dd614eb786dac45b394592 Author: Nghia Le AuthorDate: Wed Jul 17 11:23:27 2019 +0200 MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka Signed-off-by: Arpad Boda This closes #614 --- extensions/librdkafka/PublishKafka.cpp | 14 +++--- extensions/librdkafka/PublishKafka.h | 1 + 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index a1887a2..75cb55c 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -79,6 +79,7 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path", core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n" "Supports Expression Language: true (will be evaluated using flow file attributes)", ""); +core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", ""); core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship"); core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship"); @@ -107,6 +108,7 @@ void PublishKafka::initialize() { properties.insert(KerberosPrincipal); properties.insert(KerberosKeytabPath); properties.insert(MessageKeyField); + properties.insert(DebugContexts); setSupportedProperties(properties); // Set the supported relationships std::set relationships; @@ -129,9 +131,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr auto key = conn->getKey(); + if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) { +rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr)); +logger_->log_debug("PublishKafka: debug properties [%s]", value); +if (result != RD_KAFKA_CONF_OK) + logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr); + } + if (!key->brokers_.empty()) { result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr)); -logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value); +logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_.c_str()); if (result != RD_KAFKA_CONF_OK) logger_->log_error("PublishKafka: configure error result [%s]", errstr); } else { @@ -141,7 +150,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr if (!key->client_id_.empty()) { rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr)); -logger_->log_debug("PublishKafka: client.id [%s]", value); +logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_.c_str()); if (result != RD_KAFKA_CONF_OK) logger_->log_error("PublishKafka: configure error result [%s]", errstr); } else { @@ -232,7 +241,6 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr); } value = ""; - value = ""; if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { if (value == SECURITY_PROTOCOL_SSL) { rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr)); diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 8a23dee..8f6806f 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -338,6 +338,7 @@ class PublishKafka : public core::Processor { static core::Property KerberosPrincipal; static core::Property KerberosKeytabPath; static core::Property MessageKeyField; + static core::Property DebugContexts; // Supported Relationships static core::Relationship Failure;
[nifi] branch master updated: NIFI-6407 - added support for useAvroLogicalTypes in PutBigQueryBatch
This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new 639e81e NIFI-6407 - added support for useAvroLogicalTypes in PutBigQueryBatch 639e81e is described below commit 639e81e5a12f5fccb35a581e009eee5bcdb4bff6 Author: Pierre Villard AuthorDate: Fri Jul 19 14:20:27 2019 +0200 NIFI-6407 - added support for useAvroLogicalTypes in PutBigQueryBatch fix Maven dep This closes #3592. Signed-off-by: Koji Kawamura --- .../nifi/processors/gcp/bigquery/BigQueryAttributes.java | 5 + .../apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java | 11 +++ .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml | 2 +- nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 2 +- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java index 842a176..81978eb 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java @@ -97,6 +97,11 @@ public class BigQueryAttributes { + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the " + "file that should be skipped."; +public static final String AVRO_USE_LOGICAL_TYPES_ATTR = "bq.avro.use.logical.types"; +public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set to Avro and if this option is set to true, you " ++ "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw " ++ "types (such as INTEGER)."; + // Batch Attributes diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 5068ab5..5446c20 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -214,6 +214,15 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); +public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder() +.name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR) +.displayName("Avro Input - Use Logical Types") +.description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC) +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + @Override public List getSupportedPropertyDescriptors() { return ImmutableList.builder() @@ -229,6 +238,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .add(CSV_FIELD_DELIMITER) .add(CSV_QUOTE) .add(CSV_SKIP_LEADING_ROWS) +.add(AVRO_USE_LOGICAL_TYPES) .build(); } @@ -280,6 +290,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) + .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean()) .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) .setSchema(schema) .setFormatOptions(formatOption) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 671474b..ea79c10 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,7 +33,7 @@ com.google.auth google-auth-library-oauth2-http -
[nifi] branch master updated: NIFI-6487 Add S3 User Metadata to ListS3 processor
This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new cdee1d8 NIFI-6487 Add S3 User Metadata to ListS3 processor cdee1d8 is described below commit cdee1d8c09c52dd2aa78311cf8c8fe9b2f449d44 Author: JF Beauvais AuthorDate: Thu Jul 25 14:20:35 2019 +0200 NIFI-6487 Add S3 User Metadata to ListS3 processor Fix imports auto formatted by intellij NIFI-6487 Fix WriteAttribute documentation This closes #3603. Signed-off-by: Koji Kawamura --- .../org/apache/nifi/processors/aws/s3/ListS3.java | 46 + .../nifi/processors/aws/s3/AbstractS3IT.java | 9 .../apache/nifi/processors/aws/s3/ITListS3.java| 32 +++ .../apache/nifi/processors/aws/s3/TestListS3.java | 48 ++ 4 files changed, 117 insertions(+), 18 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index d3bade9..fb4e49f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -26,9 +26,19 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectTaggingRequest; import com.amazonaws.services.s3.model.GetObjectTaggingResult; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ListVersionsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.Tag; +import com.amazonaws.services.s3.model.VersionListing; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -53,14 +63,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ListVersionsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.amazonaws.services.s3.model.S3VersionSummary; -import com.amazonaws.services.s3.model.VersionListing; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; @PrimaryNodeOnly @TriggerSerially @@ -85,6 +87,8 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result; @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"), @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable"), @WritesAttribute(attribute = "s3.tag.___", description = "If 'Write Object Tags' is set to 'True', the tags associated to the S3 object that is being listed " + +"will be written as part of the flowfile attributes"), +@WritesAttribute(attribute = "s3.user.metadata.___", description = "If 'Write User Metadata' is set to 'True', the user defined metadata associated to the S3 object that is being listed " + "will be written as part of the flowfile attributes")}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) public class ListS3 extends AbstractS3Processor { @@ -150,8 +154,18 @@ public class ListS3 extends AbstractS3Processor { .defaultValue("false") .build(); +public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder() +.name("write-s3-user-metadata") +.displayName("Write User Metadata") +.description("If set to 'True', the user defined metadata associated with the S3 object will be written as FlowFile attributes") +.required(true) +.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False")) +.defaultValue("false") +.build(); + + public static final List properties = Collections.unmodifiableList( -Arrays.asList(BU
[nifi] branch master updated: NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS
This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new 1d22e8a NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS 1d22e8a is described below commit 1d22e8a86d001540bb86d017a9139393778628cb Author: Alessandro D'Armiento AuthorDate: Fri Jul 26 17:14:11 2019 +0200 NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS Unified unit tests Added custom validation cases for MIN_RECORDS and MAX_RECORDS enforcing they are greater than zero. While MIN_RECORDS > 0 can fail individually, MAX_RECORDS > 0 validator cannot fail without having also at least another validation step (either the MIN_RECORDS > 0 or the MAX_RECORDS > MIN_RECORDS) to fail, since MIN_RECORDS is a required property with default value 1 This closes #3607. Signed-off-by: Koji Kawamura --- .../nifi/processors/standard/MergeRecord.java | 22 - .../standard/merge/RecordBinManager.java | 4 +- .../nifi/processors/standard/TestMergeRecord.java | 100 + 3 files changed, 122 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index 130d6b6..797359e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -179,6 +179,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .required(true) .defaultValue("1") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder() .name("max-records") @@ -188,6 +189,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .required(false) .defaultValue("1000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() .name("max.bin.count") @@ -268,8 +270,8 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { protected Collection customValidate(final ValidationContext validationContext) { final List results = new ArrayList<>(); -final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger(); -final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger(); +final Integer minRecords = validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger(); +final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger(); if (minRecords != null && maxRecords != null && maxRecords < minRecords) { results.add(new ValidationResult.Builder() .subject("Max Records") @@ -278,6 +280,22 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .explanation(" property cannot be smaller than property") .build()); } +if (minRecords != null && minRecords <= 0) { +results.add(new ValidationResult.Builder() +.subject("Min Records") +.input(String.valueOf(minRecords)) +.valid(false) +.explanation(" property cannot be negative or zero") +.build()); +} +if (maxRecords != null && maxRecords <= 0) { +results.add(new ValidationResult.Builder() +.subject("Max Records") +.input(String.valueOf(maxRecords)) +.valid(false) +.explanation(" property cannot be negative or zero") +.build()); +} final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B); final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache