[ 
https://issues.apache.org/jira/browse/NIFI-2724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15610332#comment-15610332
 ] 

ASF GitHub Bot commented on NIFI-2724:
--------------------------------------

Github user brianburnett commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1016#discussion_r85254162
  
    --- Diff: 
nifi-nar-bundles/nifi-jmx-bundle/nifi-jmx-processors/src/main/java/org/apache/nifi/processors/jmx/GetJMX.java
 ---
    @@ -0,0 +1,385 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.jmx;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import javax.json.Json;
    +import javax.json.JsonBuilderFactory;
    +import javax.management.MBeanAttributeInfo;
    +import javax.management.MBeanInfo;
    +import javax.management.MBeanServerConnection;
    +import javax.management.ObjectName;
    +import javax.management.remote.JMXConnector;
    +import javax.management.remote.JMXConnectorFactory;
    +import javax.management.remote.JMXServiceURL;
    +import javax.management.ReflectionException;
    +import javax.management.InstanceNotFoundException;
    +import javax.management.IntrospectionException;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Map;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Hashtable;
    +import java.util.ListIterator;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +@Tags({"JMX"})
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@SeeAlso({})
    +@CapabilityDescription(
    +        "Connects to the JMX RMI Url on the configured hostname and port.  
"
    +                + "All domains are queried and can be filtered by 
providing the full domain name "
    +                + "and optional MBean type as whitelist or blacklist 
parameters.\n\n"
    +                + "Blacklist example to exclude all types that start with 
'Memory' and GarbageCollector from the "
    +                + "java.lang domain and everything from the 
java.util.logging domain:"
    +                + "\n\njava.lang:Memory.* 
GarbageCollector,java.util.logging")
    +public class GetJMX extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor
    +            .Builder().name("Hostname")
    +            .description("The JMX Hostname or IP address")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The JMX Port")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WHITELIST = new 
PropertyDescriptor
    +            .Builder().name("DomainWhiteList")
    +            .description("Include only these MBean domain(s) and type(s).  
Domains are comma delimited "
    +                    + "and optional MBean types follow a colon and are 
space delimited.   "
    +                    + "Format: [domain1[:type1[ type2]][,domain2]]")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .build();
    +
    +    public static final PropertyDescriptor BLACKLIST = new 
PropertyDescriptor
    +            .Builder().name("DomainBlackList")
    +            .description("Include everything excluding these MBean 
domain(s) and type(s).   Domains are "
    +                    + "comma delimited and optional MBean types follow a 
colon and are space delimited.   "
    +                    + "Format: [domain1[:type1[ type2]][,domain2]])")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .required(false)
    +            .build();
    +
    +    public static final PropertyDescriptor POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("Polling Interval")
    +            .description("Indicates how long to wait before performing a 
directory listing")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("300 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The maximum number of files to pull in each 
iteration")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are created are routed to 
this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that encounter errors are routed to 
this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> descriptors;
    +
    +    private Set<Relationship> relationships;
    +    private final BlockingQueue<Record> queue = new 
LinkedBlockingQueue<>();
    +
    +    private final Set<Record> inProcess = new HashSet<>();    // guarded 
by queueLock
    +    private final Set<Record> recentlyProcessed = new HashSet<>();    // 
guarded by queueLock
    +
    +    private final Lock queueLock = new ReentrantLock();
    +    private final Lock listingLock = new ReentrantLock();
    +    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
    +
    +    private final JsonBuilderFactory jsonBuilderFactory = 
Json.createBuilderFactory(null);
    +
    +    private ListFilter whiteListFilter;
    +    private ListFilter blackListFilter;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
    +        descriptors.add(HOSTNAME);
    +        descriptors.add(PORT);
    +        descriptors.add(WHITELIST);
    +        descriptors.add(BLACKLIST);
    +        descriptors.add(POLLING_INTERVAL);
    +        descriptors.add(BATCH_SIZE);
    +        //descriptors.add(SSL_CONTEXT_SERVICE);
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new 
HashSet<Relationship>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +        whiteListFilter = new ListFilter( "" );
    +        blackListFilter = new ListFilter( "" );
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        queue.clear();
    +    }
    +
    +    private Set<Record> performQuery( ProcessContext context) throws 
ProcessException {
    +        PropertyValue hostname = context.getProperty(HOSTNAME);
    +        PropertyValue port = context.getProperty(PORT);
    +        PropertyValue whitelist = context.getProperty(WHITELIST);
    +        PropertyValue blacklist = context.getProperty(BLACKLIST);
    +
    +        Set<Record> metricsSet = new HashSet<Record>();
    +
    +        whiteListFilter.setListString(whitelist.getValue());
    +        blackListFilter.setListString( blacklist.getValue() );
    +
    +        try {
    +            HashMap<String,Object> env = new HashMap<String,Object>();
    +
    +            StringBuilder urlStr = new 
StringBuilder("service:jmx:rmi:///jndi/rmi://");
    +            urlStr.append(hostname).append(":");
    +            urlStr.append(port);
    +            urlStr.append("/jmxrmi");
    +
    +            JMXServiceURL url = new JMXServiceURL(urlStr.toString());
    +            JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
    +
    +            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
    +
    +            Set<ObjectName> mbeans = mbsc.queryNames(null, null);
    +            for (ObjectName mbean : mbeans) {
    +                String domain = mbean.getDomain();
    +
    +                Record record = new Record(jsonBuilderFactory);
    +                Hashtable<String, String> mbeanProperties = 
mbean.getKeyPropertyList();
    +
    +                if (mbeanProperties.containsKey("type") && 
canProcess(domain, mbeanProperties.get("type"))) {
    +                    record.setDomain( domain );
    +                    record.setProperties(mbeanProperties.entrySet());
    +
    +                    MBeanInfo mbeanInfo = mbsc.getMBeanInfo(mbean);
    +                    MBeanAttributeInfo[] mbeanAttributeInfoAr = 
mbeanInfo.getAttributes();
    +
    +                    boolean validRecord = true;
    +
    +                    Map<String, String> mbeanStringMap = new 
HashMap<String, String>();
    +
    +                    for (MBeanAttributeInfo mbeanAttributeInfo : 
mbeanAttributeInfoAr) {
    +                        try {
    +                            Object value = mbsc.getAttribute(mbean, 
mbeanAttributeInfo.getName());
    +
    +                            mbeanStringMap.put( 
mbeanAttributeInfo.getName(), value.toString() );
    +                        } catch (Exception e) {
    +                            // IF ERROR DO NOT ADD to metricsSet
    +                            validRecord = false;
    +                            getLogger().warn("Exception fetching MBean 
attribute: " + e.getMessage() +
    +                                    ": details: [" + 
mbean.getCanonicalName() + "]");
    +                        }
    +                    }
    +
    +                    if (validRecord) {
    +                        record.setAttributes(mbeanStringMap);
    +                        metricsSet.add(record);
    +                    }
    +                } else {
    +                    getLogger().info("SKIPPING: domain [" + domain + "] 
type [" + mbeanProperties.get("type") + "]");
    +                }
    +            }
    +
    +            jmxc.close();
    +        } catch( ProcessException pe ) {
    +            throw pe;
    +        } catch( IOException ioe) {
    +            getLogger().error( "Exception connecting to JMX RMI Listener: 
" + ioe.getMessage() + ": hostname [" +
    +                    hostname + "] port [" + port + "]");
    +        } catch( SecurityException se ) {
    +          getLogger().error( "Exception connecting to JMX RMI Listener due 
to security issues: " +
    +                    se.getMessage() + ": hostname [" + hostname + "] port 
[" + port + "]" );
    +        } catch( 
InstanceNotFoundException|IntrospectionException|ReflectionException e ) {
    +            getLogger().error( "Exception with MBean Server: " + 
e.getMessage() );
    +        } catch( Exception e ) {
    +            getLogger().error( "Exception performing MBean Query: " + 
e.getMessage() );
    +        }
    +
    +        return metricsSet;
    +    }
    +
    +
    +
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +
    +        if (queue.size() < 100) {
    --- End diff --
    
    I modeled this after the GetFile processor, which still appears to use a 
hard-coded value.  I can use the property value here.


> JMX Processor
> -------------
>
>                 Key: NIFI-2724
>                 URL: https://issues.apache.org/jira/browse/NIFI-2724
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.0.0
>         Environment: All platforms with Java RMI support for JMX
>            Reporter: Brian Burnett
>            Assignee: Joseph Witt
>            Priority: Minor
>              Labels: processor
>             Fix For: 1.1.0
>
>         Attachments: 0001-NIFI-2724-New-JMX-Processor.patch
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The JMX Processor feature addition includes only GetJMX without 
> SecurityManager capabilities at this time.  The processor in its current 
> state is capable of pulling MBean Property and Attribute values from a remote 
> RMI Server.  Each set of MBean data is wrapped in a JSON-formatted FlowFile 
> for downstream processing.  It has the ability to control content with 
> whitelist and blacklist properties.
> One possible use for this processor, and the reason it was created, is to 
> help make sense of Kafka server metrics.
> I will follow up with a SecurityManager Context Service and a PutJMX Processor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to