[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175411532 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "application/json"), +@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + +"ElasticSearch JSON DSL. It currently does not support pagination.") +public class JsonQueryElasticsearch extends AbstractProcessor { +public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") +.description("All original flowfiles that don't cause an error to occur go to this relationship. " + +"This applies even if you select the \"split up hits\" option to send individual hits to the " + +"\"hits\" relationship.").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_HITS = new Relationship.Builder().name("hits") +.description("Search hits are routed to this relationship.") +.build(); + +public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") +.description("Aggregations are routed to this relationship.") +.build(); + +
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175411128 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -61,22 +58,23 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { private ObjectMapper mapper = new ObjectMapper(); -private List properties; +static final private List properties; private RestClient client; private String url; -@Override -protected void init(ControllerServiceInitializationContext config) { -properties = new ArrayList<>(); -properties.add(ElasticSearchClientService.HTTP_HOSTS); -properties.add(ElasticSearchClientService.USERNAME); -properties.add(ElasticSearchClientService.PASSWORD); - properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); -properties.add(ElasticSearchClientService.CONNECT_TIMEOUT); -properties.add(ElasticSearchClientService.SOCKET_TIMEOUT); -properties.add(ElasticSearchClientService.RETRY_TIMEOUT); +static { +List _props = new ArrayList(); --- End diff -- Done. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175411050 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -121,7 +119,7 @@ private void setupClient(ConfigurationContext context) throws Exception { RestClientBuilder builder = RestClient.builder(hh) .setHttpClientConfigCallback(httpClientBuilder -> { -if (sslService != null) { +if (sslService != null && sslService.isKeyStoreConfigured() && sslService.isTrustStoreConfigured()) { try { --- End diff -- Done. I moved that logic out and have the it bubble it up with an InitializationException if any of those exceptions are thrown. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175407337 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { +private ObjectMapper mapper = new ObjectMapper(); + +static final private List properties; + +private RestClient client; + +private String url; + +static { +List _props = new ArrayList(); +_props.add(ElasticSearchClientService.HTTP_HOSTS); +_props.add(ElasticSearchClientService.USERNAME); +_props.add(ElasticSearchClientService.PASSWORD); +_props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); +_props.add(ElasticSearchClientService.CONNECT_TIMEOUT); +_props.add(ElasticSearchClientService.SOCKET_TIMEOUT); +_props.add(ElasticSearchClientService.RETRY_TIMEOUT); + +properties = Collections.unmodifiableList(_props); +} + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@OnEnabled +public void onEnabled(final ConfigurationContext context) throws InitializationException { +try { +setupClient(context); +} catch (Exception ex) { +getLogger().error("Could not initialize ElasticSearch client.", ex); +throw new InitializationException(ex); +} +} + +@OnDisabled +public void onDisabled() throws IOException { +this.client.close(); +this.url = null; +} + +private void setupClient(ConfigurationContext context) throws MalformedURLException { +final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); +String[] hostsSplit = hosts.split(",[\\s]*"); +this.url = hostsSplit[0]; +final SSLContextService sslService = + context.getProperty(PROP_
[GitHub] nifi issue #2518: NIFI-4637 Added support for visibility labels to the HBase...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2518 Had to rebase because of the commit that added `ScanHBase`. That doesn't have visibility labels added to it yet. That needs to be done. Keeping this ticket open because there's plenty that can be reviewed in the mean time if anyone is interested. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 > I hope my review doesn't come off in the wrong way, there is a lot of great work here and I just want to make sure the usability is top notch. Not at all. It's all fair and good feedback. I'll take a crack at these tomorrow. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175296639 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java --- @@ -196,6 +237,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } +private void removeUpdateKeys(String updateKeyParam, Map doc) { +String[] parts = updateKeyParam.split(",[\\s]*"); +for (String part : parts) { +if (part.contains(".")) { --- End diff -- For this input: ``` { "name": "John Smith", "department": "Engineering" } ``` It makes no sense to remove `name` if we're doing a full document update using the `name key. Now consider this complex document: ``` { "name": "John Smith", "department": "Engineering", "contacts": { "email": "john.sm...@test.com" } } ``` To search on `email`, we have to submit this payload with the lookup key being `contacts.email`: ``` { "contacts.email": "john.sm...@test.com", "name": "John Smith", "department": "Engineering", "contacts": { "email": "john.sm...@test.com" } } ``` Mongo cannot do a lookup using this: `{ "contacts": { "email": "john.sm...@test.com" }}` So if we don't remove the complex lookup key, we are leaving extraneous information in the document that almost certainly has no value to the user. Now maybe we'll get an angry ticket complaining that they can't do periods in the key names, but I've never seen normal use cases where developers do that. The whole idea of creating complex key names for real data using periods and such flies in the face of how JSON is supposed to work. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175286408 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java --- @@ -137,6 +151,31 @@ return propertyDescriptors; } +@Override +protected Collection customValidate(final ValidationContext validationContext) { +List problems = new ArrayList<>(); + +final boolean queryKey = validationContext.getProperty(UPDATE_QUERY_KEY).isSet() +&& !StringUtils.isBlank(validationContext.getProperty(UPDATE_QUERY_KEY).getValue()); --- End diff -- I just remembered why I did this. When you call `removeProperty` to unset the property, the `getProperty` call here will return the default value. So maybe I need to remove the default value. What do you think? Is this a problem with the test helpers or my understanding of how properties should work? ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175286257 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java --- @@ -96,6 +99,125 @@ public void testValidators() { Assert.assertEquals(0, results.size()); } +@Test +public void testQueryAndUpdateKey() { +runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id"); +runner.setProperty(PutMongo.UPDATE_QUERY, "{}"); +runner.assertNotValid(); +} + +@Test +public void testNoQueryAndNoUpdateKey() { +runner.removeProperty(PutMongo.UPDATE_QUERY); +runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); +runner.assertNotValid(); +} + +@Test +public void testBlankUpdateKey() { +runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " "); +runner.assertNotValid(); +} + +@Test +public void testUpdateQuery() { +Document document = new Document() +.append("name", "John Smith") +.append("department", "Engineering"); +collection.insertOne(document); +String updateBody = "{\n" + +"\t\"$set\": {\n" + +"\t\t\"email\": \"john.sm...@test.com\",\n" + +"\t\t\"grade\": \"Sr. Principle Eng.\"\n" + +"\t},\n" + +"\t\"$inc\": {\n" + +"\t\t\"writes\": 1\n" + +"\t}\n" + +"}"; +Map attr = new HashMap<>(); +attr.put("mongo.update.query", document.toJson()); +runner.setProperty(PutMongo.UPDATE_QUERY_KEY, ""); +runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); +runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); +runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}"); +runner.setValidateExpressionUsage(true); +runner.enqueue(updateBody, attr); +updateTests(document); +} + +@Test +public void testUpdateBySimpleKey() { +Document document = new Document() +.append("name", "John Smith") +.append("department", "Engineering"); +collection.insertOne(document); +String updateBody = "{\n" + +"\t\"name\": \"John Smith\",\n" + +"\t\"$set\": {\n" + +"\t\t\"email\": \"john.sm...@test.com\",\n" + +"\t\t\"grade\": \"Sr. Principle Eng.\"\n" + +"\t},\n" + +"\t\"$inc\": {\n" + +"\t\t\"writes\": 1\n" + +"\t}\n" + +"}"; +runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name"); +runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS); +runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE); +runner.setValidateExpressionUsage(true); +runner.enqueue(updateBody); +updateTests(document); +} + +@Test --- End diff -- Yeah. I'm adding 2 at the moment. One for update by keys and one for query. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175286235 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java --- @@ -86,19 +91,27 @@ .name("Update Query Key") .description("Key name used to build the update query criteria; this property is valid only when using update mode, " + "otherwise it is ignored") -.required(true) -.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.required(false) +.addValidator(Validator.VALID) .defaultValue("_id") .build(); +static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder() +.name("putmongo-update-query") --- End diff -- Done. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175286234 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java --- @@ -169,14 +208,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue(); +final String updateQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); +final Document query; -Object keyVal = ((Map)doc).get(updateKey); -if (updateKey.equals("_id") && ObjectId.isValid(((String)keyVal))) { -keyVal = new ObjectId((String) keyVal); +if (!StringUtils.isBlank(updateKey)) { +query = parseUpdateKey(updateKey, (Map)doc); +removeUpdateKeys(updateKey, (Map)doc); +} else { +query = Document.parse(updateQuery); --- End diff -- Done. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2560#discussion_r175285785 --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java --- @@ -86,19 +91,27 @@ .name("Update Query Key") .description("Key name used to build the update query criteria; this property is valid only when using update mode, " + "otherwise it is ignored") -.required(true) -.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.required(false) +.addValidator(Validator.VALID) --- End diff -- Wasn't sure if that's the case. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 Ok. It should all be there now. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271885 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "application/json"), +@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + +"ElasticSearch JSON DSL. It currently does not support pagination.") --- End diff -- Ok. I should also note that scroll queries are not supported. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271865 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { +private ObjectMapper mapper = new ObjectMapper(); + +private List properties; + +private RestClient client; + +private String url; + +@Override +protected void init(ControllerServiceInitializationContext config) { +properties = new ArrayList<>(); +properties.add(ElasticSearchClientService.HTTP_HOSTS); +properties.add(ElasticSearchClientService.USERNAME); +properties.add(ElasticSearchClientService.PASSWORD); + properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); +properties.add(ElasticSearchClientService.CONNECT_TIMEOUT); +properties.add(ElasticSearchClientService.SOCKET_TIMEOUT); +properties.add(ElasticSearchClientService.RETRY_TIMEOUT); +} + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@OnEnabled +public void onEnabled(final ConfigurationContext context) throws InitializationException { +try { +setupClient(context); +} catch (Exception ex) { +getLogger().error("Could not initialize ElasticSearch client.", ex); +throw new InitializationException(ex); +} +} + +@OnDisabled +public void onDisabled() throws IOException { +this.client.close(); +this.url = null; +} + +private void setupClient(ConfigurationContext context) throws Exception { +final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); +Stri
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271759 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { +private ObjectMapper mapper = new ObjectMapper(); + +private List properties; + +private RestClient client; + +private String url; + +@Override +protected void init(ControllerServiceInitializationContext config) { +properties = new ArrayList<>(); +properties.add(ElasticSearchClientService.HTTP_HOSTS); +properties.add(ElasticSearchClientService.USERNAME); +properties.add(ElasticSearchClientService.PASSWORD); + properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); +properties.add(ElasticSearchClientService.CONNECT_TIMEOUT); +properties.add(ElasticSearchClientService.SOCKET_TIMEOUT); +properties.add(ElasticSearchClientService.RETRY_TIMEOUT); +} + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@OnEnabled +public void onEnabled(final ConfigurationContext context) throws InitializationException { +try { +setupClient(context); +} catch (Exception ex) { +getLogger().error("Could not initialize ElasticSearch client.", ex); +throw new InitializationException(ex); +} +} + +@OnDisabled +public void onDisabled() throws IOException { +this.client.close(); +this.url = null; +} + +private void setupClient(ConfigurationContext context) throws Exception { +final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); +Stri
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271747 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { +private ObjectMapper mapper = new ObjectMapper(); + +private List properties; + +private RestClient client; + +private String url; + +@Override +protected void init(ControllerServiceInitializationContext config) { +properties = new ArrayList<>(); +properties.add(ElasticSearchClientService.HTTP_HOSTS); +properties.add(ElasticSearchClientService.USERNAME); +properties.add(ElasticSearchClientService.PASSWORD); + properties.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); +properties.add(ElasticSearchClientService.CONNECT_TIMEOUT); +properties.add(ElasticSearchClientService.SOCKET_TIMEOUT); +properties.add(ElasticSearchClientService.RETRY_TIMEOUT); +} + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@OnEnabled +public void onEnabled(final ConfigurationContext context) throws InitializationException { +try { +setupClient(context); +} catch (Exception ex) { +getLogger().error("Could not initialize ElasticSearch client.", ex); +throw new InitializationException(ex); +} +} + +@OnDisabled +public void onDisabled() throws IOException { +this.client.close(); +this.url = null; +} + +private void setupClient(ConfigurationContext context) throws Exception { --- End diff -- Done. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271738 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { +private ObjectMapper mapper = new ObjectMapper(); + +private List properties; --- End diff -- Done. ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271547 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "application/json"), +@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + +"ElasticSearch JSON DSL. It currently does not support pagination.") +public class JsonQueryElasticsearch extends AbstractProcessor { +public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") +.description("All original flowfiles that don't cause an error to occur go to this relationship. " + +"This applies even if you select the \"split up hits\" option to send individual hits to the " + +"\"hits\" relationship.").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_HITS = new Relationship.Builder().name("hits") +.description("Search hits are routed to this relationship.") +.build(); + +public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") +.description("Aggregations are routed to this relationship.") +.build(); + +
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271557 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "application/json"), +@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + +"ElasticSearch JSON DSL. It currently does not support pagination.") +public class JsonQueryElasticsearch extends AbstractProcessor { +public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") +.description("All original flowfiles that don't cause an error to occur go to this relationship. " + +"This applies even if you select the \"split up hits\" option to send individual hits to the " + +"\"hits\" relationship.").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_HITS = new Relationship.Builder().name("hits") +.description("Search hits are routed to this relationship.") +.build(); + +public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") +.description("Aggregations are routed to this relationship.") +.build(); + +
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271522 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ +@WritesAttribute(attribute = "mime.type", description = "application/json"), +@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + +"ElasticSearch JSON DSL. It currently does not support pagination.") +public class JsonQueryElasticsearch extends AbstractProcessor { +public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") +.description("All original flowfiles that don't cause an error to occur go to this relationship. " + +"This applies even if you select the \"split up hits\" option to send individual hits to the " + +"\"hits\" relationship.").build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + +public static final Relationship REL_HITS = new Relationship.Builder().name("hits") +.description("Search hits are routed to this relationship.") +.build(); + +public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") +.description("Aggregations are routed to this relationship.") +.build(); + +
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175271497 --- Diff: nifi-nar-bundles/nifi-standard-services/pom.xml --- @@ -15,6 +15,18 @@ --> http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> 4.0.0 + --- End diff -- Done. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 @JPercivall Thanks for the feedback. I'll get to working on these. WRT: > Lastly, it's preferred if you don't squash your commits every time. If you don't, that allows the reviewer to more easily see exactly what changed since they last reviewed it. Also allows reviewers to see how the PR evolved over time in response to different comments. Ok, I can do that from now on. Shouldn't they be squashed before a merge into master? ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 @JPercivall Done. ---
[GitHub] nifi pull request #2560: NIFI-4989 Made PutMongo able to use nested lookup k...
GitHub user MikeThomsen opened a pull request: https://github.com/apache/nifi/pull/2560 NIFI-4989 Made PutMongo able to use nested lookup keys, a query param… … and multiple lookup keys. Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MikeThomsen/nifi NIFI-4989 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2560.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2560 commit 380e342643885b8919599a1e8d329dfbb89600a6 Author: Mike Thomsen Date: 2018-03-17T01:51:19Z NIFI-4989 Made PutMongo able to use nested lookup keys, a query param and multiple lookup keys. ---
[GitHub] nifi issue #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2517 Sorry, haven't had time. There's a merge conflict now. Can you fix that? ---
[GitHub] nifi issue #2540: Nifi 4914
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2540 Saw you opened a new version of this. Can you close this one? ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 @JPercivall It's ready for review. ---
[GitHub] nifi issue #2553: Nifi 4908 rebase
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2553 @david-streamlio thanks for the link. > Is there another car bundle that uses a Docker image for integration testing that I can use as an example? The Mongo package, sorta. We wrote all of the tests to assume Mongo defaults, so if you set up a simple Docker install of Mongo, all of the defaults will just click between the image and the int tests. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174883410 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881995 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874403 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); + +if (resource == null) --- End diff -- Curly brackets. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881541 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi issue #2553: Nifi 4908 rebase
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2553 You should strongly consider setting up some integration tests that can be run against a simple Docker image. All you have to do with NiFi to get that done is add a few classes with "IT" at the end of their name and you can run them with `mvn integration-test -Pintegration-tests`. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881132 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174873036 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of thr
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174880893 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872527 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of thr
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878320 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestStandardPulsarClientService { + +@Before +public void init() { --- End diff -- Looks like you can delete this. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882988 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884957 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableVal
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174883778 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- Consider changing it to `PublishPulsar_1_X`. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884531 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableVal
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879831 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- Also note that as it is an incubator project and someone really doesn't want to track 1.X going forward AND complains that you broke compatibility for them by staying up to date with 1.X, that's on them. Incubator projects are by definition moving targets and should be handled that way during risk assessment by teams using them. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872630 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of thr
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874448 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); --- End diff -- There's some extraneous white space around this. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879478 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- You should consider changing this to `ConsumePulsar_1_X` to warn users that you may be moving the internal client compatibility forward if let's say 1.4 breaks compatibility with the current 1.2 branch in the incubator. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878122 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); + +if (resource == null) +throw new ResourceCreationException("Unable to create resource"); + +} catch (Exception e) { +resourceExceptionHandler.handle(e); +} +return resource; +} + + +/* + * Shutdown the pool and release the resources + */ +public void close() { + +Iterator itr = pool.iterator(); +while (itr.hasNext()) { +itr.next().close(); +} + +} + +public boolean isEmpty() { +return (pool.isEmpty()); +} + +public boolean isFull() { +return (pool != null && pool.size() == max_resources); +} + +@Override +public R acquire(Properties props) throws InterruptedException { +lock.lock(); +try { +while (max_resources <= 0) { +poolAvailable.await(); +} + +--max_resources; + +if (pool != null) { +int size = pool.size(); +if (size > 0) +return pool.remove(size - 1); +} +return createResource(props); +} finally { +lock.unlock(); +} +} + +@Override +public void evict(R resource) { +lock.lock(); +try { + +// Attempt to close the connection +if (!resource.isClosed()) --- End diff -- Curly brackets. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879028 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.6.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.6.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.6.0-SNAPSHOT +provided + + + org.apache.pulsar --- End diff -- Broken indentation level. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872872 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of thr
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884100 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableVal
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866298 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface PoolableResource { + +public void close(); --- End diff -- Javadoc. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882429 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882522 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884863 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableVal
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874129 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of thr
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878824 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestStandardPulsarClientService { + +@Before +public void init() { + +} + +@Test +public void testService() throws InitializationException { +final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); +final PulsarClientPool service = new StandardPulsarClientPool(); +runner.addControllerService("test-good", service); + +runner.setProperty(service, StandardPulsarClientPool.PULSAR_SERVICE_URL, "localhost:6667"); +// runner.enableControllerService(service); --- End diff -- I think you might actually needs this. If that's not the case, it should be removed. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174868443 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java --- @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Properties; + +public interface ResourceFactory { + +public R create(Properties props) throws ResourceCreationException; --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865149 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml --- @@ -0,0 +1,40 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.6.0-SNAPSHOT + + +nifi-pulsar-client-service-api +jar + + + +org.apache.nifi +nifi-api +provided + + + org.apache.pulsar --- End diff -- Nit: indent level is broken here. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866349 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface PoolableResource { + +public void close(); + +public boolean isClosed(); --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866791 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceExceptionHandler.java --- @@ -0,0 +1,23 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface ResourceExceptionHandler { + +void handle(Exception exc); --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865317 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +public interface PulsarClientPool extends ControllerService { + +public ResourcePool getProducerPool(); --- End diff -- There should be a basic javadoc here. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865373 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +public interface PulsarClientPool extends ControllerService { + +public ResourcePool getProducerPool(); + +public ResourcePool getConsumerPool(); --- End diff -- Same here. ---
[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2501 @pvillard31 Can you close this? ---
[GitHub] nifi issue #2494: NIFI-4912: Update jackson version to latest stable version...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2494 @derekstraka You have a merge conflict now. ---
[GitHub] nifi pull request #2549: NIFI-4979: Fix ReportLineageToAtlas documentation e...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2549#discussion_r174761784 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java --- @@ -91,10 +91,12 @@ import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION; @Tags({"atlas", "lineage"}) -@CapabilityDescription("Publishes NiFi flow data set level lineage to Apache Atlas." + -" By reporting flow information to Atlas, an end-to-end Process and DataSet lineage such as across NiFi environments and other systems" + -" connected by technologies, for example NiFi Site-to-Site, Kafka topic or Hive tables." + -" There are limitations and required configurations for both NiFi and Atlas. See 'Additional Details' for further description.") +@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." + +" End-to-end lineages across NiFi environments and other systems can be reported those are" + --- End diff -- > can be reported those are Did you mean something like *can be reported **if** those are* ---
[GitHub] nifi issue #2548: NIFI-4978: Fixed ReportLineageToAtlas NPE when unscheduled
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2548 +1 LGTM ---
[GitHub] nifi issue #2493: Added Pulsar processors and Controller Service
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2493 Pulsar looks interesting, so if you can do the rebase I'll try to help out with the review. ---
[GitHub] nifi issue #2493: Added Pulsar processors and Controller Service
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2493 @david-streamlio Looks like you pulled in about 30 some commits from other folks into this. Can you rebase the branch so that only your commits are part of the PR? ---
[GitHub] nifi issue #2541: Exception in thread "main" java.lang.NoClassDefFoundError:...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2541 -1 No commits from the submitter. If you need help, the NiFi users mailing list is the appropriate place. Not whatever this is. ---
[GitHub] nifi pull request #2546: NIFI-4975 Add GridFS processors
GitHub user MikeThomsen opened a pull request: https://github.com/apache/nifi/pull/2546 NIFI-4975 Add GridFS processors Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MikeThomsen/nifi NIFI-4975 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2546.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2546 commit caef8ce496ab861e8e32224aa37fda32d43f45d2 Author: Mike Thomsen Date: 2018-03-10T17:57:28Z NIFI-4975 Add GridFS processors ---
[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2501 @pvillard31 @robertrbruno I checked in a new commit based on the feedback. Please take a look when you have a min. ---
[GitHub] nifi pull request #2501: NIFI-4743 Added configurable null suppression to Pu...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2501#discussion_r174483646 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java --- @@ -284,6 +306,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session return; } +final NullSuppression suppression; --- End diff -- Done. ---
[GitHub] nifi pull request #2501: NIFI-4743 Added configurable null suppression to Pu...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2501#discussion_r174483710 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/NullSuppression.java --- @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +public enum NullSuppression { +ALWAYS_SUPPRESS, +NEVER_SUPPRESS, +SUPPRESS_MISSING --- End diff -- Done. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 2/3 builds pass now. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 @JPercivall @mattyb149 Once I changed the code to use `transfer(FlowFile, Relationship)` and not `transfer(List, Relationship)` the error stopped happening. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 @JPercivall @mattyb149 That exception only happens for me when I stop the processor. It happens here: `` if (hitsFlowFiles.size() > 0) { session.transfer(hitsFlowFiles, REL_HITS); for (FlowFile ff : hitsFlowFiles) { session.getProvenanceReporter().send(ff, clientService.getTransitUrl(index, type)); } } `` On the provenance reporting line. Any ideas? ---
[GitHub] nifi pull request #2113: NIFI-4325 Added new processor that uses the JSON DS...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r174078307 --- Diff: nifi-assembly/pom.xml --- @@ -445,6 +445,24 @@ language governing permissions and limitations under the License. --> 1.6.0-SNAPSHOT nar + +org.apache.nifi + nifi-elasticsearch-client-service-api-nar --- End diff -- Weird, I thought that I took that out. ---
[GitHub] nifi issue #2150: NIFI-3402: Added etag support to InvokeHTTP
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2150 @pvillard31 @m-hogue Think we can close the loop on this one? ---
[GitHub] nifi issue #2501: NIFI-4743 Added configurable null suppression to PutElasti...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2501 @pvillard31 If you have a min this is a small one that'd be good to have in 1.6. @robertrbruno wrote 95% of it, but I took his patch from Jira and made it into a PR. We've both kicked the tires, and it seems solid. ---
[GitHub] nifi issue #2530: NIFI-4800 Expose the flattenMode as property in FlattenJSO...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2530 +1 LGTM. ---
[GitHub] nifi issue #2113: NIFI-4325 Added new processor that uses the JSON DSL.
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2113 Rebased and building. @mattyb149 I changed the functionality to match the GetMongo functionality you reviewed recently. Had to spend a while getting the build to work again. ---
[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2448 @pvillard31 I found that a catch block was suppressing a critical problem with how I was handling the input flowfile. When I'd call `session.commit()` after a block of results were written to a flowfile, it would throw an exception because the input flowfile had not been transferred yet. So what I am going to do is redo this to make it configurable again. With the default behavior being only one single commit. Then users can choose a "streaming mode" where at the the first sign of success it transfers the input flowfile to REL_ORIGINAL. Then if an error happens, it would write a new flowfile with the same attributes and content as the original and send that cloned copy to the failure relationship. It may not be perfect, but I don't think NiFi has a way to do a partial commit which is what I'd need to make it work as-is. ---
[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2448 @pvillard31 Done. ---
[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2448 Will do. I should have it rebased tonight unless something comes up. ---
[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2527 @bdesert When I read the commit, I wasn't sure what regression it was actually fixing. ---
[GitHub] nifi issue #2527: FetchHBaseRow - log level and displayName
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2527 What sort of regression was this supposed to address? ---
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173582090 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", &q
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173581872 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", &q
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173581678 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -66,6 +67,15 @@ public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( "Standard", "Standard", "A stand-alone Solr instance."); +public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor --- End diff -- Ok. I might be able to help with some of that. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173485840 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java --- @@ -103,6 +103,17 @@ .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); +static final PropertyDescriptor VISIBLITY_LABEL = new PropertyDescriptor.Builder() +.name("delete-visibility-label") +.displayName("Visibility Label") +.description("If visibility labels are enabled, a row cannot be deleted without supplying its visibility label(s) in the delete " + +"request. Note: this visibility label will be applied to all cells within the row that is specified. If some cells have " + +"different visibility labels, they will not be deleted. When that happens, the failure to delete will be considered a success " + +"because HBase does not report it as a failure.") +.required(false) --- End diff -- I'm not sure about that, but you have a point on being proactive. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173484771 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -349,18 +399,54 @@ public boolean checkAndPut(final String tableName, final byte[] rowId, final byt @Override public void delete(final String tableName, final byte[] rowId) throws IOException { +delete(tableName, rowId, null); +} + +@Override +public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { Delete delete = new Delete(rowId); +if (visibilityLabel != null && !visibilityLabel.trim().equals("")) { +delete.setCellVisibility(new CellVisibility(visibilityLabel)); +} table.delete(delete); } } @Override public void delete(String tableName, List rowIds) throws IOException { +delete(tableName, rowIds); +} + +@Override +public void deleteCells(String tableName, List deletes) throws IOException { +List deleteRequests = new ArrayList<>(); +for (int index = 0; index < deletes.size(); index++) { +DeleteRequest req = deletes.get(index); +Delete delete = new Delete(req.getRowId()) +.addColumn(req.getColumnFamily(), req.getColumnQualifier()); +if (req.getVisibilityLabel() != null && !req.getVisibilityLabel().trim().equals("")) { +delete.setCellVisibility(new CellVisibility(req.getVisibilityLabel())); +} +deleteRequests.add(delete); +} +batchDelete(tableName, deleteRequests); +} + +@Override +public void delete(String tableName, List rowIds, String visibilityLabel) throws IOException { List deletes = new ArrayList<>(); for (int index = 0; index < rowIds.size(); index++) { -deletes.add(new Delete(rowIds.get(index))); +Delete delete = new Delete(rowIds.get(index)); +if (visibilityLabel != null && !visibilityLabel.trim().equals("")) { --- End diff -- Done. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173484741 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -349,18 +399,54 @@ public boolean checkAndPut(final String tableName, final byte[] rowId, final byt @Override public void delete(final String tableName, final byte[] rowId) throws IOException { +delete(tableName, rowId, null); +} + +@Override +public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { Delete delete = new Delete(rowId); +if (visibilityLabel != null && !visibilityLabel.trim().equals("")) { +delete.setCellVisibility(new CellVisibility(visibilityLabel)); +} table.delete(delete); } } @Override public void delete(String tableName, List rowIds) throws IOException { +delete(tableName, rowIds); +} + +@Override +public void deleteCells(String tableName, List deletes) throws IOException { +List deleteRequests = new ArrayList<>(); +for (int index = 0; index < deletes.size(); index++) { +DeleteRequest req = deletes.get(index); +Delete delete = new Delete(req.getRowId()) +.addColumn(req.getColumnFamily(), req.getColumnQualifier()); +if (req.getVisibilityLabel() != null && !req.getVisibilityLabel().trim().equals("")) { --- End diff -- Done. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173483953 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java --- @@ -126,6 +138,25 @@ void delete(String tableName, List rowIds) throws IOException; +/** + * Deletes a list of cells from HBase. This is intended to be used with granual delete operations. --- End diff -- I think that's a reasonable word choice because deleting whole row is not necessarily possible all at once with visibility labels enabled. Each cell that has a label must have its visibility label sent as part of the delete request. So you have to build a nuanced delete request. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173483514 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java --- @@ -385,7 +427,18 @@ protected PutFlowFile createPut(ProcessContext context, Record record, RecordSch if (fieldValueBytes != null) { -columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp)); +PutColumn column; + +String visString = (visField != null && visSettings != null && visSettings.containsKey(name)) +? (String)visSettings.get(name) : defaultVisibility; + +if (visString != null && !visString.equals("") ) { --- End diff -- Done. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173482943 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java --- @@ -150,6 +194,40 @@ */ void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException; +/** + * Scans the given table for the given rowId and passes the result to the handler. + * + * @param tableName the name of an HBase table to scan + * @param startRow the row identifier to start scanning at + * @param endRow the row identifier to end scanning at + * @param columns optional columns to return, if not specified all columns are returned + * @param visibilityLabels optional list of visibility labels that the user should be able to see when communicating with HBase + * @param handler a handler to process rows of the result + * @throws IOException thrown when there are communication errors with HBase + */ +void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, List visibilityLabels, ResultHandler handler) throws IOException; + +/** + * Get all of the labels in HBase. + * + * @return a List of all of the labels. + */ +List getLabels(); + +/** + * Get all of the labels a given user can see. + * @param user the user to lookup. + * @return a List of all of the labels a user is allowed to see. + */ +List getLabelsForUser(String user); + +/** + * Get all of the labels the current user (NiFi process user or Kerberos keytab principle) can see. + * + * @return a List of all of the labels the current can see. --- End diff -- I have to give this some more thought, but HBase visibility labels work with or without a Kerberos configuration. So without it, the HBase client does simple auth and sends over the user running NiFi as the user. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173482568 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java --- @@ -194,6 +220,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue(); +final String recordPathText = context.getProperty(VISIBILITY_RECORD_PATH).getValue(); + +RecordPath recordPath = null; +if (recordPathCache != null && recordPathText != null && !recordPathText.equals("")) { --- End diff -- Done. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173482333 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java --- @@ -141,6 +149,16 @@ .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP) .build(); +protected static final PropertyDescriptor VISIBILITY_RECORD_PATH = new PropertyDescriptor.Builder() +.name("put-hb-rec-visibility-record-path") +.displayName("Visibility String Record Path Root") +.description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths") +.required(false) +.addValidator(Validator.VALID) //new RecordPathPropertyNameValidator()) --- End diff -- Done ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173482233 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java --- @@ -255,7 +253,16 @@ public void process(final InputStream in) throws IOException { final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8); final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8); final byte[] colValBytes = fieldValueHolder.get(); -columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp)); + +final String visibilityStringToUse = pickVisibilityString(visibilityString, columnFamily, fieldName, flowFile); +PutColumn column; +if (visibilityStringToUse != null && !visibilityStringToUse.equals("")) { --- End diff -- Done. ---
[GitHub] nifi pull request #2518: NIFI-4637 Added support for visibility labels to th...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r173455925 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java --- @@ -96,16 +97,18 @@ protected PutFlowFile createPut(final ProcessSession session, final ProcessConte final byte[] buffer = new byte[(int) flowFile.getSize()]; -session.read(flowFile, new InputStreamCallback() { -@Override -public void process(final InputStream in) throws IOException { -StreamUtils.fillBuffer(in, buffer); -} -}); +session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); +PutColumn column = null; +if (visibilityStringToUse != null && !visibilityStringToUse.equals("")) { --- End diff -- I like that. Done. ---