[ https://issues.apache.org/jira/browse/NIFI-2724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610089#comment-15610089 ]
ASF GitHub Bot commented on NIFI-2724: -------------------------------------- Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/1016#discussion_r85244266 --- Diff: nifi-nar-bundles/nifi-jmx-bundle/nifi-jmx-processors/src/main/java/org/apache/nifi/processors/jmx/GetJMX.java --- @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.jmx; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.management.ReflectionException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.ListIterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Tags({"JMX"}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@SeeAlso({}) +@CapabilityDescription( + "Connects to the JMX RMI Url on the configured hostname and port. " + + "All domains are queried and can be filtered by providing the full domain name " + + "and optional MBean type as whitelist or blacklist parameters.\n\n" + + "Blacklist example to exclude all types that start with 'Memory' and GarbageCollector from the " + + "java.lang domain and everything from the java.util.logging domain:" + + "\n\njava.lang:Memory.* GarbageCollector,java.util.logging") +public class GetJMX extends AbstractProcessor { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor + .Builder().name("Hostname") + .description("The JMX Hostname or IP address") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The JMX Port") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor WHITELIST = new PropertyDescriptor + .Builder().name("DomainWhiteList") + .description("Include only these MBean domain(s) and type(s). Domains are comma delimited " + + "and optional MBean types follow a colon and are space delimited. " + + "Format: [domain1[:type1[ type2]][,domain2]]") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor BLACKLIST = new PropertyDescriptor + .Builder().name("DomainBlackList") + .description("Include everything excluding these MBean domain(s) and type(s). Domains are " + + "comma delimited and optional MBean types follow a colon and are space delimited. 
" + + "Format: [domain1[:type1[ type2]][,domain2]])") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Polling Interval") + .description("Indicates how long to wait before performing a directory listing") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("300 sec") + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of files to pull in each iteration") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are created are routed to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that encounter errors are routed to this relationship") + .build(); + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + private final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(); + + private final Set<Record> inProcess = new HashSet<>(); // guarded by queueLock + private final Set<Record> recentlyProcessed = new HashSet<>(); // guarded by queueLock + + private final Lock queueLock = new ReentrantLock(); + private final Lock listingLock = new ReentrantLock(); + private final AtomicLong queueLastUpdated = new AtomicLong(0L); + + private final JsonBuilderFactory jsonBuilderFactory = Json.createBuilderFactory(null); + + private ListFilter whiteListFilter; + private ListFilter blackListFilter; + + @Override + protected void init(final ProcessorInitializationContext context) { --- End diff -- @olegz once gave me a hint that there's an issue with it since 
the init() method as well as a few others are invoked more than once per life-cycle of a processor (see https://issues.apache.org/jira/browse/NIFI-1318). Consider using a static initializer. > JMX Processor > ------------- > > Key: NIFI-2724 > URL: https://issues.apache.org/jira/browse/NIFI-2724 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.0.0 > Environment: All platforms with Java RMI support for JMX > Reporter: Brian Burnett > Assignee: Joseph Witt > Priority: Minor > Labels: processor > Fix For: 1.1.0 > > Attachments: 0001-NIFI-2724-New-JMX-Processor.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > The JMX Processor feature addition includes only GetJMX without > SecurityManager capabilities at this time. The processor in its current > state is capable of pulling MBean Property and Attribute values from a remote > RMI Server. Each set of MBean data is wrapped in a JSON-formatted FlowFile > for downstream processing. It has the ability to control content with > whitelist and blacklist properties. > A possible use for this processor, and the reason it was created, is to help make > sense of Kafka server metrics. > Will follow up with a SecurityManager Context Service and PutJMX Processor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)