[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-19 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175411532
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@WritesAttributes({
+@WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+@WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
+"ElasticSearch JSON DSL. It currently does not support 
pagination.")
+public class JsonQueryElasticsearch extends AbstractProcessor {
+public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+.description("All original flowfiles that don't cause an error 
to occur go to this relationship. " +
+"This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
+"\"hits\" relationship.").build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("All FlowFiles that cannot be read from 
Elasticsearch are routed to this relationship").build();
+
+public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+.description("Search hits are routed to this relationship.")
+.build();
+
+public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+.description("Aggregations are routed to this relationship.")
+.build();
+
+

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-19 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175411128
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -61,22 +58,23 @@
 public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
 private ObjectMapper mapper = new ObjectMapper();
 
-private List properties;
+static final private List properties;
 
 private RestClient client;
 
 private String url;
 
-@Override
-protected void init(ControllerServiceInitializationContext config) {
-properties = new ArrayList<>();
-properties.add(ElasticSearchClientService.HTTP_HOSTS);
-properties.add(ElasticSearchClientService.USERNAME);
-properties.add(ElasticSearchClientService.PASSWORD);
-
properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
-properties.add(ElasticSearchClientService.CONNECT_TIMEOUT);
-properties.add(ElasticSearchClientService.SOCKET_TIMEOUT);
-properties.add(ElasticSearchClientService.RETRY_TIMEOUT);
+static {
+List _props = new ArrayList();
--- End diff --

Done.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-19 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175411050
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -121,7 +119,7 @@ private void setupClient(ConfigurationContext context) 
throws Exception {
 
 RestClientBuilder builder = RestClient.builder(hh)
 .setHttpClientConfigCallback(httpClientBuilder -> {
-if (sslService != null) {
+if (sslService != null && 
sslService.isKeyStoreConfigured() && sslService.isTrustStoreConfigured()) {
 try {
--- End diff --

Done. I moved that logic out and have it bubble up as an 
InitializationException if any of those exceptions are thrown.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-19 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175407337
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
+private ObjectMapper mapper = new ObjectMapper();
+
+static final private List properties;
+
+private RestClient client;
+
+private String url;
+
+static {
+List _props = new ArrayList();
+_props.add(ElasticSearchClientService.HTTP_HOSTS);
+_props.add(ElasticSearchClientService.USERNAME);
+_props.add(ElasticSearchClientService.PASSWORD);
+_props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+_props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+_props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+_props.add(ElasticSearchClientService.RETRY_TIMEOUT);
+
+properties = Collections.unmodifiableList(_props);
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@OnEnabled
+public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+try {
+setupClient(context);
+} catch (Exception ex) {
+getLogger().error("Could not initialize ElasticSearch 
client.", ex);
+throw new InitializationException(ex);
+}
+}
+
+@OnDisabled
+public void onDisabled() throws IOException {
+this.client.close();
+this.url = null;
+}
+
+private void setupClient(ConfigurationContext context) throws 
MalformedURLException {
+final String hosts = 
context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
+String[] hostsSplit = hosts.split(",[\\s]*");
+this.url = hostsSplit[0];
+final SSLContextService sslService =
+
context.getProperty(PROP_

[GitHub] nifi issue #2518: NIFI-4637 Added support for visibility labels to the HBase...

2018-03-19 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2518
  
Had to rebase because of the commit that added `ScanHBase`. That doesn't 
have visibility labels added to it yet. That needs to be done. Keeping this 
ticket open because there's plenty that can be reviewed in the meantime if 
anyone is interested.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
> I hope my review doesn't come off in the wrong way, there is a lot of 
great work here and I just want to make sure the usability is top notch.

Not at all. It's all fair and good feedback. I'll take a crack at these 
tomorrow.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175296639
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 ---
@@ -196,6 +237,33 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
 }
 }
 
+private void removeUpdateKeys(String updateKeyParam, Map doc) {
+String[] parts = updateKeyParam.split(",[\\s]*");
+for (String part : parts) {
+if (part.contains(".")) {
--- End diff --

For this input:

```
{
"name": "John Smith",
"department": "Engineering"
}
```

It makes no sense to remove `name` if we're doing a full document update 
using the `name` key.

Now consider this complex document:

```
{
"name": "John Smith",
"department": "Engineering",
"contacts": {
 "email": "john.sm...@test.com"
 }
}
```

To search on `email`, we have to submit this payload with the lookup key 
being `contacts.email`:

```
{
"contacts.email": "john.sm...@test.com",
"name": "John Smith",
"department": "Engineering",
"contacts": {
 "email": "john.sm...@test.com"
 }
}
```

Mongo cannot do a lookup using this: `{ "contacts": { "email": 
"john.sm...@test.com" }}`

So if we don't remove the complex lookup key, we are leaving extraneous 
information in the document that almost certainly has no value to the user.

Now maybe we'll get an angry ticket complaining that they can't do periods 
in the key names, but I've never seen normal use cases where developers do 
that. The whole idea of creating complex key names for real data using periods 
and such flies in the face of how JSON is supposed to work.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175286408
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 ---
@@ -137,6 +151,31 @@
 return propertyDescriptors;
 }
 
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
+List problems = new ArrayList<>();
+
+final boolean queryKey = 
validationContext.getProperty(UPDATE_QUERY_KEY).isSet()
+&& 
!StringUtils.isBlank(validationContext.getProperty(UPDATE_QUERY_KEY).getValue());
--- End diff --

I just remembered why I did this. When you call `removeProperty` to unset 
the property, the `getProperty` call here will return the default value. So 
maybe I need to remove the default value. What do you think? Is this a problem 
with the test helpers or my understanding of how properties should work?


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175286257
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
 ---
@@ -96,6 +99,125 @@ public void testValidators() {
 Assert.assertEquals(0, results.size());
 }
 
+@Test
+public void testQueryAndUpdateKey() {
+runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
+runner.setProperty(PutMongo.UPDATE_QUERY, "{}");
+runner.assertNotValid();
+}
+
+@Test
+public void testNoQueryAndNoUpdateKey() {
+runner.removeProperty(PutMongo.UPDATE_QUERY);
+runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "");
+runner.assertNotValid();
+}
+
+@Test
+public void testBlankUpdateKey() {
+runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "  ");
+runner.assertNotValid();
+}
+
+@Test
+public void testUpdateQuery() {
+Document document = new Document()
+.append("name", "John Smith")
+.append("department", "Engineering");
+collection.insertOne(document);
+String updateBody = "{\n" +
+"\t\"$set\": {\n" +
+"\t\t\"email\": \"john.sm...@test.com\",\n" +
+"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
+"\t},\n" +
+"\t\"$inc\": {\n" +
+"\t\t\"writes\": 1\n" +
+"\t}\n" +
+"}";
+Map attr = new HashMap<>();
+attr.put("mongo.update.query", document.toJson());
+runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "");
+runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
+runner.setValidateExpressionUsage(true);
+runner.enqueue(updateBody, attr);
+updateTests(document);
+}
+
+@Test
+public void testUpdateBySimpleKey() {
+Document document = new Document()
+.append("name", "John Smith")
+.append("department", "Engineering");
+collection.insertOne(document);
+String updateBody = "{\n" +
+"\t\"name\": \"John Smith\",\n" +
+"\t\"$set\": {\n" +
+"\t\t\"email\": \"john.sm...@test.com\",\n" +
+"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
+"\t},\n" +
+"\t\"$inc\": {\n" +
+"\t\t\"writes\": 1\n" +
+"\t}\n" +
+"}";
+runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
+runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+runner.setValidateExpressionUsage(true);
+runner.enqueue(updateBody);
+updateTests(document);
+}
+
+@Test
--- End diff --

Yeah. I'm adding 2 at the moment. One for update by keys and one for query.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175286235
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 ---
@@ -86,19 +91,27 @@
 .name("Update Query Key")
 .description("Key name used to build the update query criteria; 
this property is valid only when using update mode, "
 + "otherwise it is ignored")
-.required(true)
-.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.required(false)
+.addValidator(Validator.VALID)
 .defaultValue("_id")
 .build();
+static final PropertyDescriptor UPDATE_QUERY = new 
PropertyDescriptor.Builder()
+.name("putmongo-update-query")
--- End diff --

Done.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175286234
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 ---
@@ -169,14 +208,16 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
 // update
 final boolean upsert = 
context.getProperty(UPSERT).asBoolean();
 final String updateKey = 
context.getProperty(UPDATE_QUERY_KEY).getValue();
+final String updateQuery = 
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+final Document query;
 
-Object keyVal = ((Map)doc).get(updateKey);
-if (updateKey.equals("_id") && 
ObjectId.isValid(((String)keyVal))) {
-keyVal = new ObjectId((String) keyVal);
+if (!StringUtils.isBlank(updateKey)) {
+query = parseUpdateKey(updateKey, (Map)doc);
+removeUpdateKeys(updateKey, (Map)doc);
+} else {
+query = Document.parse(updateQuery);
--- End diff --

Done.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-18 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2560#discussion_r175285785
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 ---
@@ -86,19 +91,27 @@
 .name("Update Query Key")
 .description("Key name used to build the update query criteria; 
this property is valid only when using update mode, "
 + "otherwise it is ignored")
-.required(true)
-.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.required(false)
+.addValidator(Validator.VALID)
--- End diff --

Wasn't sure if that's the case.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
Ok. It should all be there now.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271885
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@WritesAttributes({
+@WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+@WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
+"ElasticSearch JSON DSL. It currently does not support 
pagination.")
--- End diff --

Ok. I should also note that scroll queries are not supported.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271865
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
+private ObjectMapper mapper = new ObjectMapper();
+
+private List properties;
+
+private RestClient client;
+
+private String url;
+
+@Override
+protected void init(ControllerServiceInitializationContext config) {
+properties = new ArrayList<>();
+properties.add(ElasticSearchClientService.HTTP_HOSTS);
+properties.add(ElasticSearchClientService.USERNAME);
+properties.add(ElasticSearchClientService.PASSWORD);
+
properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+properties.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+properties.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+properties.add(ElasticSearchClientService.RETRY_TIMEOUT);
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@OnEnabled
+public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+try {
+setupClient(context);
+} catch (Exception ex) {
+getLogger().error("Could not initialize ElasticSearch 
client.", ex);
+throw new InitializationException(ex);
+}
+}
+
+@OnDisabled
+public void onDisabled() throws IOException {
+this.client.close();
+this.url = null;
+}
+
+private void setupClient(ConfigurationContext context) throws 
Exception {
+final String hosts = 
context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
+Stri

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271759
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
+private ObjectMapper mapper = new ObjectMapper();
+
+private List properties;
+
+private RestClient client;
+
+private String url;
+
+@Override
+protected void init(ControllerServiceInitializationContext config) {
+properties = new ArrayList<>();
+properties.add(ElasticSearchClientService.HTTP_HOSTS);
+properties.add(ElasticSearchClientService.USERNAME);
+properties.add(ElasticSearchClientService.PASSWORD);
+
properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+properties.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+properties.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+properties.add(ElasticSearchClientService.RETRY_TIMEOUT);
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@OnEnabled
+public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+try {
+setupClient(context);
+} catch (Exception ex) {
+getLogger().error("Could not initialize ElasticSearch 
client.", ex);
+throw new InitializationException(ex);
+}
+}
+
+@OnDisabled
+public void onDisabled() throws IOException {
+this.client.close();
+this.url = null;
+}
+
+private void setupClient(ConfigurationContext context) throws 
Exception {
+final String hosts = 
context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
+Stri

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271747
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
+private ObjectMapper mapper = new ObjectMapper();
+
+private List properties;
+
+private RestClient client;
+
+private String url;
+
+@Override
+protected void init(ControllerServiceInitializationContext config) {
+properties = new ArrayList<>();
+properties.add(ElasticSearchClientService.HTTP_HOSTS);
+properties.add(ElasticSearchClientService.USERNAME);
+properties.add(ElasticSearchClientService.PASSWORD);
+
properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+properties.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+properties.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+properties.add(ElasticSearchClientService.RETRY_TIMEOUT);
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@OnEnabled
+public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+try {
+setupClient(context);
+} catch (Exception ex) {
+getLogger().error("Could not initialize ElasticSearch 
client.", ex);
+throw new InitializationException(ex);
+}
+}
+
+@OnDisabled
+public void onDisabled() throws IOException {
+this.client.close();
+this.url = null;
+}
+
+private void setupClient(ConfigurationContext context) throws 
Exception {
--- End diff --

Done.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271738
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ElasticSearchClientServiceImpl extends 
AbstractControllerService implements ElasticSearchClientService {
+private ObjectMapper mapper = new ObjectMapper();
+
+private List properties;
--- End diff --

Done.


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271547
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@WritesAttributes({
+@WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+@WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
+"ElasticSearch JSON DSL. It currently does not support 
pagination.")
+public class JsonQueryElasticsearch extends AbstractProcessor {
+public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+.description("All original flowfiles that don't cause an error 
to occur go to this relationship. " +
+"This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
+"\"hits\" relationship.").build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("All FlowFiles that cannot be read from 
Elasticsearch are routed to this relationship").build();
+
+public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+.description("Search hits are routed to this relationship.")
+.build();
+
+public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+.description("Aggregations are routed to this relationship.")
+.build();
+
+

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271557
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@WritesAttributes({
+@WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+@WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
+"ElasticSearch JSON DSL. It currently does not support 
pagination.")
+public class JsonQueryElasticsearch extends AbstractProcessor {
+public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+.description("All original flowfiles that don't cause an error 
to occur go to this relationship. " +
+"This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
+"\"hits\" relationship.").build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("All FlowFiles that cannot be read from 
Elasticsearch are routed to this relationship").build();
+
+public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+.description("Search hits are routed to this relationship.")
+.build();
+
+public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+.description("Aggregations are routed to this relationship.")
+.build();
+
+

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271522
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@WritesAttributes({
+@WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+@WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
+@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
+"ElasticSearch JSON DSL. It currently does not support 
pagination.")
+public class JsonQueryElasticsearch extends AbstractProcessor {
+public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
+.description("All original flowfiles that don't cause an error 
to occur go to this relationship. " +
+"This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
+"\"hits\" relationship.").build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("All FlowFiles that cannot be read from 
Elasticsearch are routed to this relationship").build();
+
+public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
+.description("Search hits are routed to this relationship.")
+.build();
+
+public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
+.description("Aggregations are routed to this relationship.")
+.build();
+
+

[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r175271497
  
--- Diff: nifi-nar-bundles/nifi-standard-services/pom.xml ---
@@ -15,6 +15,18 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
+
--- End diff --

Done.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
@JPercivall Thanks for the feedback. I'll get to working on these. WRT:

> Lastly, it's preferred if you don't squash your commits every time. If 
you don't, that allows the reviewer to more easily see exactly what changed 
since they last reviewed it. Also allows reviewers to see how the PR evolved 
over time in response to different comments.

Ok, I can do that from now on. Shouldn't they be squashed before a merge 
into master?


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-17 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
@JPercivall Done.


---


[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...

2018-03-16 Thread MikeThomsen
GitHub user MikeThomsen opened a pull request:

https://github.com/apache/nifi/pull/2560

NIFI-4989 Made PutMongo able to use nested lookup keys, a query param…

… and multiple lookup keys.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MikeThomsen/nifi NIFI-4989

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2560


commit 380e342643885b8919599a1e8d329dfbb89600a6
Author: Mike Thomsen 
Date:   2018-03-17T01:51:19Z

NIFI-4989 Made PutMongo able to use nested lookup keys, a query param and 
multiple lookup keys.




---


[GitHub] nifi issue #2517: NIFI-4516 FetchSolr Processor

2018-03-16 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2517
  
Sorry, haven't had time. There's a merge conflict now. Can you fix that?


---


[GitHub] nifi issue #2540: Nifi 4914

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2540
  
Saw you opened a new version of this. Can you close this one?


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
@JPercivall It's ready for review.


---


[GitHub] nifi issue #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2553
  
@david-streamlio thanks for the link.

> Is there another nar bundle that uses a Docker image for integration 
testing that I can use as an example?

The Mongo package, sorta. We wrote all of the tests to assume Mongo 
defaults, so if you set up a simple Docker install of Mongo, all of the 
defaults will just click between the image and the int tests.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174883410
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881995
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874403
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
+
+if (resource == null)
--- End diff --

Curly brackets: please wrap the body of this `if (resource == null)` check in braces.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881541
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi issue #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2553
  
You should strongly consider setting up some integration tests that can be 
run against a simple Docker image. All you have to do with NiFi to get that 
done is add a few classes with "IT" at the end of their name and you can run 
them with `mvn integration-test -Pintegration-tests`.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881132
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174873036
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of thr

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174880893
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872527
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of thr

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878320
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStandardPulsarClientService {
+
+@Before
+public void init() {
--- End diff --

Looks like you can delete this `@Before init()` method.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882988
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884957
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableVal

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174883778
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

Consider changing it to `PublishPulsar_1_X`.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884531
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableVal

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879831
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

Also note that, as it is an incubator project, if someone really doesn't 
want to track 1.X going forward AND complains that you broke compatibility for 
them by staying up to date with 1.X, that's on them. Incubator projects are by 
definition moving targets and should be handled that way during risk assessment 
by teams using them.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872630
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of thr

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874448
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
--- End diff --

There's some extraneous whitespace around this.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879478
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

You should consider changing this to `ConsumePulsar_1_X` to warn users that 
you may be moving the internal client compatibility forward if, let's say, 1.4 
breaks compatibility with the current 1.2 branch in the incubator.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878122
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
+
+if (resource == null)
+throw new ResourceCreationException("Unable to create 
resource");
+
+} catch (Exception e) {
+resourceExceptionHandler.handle(e);
+}
+return resource;
+}
+
+
+/*
+ * Shutdown the pool and release the resources
+ */
+public void close() {
+
+Iterator itr = pool.iterator();
+while (itr.hasNext()) {
+itr.next().close();
+}
+
+}
+
+public boolean isEmpty() {
+return (pool.isEmpty());
+}
+
+public boolean isFull() {
+return (pool != null && pool.size() == max_resources);
+}
+
+@Override
+public R acquire(Properties props) throws InterruptedException {
+lock.lock();
+try {
+while (max_resources <= 0) {
+poolAvailable.await();
+}
+
+--max_resources;
+
+if (pool != null) {
+int size = pool.size();
+if (size > 0)
+return pool.remove(size - 1);
+}
+return createResource(props);
+} finally {
+lock.unlock();
+}
+}
+
+@Override
+public void evict(R resource) {
+lock.lock();
+try {
+
+// Attempt to close the connection
+if (!resource.isClosed())
--- End diff --

Please add curly brackets around the body of this `if` statement.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879028
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml ---
@@ -0,0 +1,78 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.6.0-SNAPSHOT
+
+
+nifi-pulsar-processors
+jar
+
+
+
+org.apache.nifi
+nifi-api
+
+
+org.apache.nifi
+nifi-record-serialization-service-api
+
+
+org.apache.nifi
+nifi-record
+
+
+org.apache.nifi
+nifi-utils
+1.6.0-SNAPSHOT
+
+ 
+org.apache.nifi
+nifi-ssl-context-service-api
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.6.0-SNAPSHOT
+provided
+   
+
+   org.apache.pulsar
--- End diff --

Broken indentation level.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872872
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of thr

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884100
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableVal

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866298
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface PoolableResource {
+
+public void close();
--- End diff --

Please add Javadoc for this method.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882429
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882522
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884863
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableVal

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874129
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of thr

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878824
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStandardPulsarClientService {
+
+@Before
+public void init() {
+
+}
+
+@Test
+public void testService() throws InitializationException {
+final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+final PulsarClientPool service = new StandardPulsarClientPool();
+runner.addControllerService("test-good", service);
+
+runner.setProperty(service, 
StandardPulsarClientPool.PULSAR_SERVICE_URL, "localhost:6667");
+// runner.enableControllerService(service);
--- End diff --

I think you might actually need this. If that's not the case, it should be 
removed.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174868443
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java
 ---
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Properties;
+
+public interface ResourceFactory {
+
+public R create(Properties props) throws ResourceCreationException;
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865149
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml ---
@@ -0,0 +1,40 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.6.0-SNAPSHOT
+
+
+nifi-pulsar-client-service-api
+jar
+
+
+
+org.apache.nifi
+nifi-api
+provided
+
+
+   org.apache.pulsar
--- End diff --

Nit: indent level is broken here.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866349
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface PoolableResource {
+
+public void close();
+
+public boolean isClosed();
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866791
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceExceptionHandler.java
 ---
@@ -0,0 +1,23 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface ResourceExceptionHandler {
+
+void handle(Exception exc);
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865317
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / 
Consumer instances on demand, based on the configuration."
+ + "properties defined")
+public interface PulsarClientPool extends ControllerService {
+
+public ResourcePool getProducerPool();
--- End diff --

There should be a basic javadoc here.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865373
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / 
Consumer instances on demand, based on the configuration."
+ + "properties defined")
+public interface PulsarClientPool extends ControllerService {
+
+public ResourcePool getProducerPool();
+
+public ResourcePool getConsumerPool();
--- End diff --

Same here.


---


[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2501
  
@pvillard31 Can you close this?


---


[GitHub] nifi issue #2494: NIFI-4912: Update jackson version to latest stable version...

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2494
  
@derekstraka You have a merge conflict now.


---


[GitHub] nifi pull request #2549: NIFI-4979: Fix ReportLineageToAtlas documentation e...

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2549#discussion_r174761784
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 ---
@@ -91,10 +91,12 @@
 import static 
org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
 
 @Tags({"atlas", "lineage"})
-@CapabilityDescription("Publishes NiFi flow data set level lineage to 
Apache Atlas." +
-" By reporting flow information to Atlas, an end-to-end Process 
and DataSet lineage such as across NiFi environments and other systems" +
-" connected by technologies, for example NiFi Site-to-Site, Kafka 
topic or Hive tables." +
-" There are limitations and required configurations for both NiFi 
and Atlas. See 'Additional Details' for further description.")
+@CapabilityDescription("Report NiFi flow data set level lineage to Apache 
Atlas." +
+" End-to-end lineages across NiFi environments and other systems 
can be reported those are" +
--- End diff --

> can be reported those are

Did you mean something like *can be reported **if** those are*


---


[GitHub] nifi issue #2548: NIFI-4978: Fixed ReportLineageToAtlas NPE when unscheduled

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2548
  
+1 LGTM


---


[GitHub] nifi issue #2493: Added Pulsar processors and Controller Service

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2493
  
Pulsar looks interesting, so if you can do the rebase I'll try to help out 
with the review.


---


[GitHub] nifi issue #2493: Added Pulsar processors and Controller Service

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2493
  
@david-streamlio Looks like you pulled in about 30 some commits from other 
folks into this. Can you rebase the branch so that only your commits are part 
of the PR?


---


[GitHub] nifi issue #2541: Exception in thread "main" java.lang.NoClassDefFoundError:...

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2541
  
-1 No commits from the submitter. If you need help, the NiFi users mailing 
list is the appropriate place. Not whatever this is.


---


[GitHub] nifi pull request #2546: NIFI-4975 Add GridFS processors

2018-03-14 Thread MikeThomsen
GitHub user MikeThomsen opened a pull request:

https://github.com/apache/nifi/pull/2546

NIFI-4975 Add GridFS processors

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MikeThomsen/nifi NIFI-4975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2546


commit caef8ce496ab861e8e32224aa37fda32d43f45d2
Author: Mike Thomsen 
Date:   2018-03-10T17:57:28Z

NIFI-4975 Add GridFS processors




---


[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...

2018-03-14 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2501
  
@pvillard31 @robertrbruno I checked in a new commit based on the feedback. 
Please take a look when you have a min.


---


[GitHub] nifi pull request #2501: NIFI-4743 Added configurable null suppression to Pu...

2018-03-14 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2501#discussion_r174483646
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 ---
@@ -284,6 +306,17 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
 return;
 }
 
+final NullSuppression suppression;
--- End diff --

Done.


---


[GitHub] nifi pull request #2501: NIFI-4743 Added configurable null suppression to Pu...

2018-03-14 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2501#discussion_r174483710
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/NullSuppression.java
 ---
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+public enum NullSuppression {
+ALWAYS_SUPPRESS,
+NEVER_SUPPRESS,
+SUPPRESS_MISSING
--- End diff --

Done.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-13 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
2/3 builds pass now.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-13 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
@JPercivall @mattyb149 Once I changed the code to use `transfer(FlowFile, 
Relationship)` and not `transfer(List, Relationship)` the error stopped 
happening.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-13 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
@JPercivall @mattyb149 That exception only happens for me when I stop the 
processor. It happens here:

``
if (hitsFlowFiles.size() > 0) {
session.transfer(hitsFlowFiles, REL_HITS);
for (FlowFile ff : hitsFlowFiles) {
session.getProvenanceReporter().send(ff, 
clientService.getTransitUrl(index, type));
}
}
``

On the provenance reporting line. Any ideas?


---


[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...

2018-03-13 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2113#discussion_r174078307
  
--- Diff: nifi-assembly/pom.xml ---
@@ -445,6 +445,24 @@ language governing permissions and limitations under 
the License. -->
 1.6.0-SNAPSHOT
 nar
 
+
+org.apache.nifi
+
nifi-elasticsearch-client-service-api-nar
--- End diff --

Weird, I thought that I took that out.


---


[GitHub] nifi issue #2150: NIFI-3402: Added etag support to InvokeHTTP

2018-03-12 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2150
  
@pvillard31 @m-hogue Think we can close the loop on this one?


---


[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...

2018-03-12 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2501
  
@pvillard31 If you have a min this is a small one that'd be good to have in 
1.6. @robertrbruno wrote 95% of it, but I took his patch from Jira and made it 
into a PR. We've both kicked the tires, and it seems solid.


---


[GitHub] nifi issue #2530: NIFI-4800 Expose the flattenMode as property in FlattenJSO...

2018-03-12 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2530
  
+1 LGTM.


---


[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.

2018-03-12 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2113
  
Rebased and building. @mattyb149 I changed the functionality to match the 
GetMongo functionality you reviewed recently. Had to spend a while getting the 
build to work again.


---


[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...

2018-03-11 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2448
  
@pvillard31 I found that a catch block was suppressing a critical problem 
with how I was handling the input flowfile. When I'd call `session.commit()` 
after a block of results were written to a flowfile, it would throw an 
exception because the input flowfile had not been transferred yet.

So what I am going to do is redo this to make it configurable again, with 
the default behavior being only one single commit. Then users can choose a 
"streaming mode" where at the first sign of success it transfers the input 
flowfile to REL_ORIGINAL. Then if an error happens, it would write a new 
flowfile with the same attributes and content as the original and send that 
cloned copy to the failure relationship. It may not be perfect, but I don't 
think NiFi has a way to do a partial commit which is what I'd need to make it 
work as-is.


---


[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2448
  
@pvillard31 Done.


---


[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2448
  
Will do. I should have it rebased tonight unless something comes up.


---


[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2527
  
@bdesert When I read the commit, I wasn't sure what regression it was 
actually fixing.


---


[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2527
  
What sort of regression was this supposed to address?


---


[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2517#discussion_r173582090
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.IntervalFacet;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RangeFacet;
+import org.apache.solr.client.solrj.response.RangeFacet.Count;
+import org.apache.solr.common.params.FacetParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.params.StatsParams;
+import org.apache.solr.servlet.SolrRequestParsers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static 
org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
+
+@Tags({"Apache", &q

[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2517#discussion_r173581872
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.solr;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FieldStatsInfo;
+import org.apache.solr.client.solrj.response.IntervalFacet;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RangeFacet;
+import org.apache.solr.client.solrj.response.RangeFacet.Count;
+import org.apache.solr.common.params.FacetParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.params.StatsParams;
+import org.apache.solr.servlet.SolrRequestParsers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static 
org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static 
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static 
org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER;
+
+@Tags({"Apache", &q

[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2517#discussion_r173581678
  
--- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
 ---
@@ -66,6 +67,15 @@
 public static final AllowableValue SOLR_TYPE_STANDARD = new 
AllowableValue(
 "Standard", "Standard", "A stand-alone Solr instance.");
 
+public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor
--- End diff --

Ok. I might be able to help with some of that.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173485840
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
 ---
@@ -103,6 +103,17 @@
 .defaultValue("UTF-8")
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
+static final PropertyDescriptor VISIBLITY_LABEL = new 
PropertyDescriptor.Builder()
+.name("delete-visibility-label")
+.displayName("Visibility Label")
+.description("If visibility labels are enabled, a row cannot 
be deleted without supplying its visibility label(s) in the delete " +
+"request. Note: this visibility label will be applied 
to all cells within the row that is specified. If some cells have " +
+"different visibility labels, they will not be 
deleted. When that happens, the failure to delete will be considered a success 
" +
+"because HBase does not report it as a failure.")
+.required(false)
--- End diff --

I'm not sure about that, but you have a point on being proactive. 


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173484771
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -349,18 +399,54 @@ public boolean checkAndPut(final String tableName, 
final byte[] rowId, final byt
 
 @Override
 public void delete(final String tableName, final byte[] rowId) throws 
IOException {
+delete(tableName, rowId, null);
+}
+
+@Override
+public void delete(String tableName, byte[] rowId, String 
visibilityLabel) throws IOException {
 try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
 Delete delete = new Delete(rowId);
+if (visibilityLabel != null && 
!visibilityLabel.trim().equals("")) {
+delete.setCellVisibility(new 
CellVisibility(visibilityLabel));
+}
 table.delete(delete);
 }
 }
 
 @Override
 public void delete(String tableName, List rowIds) throws 
IOException {
+delete(tableName, rowIds);
+}
+
+@Override
+public void deleteCells(String tableName, List deletes) 
throws IOException {
+List deleteRequests = new ArrayList<>();
+for (int index = 0; index < deletes.size(); index++) {
+DeleteRequest req = deletes.get(index);
+Delete delete = new Delete(req.getRowId())
+.addColumn(req.getColumnFamily(), 
req.getColumnQualifier());
+if (req.getVisibilityLabel() != null && 
!req.getVisibilityLabel().trim().equals("")) {
+delete.setCellVisibility(new 
CellVisibility(req.getVisibilityLabel()));
+}
+deleteRequests.add(delete);
+}
+batchDelete(tableName, deleteRequests);
+}
+
+@Override
+public void delete(String tableName, List rowIds, String 
visibilityLabel) throws IOException {
 List deletes = new ArrayList<>();
 for (int index = 0; index < rowIds.size(); index++) {
-deletes.add(new Delete(rowIds.get(index)));
+Delete delete = new Delete(rowIds.get(index));
+if (visibilityLabel != null && 
!visibilityLabel.trim().equals("")) {
--- End diff --

Done.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173484741
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -349,18 +399,54 @@ public boolean checkAndPut(final String tableName, 
final byte[] rowId, final byt
 
 @Override
 public void delete(final String tableName, final byte[] rowId) throws 
IOException {
+delete(tableName, rowId, null);
+}
+
+@Override
+public void delete(String tableName, byte[] rowId, String 
visibilityLabel) throws IOException {
 try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
 Delete delete = new Delete(rowId);
+if (visibilityLabel != null && 
!visibilityLabel.trim().equals("")) {
+delete.setCellVisibility(new 
CellVisibility(visibilityLabel));
+}
 table.delete(delete);
 }
 }
 
 @Override
 public void delete(String tableName, List rowIds) throws 
IOException {
+delete(tableName, rowIds);
+}
+
+@Override
+public void deleteCells(String tableName, List deletes) 
throws IOException {
+List deleteRequests = new ArrayList<>();
+for (int index = 0; index < deletes.size(); index++) {
+DeleteRequest req = deletes.get(index);
+Delete delete = new Delete(req.getRowId())
+.addColumn(req.getColumnFamily(), 
req.getColumnQualifier());
+if (req.getVisibilityLabel() != null && 
!req.getVisibilityLabel().trim().equals("")) {
--- End diff --

Done.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173483953
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 ---
@@ -126,6 +138,25 @@
 
 void delete(String tableName, List rowIds) throws IOException;
 
+/**
+ * Deletes a list of cells from HBase. This is intended to be used 
with granual delete operations.
--- End diff --

I think that's a reasonable word choice because deleting a whole row is not 
necessarily possible all at once with visibility labels enabled. Each cell that 
has a label must have its visibility label sent as part of the delete request. 
So you have to build a nuanced delete request.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173483514
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
 ---
@@ -385,7 +427,18 @@ protected PutFlowFile createPut(ProcessContext 
context, Record record, RecordSch
 
 
 if (fieldValueBytes != null) {
-columns.add(new PutColumn(fam, 
clientService.toBytes(name), fieldValueBytes, timestamp));
+PutColumn column;
+
+String visString = (visField != null && visSettings != 
null && visSettings.containsKey(name))
+? (String)visSettings.get(name) : 
defaultVisibility;
+
+if (visString != null && !visString.equals("") ) {
--- End diff --

Done.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173482943
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 ---
@@ -150,6 +194,40 @@
  */
 void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection columns, ResultHandler handler) throws IOException;
 
+/**
+ * Scans the given table for the given rowId and passes the result to 
the handler.
+ *
+ * @param tableName the name of an HBase table to scan
+ * @param startRow the row identifier to start scanning at
+ * @param endRow the row identifier to end scanning at
+ * @param columns optional columns to return, if not specified all 
columns are returned
+ * @param visibilityLabels optional list of visibility labels that the 
user should be able to see when communicating with HBase
+ * @param handler a handler to process rows of the result
+ * @throws IOException thrown when there are communication errors with 
HBase
+ */
+void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection columns, List visibilityLabels, ResultHandler 
handler) throws IOException;
+
+/**
+ * Get all of the labels in HBase.
+ *
+ * @return a List of all of the labels.
+ */
+List getLabels();
+
+/**
+ * Get all of the labels a given user can see.
+ * @param user the user to lookup.
+ * @return a List of all of the labels a user is allowed to see.
+ */
+List getLabelsForUser(String user);
+
+/**
+ * Get all of the labels the current user (NiFi process user or 
Kerberos keytab principle) can see.
+ *
+ * @return a List of all of the labels the current can see.
--- End diff --

I have to give this some more thought, but HBase visibility labels work 
with or without a Kerberos configuration. So without it, the HBase client does 
simple auth and sends over the user running NiFi as the user.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173482568
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
 ---
@@ -194,6 +220,12 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
 final String fieldEncodingStrategy = 
context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
 final String complexFieldStrategy = 
context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
 final String rowEncodingStrategy = 
context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
+final String recordPathText = 
context.getProperty(VISIBILITY_RECORD_PATH).getValue();
+
+RecordPath recordPath = null;
+if (recordPathCache != null && recordPathText != null && 
!recordPathText.equals("")) {
--- End diff --

Done.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173482333
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
 ---
@@ -141,6 +149,16 @@
 .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
 .build();
 
+protected static final PropertyDescriptor VISIBILITY_RECORD_PATH = new 
PropertyDescriptor.Builder()
+.name("put-hb-rec-visibility-record-path")
+.displayName("Visibility String Record Path Root")
+.description("A record path that points to part of the record 
which contains a path to a mapping of visibility strings to record paths")
+.required(false)
+.addValidator(Validator.VALID) //new 
RecordPathPropertyNameValidator())
--- End diff --

Done


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173482233
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
 ---
@@ -255,7 +253,16 @@ public void process(final InputStream in) throws 
IOException {
 final byte[] colFamBytes = 
columnFamily.getBytes(StandardCharsets.UTF_8);
 final byte[] colQualBytes = 
fieldName.getBytes(StandardCharsets.UTF_8);
 final byte[] colValBytes = fieldValueHolder.get();
-columns.add(new PutColumn(colFamBytes, colQualBytes, 
colValBytes, timestamp));
+
+final String visibilityStringToUse = 
pickVisibilityString(visibilityString, columnFamily, fieldName, flowFile);
+PutColumn column;
+if (visibilityStringToUse != null && 
!visibilityStringToUse.equals("")) {
--- End diff --

Done.


---


[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...

2018-03-09 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2518#discussion_r173455925
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
 ---
@@ -96,16 +97,18 @@ protected PutFlowFile createPut(final ProcessSession 
session, final ProcessConte
 
 
 final byte[] buffer = new byte[(int) flowFile.getSize()];
-session.read(flowFile, new InputStreamCallback() {
-@Override
-public void process(final InputStream in) throws IOException {
-StreamUtils.fillBuffer(in, buffer);
-}
-});
+session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
 
+PutColumn column = null;
+if (visibilityStringToUse != null && 
!visibilityStringToUse.equals("")) {
--- End diff --

I like that. Done.


---


<    5   6   7   8   9   10   11   12   13   14   >