[nifi-minifi-cpp] branch master updated: MINIFICPP-854: Capture RTSP Frame

2019-07-31 Thread aboda
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
 new a140a72  MINIFICPP-854: Capture RTSP Frame
a140a72 is described below

commit a140a7291fed28f46af3c61edea6e8feb4d044a5
Author: Nghia Le 
AuthorDate: Tue Jul 16 11:03:18 2019 +0200

MINIFICPP-854: Capture RTSP Frame

Signed-off-by: Arpad Boda 

This closes #612
---
 CMakeLists.txt |  2 +-
 extensions/opencv/CaptureRTSPFrame.cpp | 33 ++---
 extensions/opencv/CaptureRTSPFrame.h   |  2 +
 extensions/opencv/OpenCVLoader.cpp | 31 
 extensions/opencv/OpenCVLoader.h   | 85 ++
 .../opencv/tests}/CMakeLists.txt   | 23 --
 .../opencv/tests}/CaptureRTSPFrameTest.cpp | 69 +++---
 7 files changed, 199 insertions(+), 46 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index d37ebf4..4965605 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -530,7 +530,7 @@ endif()
 ## OpenCV Extesions
 option(ENABLE_OPENCV "Disables the OpenCV extensions." OFF)
 if (ENABLE_OPENCV)
-   createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "${TEST_DIR}/opencv-tests")
+   createExtension(OPENCV-EXTENSIONS "OPENCV EXTENSIONS" "This enabled OpenCV support" "extensions/opencv" "extensions/opencv/tests")
 endif()
 
 ## Bustache/template extensions
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp 
b/extensions/opencv/CaptureRTSPFrame.cpp
index 445e3fb..50fac73 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -116,17 +116,35 @@ void CaptureRTSPFrame::onSchedule(core::ProcessContext 
*context, core::ProcessSe
 rtspURI.append(rtsp_uri_);
   }
 
-  cv::VideoCapture capture(rtspURI.c_str());
-  video_capture_ = capture;
-  video_backend_driver_ = video_capture_.getBackendName();
+  rtsp_url_ = rtspURI;
+
 }
 
 void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
                                  const std::shared_ptr<core::ProcessSession> &session) {
-  auto flow_file = session->create();
 
-  cv::Mat frame;
+  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+logger_->log_info("Cannot process due to an unfinished onTrigger");
+context->yield();
+return;
+  }
 
+  try {
+video_capture_.open(rtsp_url_);
+video_backend_driver_ = video_capture_.getBackendName();
+  } catch (const cv::Exception &e) {
+logger_->log_error("Unable to open RTSP stream: %s", e.what());
+context->yield();
+return;
+  } catch (...) {
+logger_->log_error("Unable to open RTSP stream: unhandled exception");
+context->yield();
+return;
+  }
+
+  auto flow_file = session->create();
+  cv::Mat frame;
   // retrieve a frame of your source
   if (video_capture_.read(frame)) {
 if (!frame.empty()) {
@@ -145,6 +163,7 @@ void CaptureRTSPFrame::onTrigger(const 
std::shared_ptr &co
 
   session->write(flow_file, &write_cb);
   session->transfer(flow_file, Success);
+  logger_->log_info("A frame is captured");
 } else {
   logger_->log_error("Empty Mat frame received from capture");
   session->transfer(flow_file, Failure);
@@ -154,13 +173,9 @@ void CaptureRTSPFrame::onTrigger(const 
std::shared_ptr &co
 session->transfer(flow_file, Failure);
   }
 
-  frame.release();
-
 }
 
 void CaptureRTSPFrame::notifyStop() {
-  // Release the Capture reference and free up resources.
-  video_capture_.release();
 }
 
 } /* namespace processors */
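For reference, the diff above moves stream opening out of onSchedule and into onTrigger, so every invocation opens the RTSP URL, reads one frame, and treats OpenCV exceptions and empty frames as failures. Below is a minimal standalone sketch of that flow, assuming only the OpenCV videoio API; the URL is a placeholder, not a value from the commit.

#include <iostream>
#include <string>
#include <opencv2/opencv.hpp>

int main() {
  const std::string rtsp_url = "rtsp://user:pass@example.com:554/stream";  // placeholder URL

  cv::VideoCapture capture;
  try {
    capture.open(rtsp_url);
  } catch (const cv::Exception &e) {
    std::cerr << "Unable to open RTSP stream: " << e.what() << '\n';
    return 1;
  }
  if (!capture.isOpened()) {
    std::cerr << "Unable to open RTSP stream\n";
    return 1;
  }

  // Read a single frame, mirroring one onTrigger invocation.
  cv::Mat frame;
  if (capture.read(frame) && !frame.empty()) {
    std::cout << "Captured a " << frame.cols << "x" << frame.rows << " frame\n";
    return 0;
  }
  std::cerr << "Empty Mat frame received from capture\n";
  return 1;
}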
diff --git a/extensions/opencv/CaptureRTSPFrame.h 
b/extensions/opencv/CaptureRTSPFrame.h
index e3ac740..537d672 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -84,11 +84,13 @@ class CaptureRTSPFrame : public core::Processor {
 
  private:
  std::shared_ptr<logging::Logger> logger_;
+  std::mutex mutex_;
   std::string rtsp_username_;
   std::string rtsp_password_;
   std::string rtsp_host_;
   std::string rtsp_port_;
   std::string rtsp_uri_;
+  std::string rtsp_url_;
   cv::VideoCapture video_capture_;
   std::string image_encoding_;
   std::string video_backend_driver_;
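The new mutex_ member backs the try-lock guard added to onTrigger: if a previous capture is still running, the processor logs and yields instead of blocking. A minimal sketch of the same pattern using only the standard library follows; CaptureWorker and capture_once are illustrative names, not MiNiFi APIs.

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

// CaptureWorker and capture_once are illustrative names, not MiNiFi APIs.
class CaptureWorker {
 public:
  bool capture_once() {
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock.owns_lock()) {
      std::cout << "Cannot process due to an unfinished capture, skipping\n";
      return false;  // the processor would call context->yield() here
    }
    // ... open the stream and read one frame here ...
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "A frame is captured\n";
    return true;
  }

 private:
  std::mutex mutex_;
};

int main() {
  CaptureWorker worker;
  std::thread t1([&worker] { worker.capture_once(); });
  std::thread t2([&worker] { worker.capture_once(); });  // likely skipped while t1 holds the lock
  t1.join();
  t2.join();
  return 0;
}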
diff --git a/extensions/opencv/OpenCVLoader.cpp 
b/extensions/opencv/OpenCVLoader.cpp
new file mode 100644
index 000..eb7cfba
--- /dev/null
+++ b/extensions/opencv/OpenCVLoader.cpp
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a

[nifi-minifi-cpp] branch master updated: MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka

2019-07-31 Thread aboda
This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
 new cd0be87  MINIFICPP-732 - Add property to expose librdkafka "debug" 
values for PublishKafka
cd0be87 is described below

commit cd0be87f36cd348510dd614eb786dac45b394592
Author: Nghia Le 
AuthorDate: Wed Jul 17 11:23:27 2019 +0200

MINIFICPP-732 - Add property to expose librdkafka "debug" values for 
PublishKafka

Signed-off-by: Arpad Boda 

This closes #614
---
 extensions/librdkafka/PublishKafka.cpp | 14 +++---
 extensions/librdkafka/PublishKafka.h   |  1 +
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index a1887a2..75cb55c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,6 +79,7 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos 
Keytab Path",
 core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of 
a field in the Input Records that should be used as the Key for the Kafka 
message.\n"
  "Supports Expression Language: 
true (will be evaluated using flow file attributes)",
  "");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A 
comma-separated list of debug contexts to enable. Including: generic, broker, 
topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, 
interceptor, plugin, consumer, admin, eos, all", "");
 core::Relationship PublishKafka::Success("success", "Any FlowFile that is 
successfully sent to Kafka will be routed to this Relationship");
 core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot 
be sent to Kafka will be routed to this Relationship");
 
@@ -107,6 +108,7 @@ void PublishKafka::initialize() {
   properties.insert(KerberosPrincipal);
   properties.insert(KerberosKeytabPath);
   properties.insert(MessageKeyField);
+  properties.insert(DebugContexts);
   setSupportedProperties(properties);
   // Set the supported relationships
  std::set<core::Relationship> relationships;
@@ -129,9 +131,16 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr
 
   auto key = conn->getKey();
 
+  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
+logger_->log_debug("PublishKafka: debug properties [%s]", value);
+if (result != RD_KAFKA_CONF_OK)
+  logger_->log_error("PublishKafka: configure debug properties error 
result [%s]", errstr);
+  }
+
   if (!key->brokers_.empty()) {
 result = rd_kafka_conf_set(conf_, "bootstrap.servers", 
key->brokers_.c_str(), errstr, sizeof(errstr));
-logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
+logger_->log_debug("PublishKafka: bootstrap.servers [%s]", 
key->brokers_.c_str());
 if (result != RD_KAFKA_CONF_OK)
   logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -141,7 +150,7 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr
 
   if (!key->client_id_.empty()) {
 rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, 
sizeof(errstr));
-logger_->log_debug("PublishKafka: client.id [%s]", value);
+logger_->log_debug("PublishKafka: client.id [%s]", 
key->client_id_.c_str());
 if (result != RD_KAFKA_CONF_OK)
   logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -232,7 +241,6 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr
   logger_->log_error("PublishKafka: configure compression codec error 
result [%s]", errstr);
   }
   value = "";
-  value = "";
   if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
 if (value == SECURITY_PROTOCOL_SSL) {
   rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, 
sizeof(errstr));
diff --git a/extensions/librdkafka/PublishKafka.h 
b/extensions/librdkafka/PublishKafka.h
index 8a23dee..8f6806f 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -338,6 +338,7 @@ class PublishKafka : public core::Processor {
   static core::Property KerberosPrincipal;
   static core::Property KerberosKeytabPath;
   static core::Property MessageKeyField;
+  static core::Property DebugContexts;
 
   // Supported Relationships
   static core::Relationship Failure;
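For reference, the new Debug contexts property value is handed straight to librdkafka's "debug" configuration key. Below is a minimal sketch of that configuration step, assuming only the public librdkafka C API; the context list is just an example value, not taken from the commit.

#include <cstdio>
#include <librdkafka/rdkafka.h>

int main() {
  char errstr[512];
  rd_kafka_conf_t *conf = rd_kafka_conf_new();

  // "broker,topic,msg" is an example; any of the documented debug contexts
  // (generic, broker, topic, metadata, ..., all) can be listed, comma-separated.
  rd_kafka_conf_res_t result =
      rd_kafka_conf_set(conf, "debug", "broker,topic,msg", errstr, sizeof(errstr));
  if (result != RD_KAFKA_CONF_OK) {
    std::fprintf(stderr, "configure debug properties error result [%s]\n", errstr);
  }

  rd_kafka_conf_destroy(conf);
  return 0;
}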



[nifi] branch master updated: NIFI-6407 - added support for useAvroLogicalTypes in PutBigQueryBatch

2019-07-31 Thread ijokarumawak
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new 639e81e  NIFI-6407 - added support for useAvroLogicalTypes in 
PutBigQueryBatch
639e81e is described below

commit 639e81e5a12f5fccb35a581e009eee5bcdb4bff6
Author: Pierre Villard 
AuthorDate: Fri Jul 19 14:20:27 2019 +0200

NIFI-6407 - added support for useAvroLogicalTypes in PutBigQueryBatch

fix Maven dep

This closes #3592.

Signed-off-by: Koji Kawamura 
---
 .../nifi/processors/gcp/bigquery/BigQueryAttributes.java  |  5 +
 .../apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java | 11 +++
 .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml |  2 +-
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml  |  2 +-
 4 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 842a176..81978eb 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -97,6 +97,11 @@ public class BigQueryAttributes {
 + "will skip when reading the data. The default value is 0. This 
property is useful if you have header rows in the "
 + "file that should be skipped.";
 
+public static final String AVRO_USE_LOGICAL_TYPES_ATTR = 
"bq.avro.use.logical.types";
+public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set 
to Avro and if this option is set to true, you "
++ "can interpret logical types into their corresponding types 
(such as TIMESTAMP) instead of only using their raw "
++ "types (such as INTEGER).";
+
 
 
 // Batch Attributes
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 5068ab5..5446c20 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -214,6 +214,15 @@ public class PutBigQueryBatch extends 
AbstractBigQueryProcessor {
 
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 .build();
 
+public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new 
PropertyDescriptor.Builder()
+.name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR)
+.displayName("Avro Input - Use Logical Types")
+.description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC)
+.required(true)
+.allowableValues("true", "false")
+.defaultValue("false")
+.build();
+
 @Override
 public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
 return ImmutableList.<PropertyDescriptor>builder()
@@ -229,6 +238,7 @@ public class PutBigQueryBatch extends 
AbstractBigQueryProcessor {
 .add(CSV_FIELD_DELIMITER)
 .add(CSV_QUOTE)
 .add(CSV_SKIP_LEADING_ROWS)
+.add(AVRO_USE_LOGICAL_TYPES)
 .build();
 }
 
@@ -280,6 +290,7 @@ public class PutBigQueryBatch extends 
AbstractBigQueryProcessor {
 
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
 
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
 
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+
.setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
 
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
 .setSchema(schema)
 .setFormatOptions(formatOption)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 671474b..ea79c10 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,7 @@
 
 com.google.auth
 google-auth-library-oauth2-http
-  

[nifi] branch master updated: NIFI-6487 Add S3 User Metadata to ListS3 processor

2019-07-31 Thread ijokarumawak
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new cdee1d8  NIFI-6487 Add S3 User Metadata to ListS3 processor
cdee1d8 is described below

commit cdee1d8c09c52dd2aa78311cf8c8fe9b2f449d44
Author: JF Beauvais 
AuthorDate: Thu Jul 25 14:20:35 2019 +0200

NIFI-6487 Add S3 User Metadata to ListS3 processor

Fix imports auto formatted by intellij

NIFI-6487 Fix WriteAttribute documentation

This closes #3603.

Signed-off-by: Koji Kawamura 
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 46 +
 .../nifi/processors/aws/s3/AbstractS3IT.java   |  9 
 .../apache/nifi/processors/aws/s3/ITListS3.java| 32 +++
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 48 ++
 4 files changed, 117 insertions(+), 18 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index d3bade9..fb4e49f 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -26,9 +26,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
 import com.amazonaws.services.s3.model.Tag;
+import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -53,14 +63,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -85,6 +87,8 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
 @WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),
 @WritesAttribute(attribute = "s3.version", description = "The version 
of the object, if applicable"),
 @WritesAttribute(attribute = "s3.tag.___", description = "If 'Write 
Object Tags' is set to 'True', the tags associated to the S3 object that is 
being listed " +
+"will be written as part of the flowfile attributes"),
+@WritesAttribute(attribute = "s3.user.metadata.___", description = "If 
'Write User Metadata' is set to 'True', the user defined metadata associated to 
the S3 object that is being listed " +
 "will be written as part of the flowfile attributes")})
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor {
@@ -150,8 +154,18 @@ public class ListS3 extends AbstractS3Processor {
 .defaultValue("false")
 .build();
 
+public static final PropertyDescriptor WRITE_USER_METADATA = new 
PropertyDescriptor.Builder()
+.name("write-s3-user-metadata")
+.displayName("Write User Metadata")
+.description("If set to 'True', the user defined metadata 
associated with the S3 object will be written as FlowFile attributes")
+.required(true)
+.allowableValues(new AllowableValue("true", "True"), new 
AllowableValue("false", "False"))
+.defaultValue("false")
+.build();
+
+
 public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-Arrays.asList(BU

[nifi] branch master updated: NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS

2019-07-31 Thread ijokarumawak
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d22e8a  NIFI-6490 MergeRecord supports Variable Registry for 
MIN_RECORDS and MAX_RECORDS
1d22e8a is described below

commit 1d22e8a86d001540bb86d017a9139393778628cb
Author: Alessandro D'Armiento 
AuthorDate: Fri Jul 26 17:14:11 2019 +0200

NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and 
MAX_RECORDS

Unified unit tests

Added custom validation cases for MIN_RECORDS and MAX_RECORDS enforcing
that both are greater than zero.
The MIN_RECORDS > 0 check can fail on its own, but the MAX_RECORDS > 0 check
can never be the only failure: MIN_RECORDS is a required property with a
default value of 1, so a non-positive MAX_RECORDS also trips either the
MIN_RECORDS > 0 check or the MAX_RECORDS >= MIN_RECORDS check.

This closes #3607.

Signed-off-by: Koji Kawamura 
---
 .../nifi/processors/standard/MergeRecord.java  |  22 -
 .../standard/merge/RecordBinManager.java   |   4 +-
 .../nifi/processors/standard/TestMergeRecord.java  | 100 +
 3 files changed, 122 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 130d6b6..797359e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -179,6 +179,7 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
 .required(true)
 .defaultValue("1")
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 public static final PropertyDescriptor MAX_RECORDS = new 
PropertyDescriptor.Builder()
 .name("max-records")
@@ -188,6 +189,7 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
 .required(false)
 .defaultValue("1000")
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
 .name("max.bin.count")
@@ -268,8 +270,8 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
 protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
 final List<ValidationResult> results = new ArrayList<>();
 
-final Integer minRecords = 
validationContext.getProperty(MIN_RECORDS).asInteger();
-final Integer maxRecords = 
validationContext.getProperty(MAX_RECORDS).asInteger();
+final Integer minRecords = 
validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger();
+final Integer maxRecords = 
validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger();
 if (minRecords != null && maxRecords != null && maxRecords < 
minRecords) {
 results.add(new ValidationResult.Builder()
 .subject("Max Records")
@@ -278,6 +280,22 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
 .explanation(" property cannot be 
smaller than  property")
 .build());
 }
+if (minRecords != null && minRecords <= 0) {
+results.add(new ValidationResult.Builder()
+.subject("Min Records")
+.input(String.valueOf(minRecords))
+.valid(false)
+.explanation(" property cannot 
be negative or zero")
+.build());
+}
+if (maxRecords != null && maxRecords <= 0) {
+results.add(new ValidationResult.Builder()
+.subject("Max Records")
+.input(String.valueOf(maxRecords))
+.valid(false)
+.explanation(" property cannot 
be negative or zero")
+.build());
+}
 
 final Double minSize = 
validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
 final Double maxSize = 
validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache