[ https://issues.apache.org/jira/browse/NIFI-2724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997455#comment-15997455 ]
ASF GitHub Bot commented on NIFI-2724: -------------------------------------- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1016#discussion_r114887372 --- Diff: nifi-nar-bundles/nifi-jmx-bundle/nifi-jmx-processors/src/main/java/org/apache/nifi/processors/jmx/GetJMX.java --- @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jmx; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.management.ReflectionException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.ListIterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Tags({"JMX"}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@SeeAlso({}) +@CapabilityDescription( + "Connects to the JMX RMI Url on the configured hostname and port. " + + "All domains are queried and can be filtered by providing the full domain name " + + "and optional MBean type as whitelist or blacklist parameters.\n\n" + + "Blacklist example to exclude all types that start with 'Memory' and GarbageCollector from the " + + "java.lang domain and everything from the java.util.logging domain:" + + "\n\njava.lang:Memory.* GarbageCollector,java.util.logging") +@WritesAttributes({ + @WritesAttribute(attribute="hostname", description="The name of the host that the object originates."), + @WritesAttribute(attribute="port", description="The JMX connection port that the object originates."), + @WritesAttribute(attribute="timestamp", description="The timestamp of when the object was emitted.") }) + +public class GetJMX extends AbstractProcessor { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor + .Builder().name("Hostname") + .displayName("Hostname") + .description("The JMX Hostname or IP address") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .displayName("Port") + .description("The JMX Port") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor WHITELIST = new PropertyDescriptor + .Builder().name("DomainWhiteList") + .displayName("Domain White List") + .description("Include only these MBean domain(s) and type(s). Domains are comma delimited " + + "and optional MBean types follow a colon and are space delimited. " + + "Format: [domain1[:type1[ type2]][,domain2]]") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor BLACKLIST = new PropertyDescriptor + .Builder().name("DomainBlackList") + .displayName("Domain Black List") + .description("Include everything excluding these MBean domain(s) and type(s). Domains are " + + "comma delimited and optional MBean types follow a colon and are space delimited. " + + "Format: [domain1[:type1[ type2]][,domain2]])") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor + .Builder().name("PollingInterval") + .displayName("Polling Interval") + .description("Indicates how long to wait before performing a connection to the RMI Server") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("300 sec") + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor + .Builder().name("BatchSize") + .displayName("Batch Size") + .description("The maximum number of MBean records to pull in each iteration") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are created are routed to this relationship") + .build(); + + public static final String HOSTNAME_ATTRIBUTE = "hostname"; + public static final String PORT_ATTRIBUTE = "port"; + public static final String TIMESTAMP_ATTRIBUTE = "timestamp"; + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + private final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(); + + private final Set<Record> inProcess = new HashSet<>(); // guarded by queueLock + private final Set<Record> recentlyProcessed = new HashSet<>(); // guarded by queueLock + + private final Lock queueLock = new ReentrantLock(); + private final Lock listingLock = new ReentrantLock(); + private final AtomicLong queueLastUpdated = new AtomicLong(0L); + + private ListFilter whiteListFilter; + private ListFilter blackListFilter; + + private final ComponentLog logger = getLogger(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(HOSTNAME); + descriptors.add(PORT); + descriptors.add(WHITELIST); + descriptors.add(BLACKLIST); + descriptors.add(POLLING_INTERVAL); + descriptors.add(BATCH_SIZE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + whiteListFilter = new ListFilter( "" ); + blackListFilter = new ListFilter( "" ); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + queue.clear(); + } + + private Set<Record> performQuery( ProcessContext context) throws ProcessException { + PropertyValue hostname = context.getProperty(HOSTNAME); + PropertyValue port = context.getProperty(PORT); + PropertyValue whitelist = context.getProperty(WHITELIST); + PropertyValue blacklist = context.getProperty(BLACKLIST); + + Set<Record> metricsSet = new HashSet<Record>(); + + whiteListFilter.setListString(whitelist.getValue()); + blackListFilter.setListString( blacklist.getValue() ); --- End diff -- whitespaces > JMX Processor > ------------- > > Key: NIFI-2724 > URL: https://issues.apache.org/jira/browse/NIFI-2724 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.0.0 > Environment: All platforms with Java RMI support for JMX > Reporter: Brian Burnett > Assignee: Andre F de Miranda > Priority: Minor > Labels: processor > Attachments: 0001-NIFI-2724-New-JMX-Processor.patch > > Original Estimate: 24h > Remaining Estimate: 24h > > The JMX Processor feature addition includes only GetJMX without > SecurityManager capabilities at this time. The processor in its current > state is capable of pulling MBean Property and Attribute values from a remote > RMI Server. Each set of Mbean data is wrapped in a JSON formatted FlowFile > for downstream processing. It has the ability to control content with > whitelist and blacklist properties. > Possible use for this processor and the reason it was created is to help make > sense of Kafka server metrics. > Will followup with a SecurityManager Context Service and PutJMX Processor. -- This message was sent by Atlassian JIRA (v6.3.15#6346)