[ 
https://issues.apache.org/jira/browse/NIFI-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397924#comment-15397924
 ] 

ASF GitHub Bot commented on NIFI-1868:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/706#discussion_r72672215
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
 ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data 
from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow 
file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the 
Hive documentation for requirements on the Hive table (format, partitions, 
etc.). "
    +        + "The partition values are extracted from the Avro record based 
on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", 
description = "The number of records from this flow file written using Hive 
Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = 
"hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, 
value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(value)) {
    +            return new 
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
 Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note 
that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)")))
 // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = 
new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which 
contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' 
file or will revert to a default configuration. Note that to enable 
authentication "
    +                    + "with Kerberos e.g., the appropriate properties must 
be set in the configuration files. Please see the Hive documentation for more 
details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the 
data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put 
the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which 
the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition 
columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*")))
 // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be 
automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can 
be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when 
the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be 
sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many 
transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after 
the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail.")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private static final long TICKET_RENEWAL_PERIOD = 60000;
    +
    +    protected KerberosProperties kerberosProperties;
    +
    +    protected volatile HiveConfigurator hiveConfigurator = new 
HiveConfigurator();
    +    protected volatile UserGroupInformation ugi;
    +
    +    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +
    +    protected HiveOptions options;
    +    protected ExecutorService callTimeoutPool;
    +    protected transient Timer heartBeatTimer;
    +    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    +    protected Map<HiveEndPoint, HiveWriter> allWriters;
    +
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        propertyDescriptors = new ArrayList<>();
    +        propertyDescriptors.add(METASTORE_URI);
    +        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
    +        propertyDescriptors.add(DB_NAME);
    +        propertyDescriptors.add(TABLE_NAME);
    +        propertyDescriptors.add(PARTITION_COLUMNS);
    +        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
    +        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
    +        propertyDescriptors.add(HEARTBEAT_INTERVAL);
    +        propertyDescriptors.add(TXNS_PER_BATCH);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected void init(ProcessorInitializationContext context) {
    +        kerberosProperties = getKerberosProperties();
    +        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
    +        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        ComponentLog log = getLogger();
    +
    +        final String metastoreUri = 
context.getProperty(METASTORE_URI).getValue();
    +        final String dbName = context.getProperty(DB_NAME).getValue();
    +        final String tableName = 
context.getProperty(TABLE_NAME).getValue();
    +        final boolean autoCreatePartitions = 
context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
    +        final Integer maxConnections = 
context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
    +        final Integer heartbeatInterval = 
context.getProperty(HEARTBEAT_INTERVAL).asInteger();
    +        final Integer txnsPerBatch = 
context.getProperty(TXNS_PER_BATCH).asInteger();
    +        final String configFiles = 
context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
    +        final Configuration hiveConfig = 
hiveConfigurator.getConfigurationFromFiles(configFiles);
    +
    +        // add any dynamic properties to the Hive configuration
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                hiveConfig.set(descriptor.getName(), entry.getValue());
    +            }
    +        }
    +
    +        options = new HiveOptions(metastoreUri, dbName, tableName)
    +                .withTxnsPerBatch(txnsPerBatch)
    +                .withAutoCreatePartitions(autoCreatePartitions)
    +                .withMaxOpenConnections(maxConnections)
    +                .withHeartBeatInterval(heartbeatInterval);
    +
    +        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
    +            final String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
    +            final String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
    +
    +            log.info("Hive Security Enabled, logging in as principal {} 
with keytab {}", new Object[]{principal, keyTab});
    +            try {
    +                ugi = hiveConfigurator.authenticate(hiveConfig, principal, 
keyTab, TICKET_RENEWAL_PERIOD, log);
    +            } catch (AuthenticationFailedException ae) {
    +                throw new ProcessException("Kerberos authentication failed 
for Hive Streaming", ae);
    +            }
    +            log.info("Successfully logged in as principal {} with keytab 
{}", new Object[]{principal, keyTab});
    +            options = 
options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    +        }
    +
    +        allWriters = new ConcurrentHashMap<>();
    +        String timeoutName = "put-hive-streaming-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +
    +        sendHeartBeat.set(true);
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog log = getLogger();
    +        try {
    +            final List<String> partitionColumnList;
    +            String partitionColumns = 
context.getProperty(PARTITION_COLUMNS).getValue();
    +            if (StringUtils.isEmpty(partitionColumns)) {
    +                partitionColumnList = Collections.emptyList();
    +            } else {
    +                String[] partitionCols = partitionColumns.split(",");
    +                partitionColumnList = new 
ArrayList<>(partitionCols.length);
    +                for (String col : partitionCols) {
    +                    partitionColumnList.add(col.trim());
    +                }
    +            }
    +
    +            // Store the original class loader, then explicitly set it to 
this class's classloader (for use by the Hive Metastore)
    +            ClassLoader originalClassloader = 
Thread.currentThread().getContextClassLoader();
    +            
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
    +
    +            int recordCount = 0;
    +            final List<HiveStreamingRecord> records = new LinkedList<>();
    +
    +            session.read(flowFile, in -> {
    +
    +                try (final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    +
    +                    GenericRecord currRecord;
    +                    while (reader.hasNext()) {
    +                        currRecord = reader.next();
    --- End diff --
    
    Minor point: I think there's an overload of reader.next() that takes a record 
instance to reuse, which could save some object creation for a large number of records.


> Add support for Hive Streaming
> ------------------------------
>
>                 Key: NIFI-1868
>                 URL: https://issues.apache.org/jira/browse/NIFI-1868
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> Traditionally adding new data into Hive requires gathering a large amount of 
> data onto HDFS and then periodically adding a new partition. This is 
> essentially a “batch insertion”. Insertion of new data into an existing 
> partition is not permitted. Hive Streaming API allows data to be pumped 
> continuously into Hive. The incoming data can be continuously committed in 
> small batches of records into an existing Hive partition or table. Once data 
> is committed it becomes immediately visible to all Hive queries initiated 
> subsequently.
> This case is to add a PutHiveStreaming processor to NiFi, to leverage the 
> Hive Streaming API to allow continuous streaming of data into a Hive 
> partition/table.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to