[ https://issues.apache.org/jira/browse/NIFI-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397924#comment-15397924 ]
ASF GitHub Bot commented on NIFI-1868:
--------------------------------------

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/706#discussion_r72672215

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.HiveWriter;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+/**
+ * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
+ */
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in " +
+        "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). " +
+        "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. ")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hivestreaming.record.count", description = "The number of records from this flow file written using Hive Streaming.")
+})
+public class PutHiveStreaming extends AbstractProcessor {
+
+    // Attributes
+    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
+
+    // Validators
+    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        String reason = null;
+        try {
+            final int intVal = Integer.parseInt(value);
+
+            if (intVal < 2) {
+                reason = "value is less than 2";
+            }
+        } catch (final NumberFormatException e) {
+            reason = "value is not a valid integer";
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+    };
+
+    // Properties
+    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive-stream-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the " +
+                    "Hive metastore is 9043.")
+            .required(true)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .build();
+
+    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop " +
+                    "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication " +
+                    "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .build();
+
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-database-name")
+            .displayName("Database Name")
+            .description("The name of the database in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
+            .name("hive-stream-partition-cols")
+            .displayName("Partition Columns")
+            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must " +
+                    "correspond exactly to the order of partition columns specified during the table creation.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
+            .build();
+
+    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-autocreate-partition")
+            .displayName("Auto-Create Partitions")
+            .description("Flag indicating whether partitions should be automatically created")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-max-open-connections")
+            .displayName("Max Open Connections")
+            .description("The maximum number of open connections that can be allocated from this pool at the same time, " +
+                    "or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
+            .name("hive-stream-heartbeat-interval")
+            .displayName("Heartbeat Interval")
+            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. " +
+                    "A value of 0 indicates that no heartbeat should be sent.")
+            .defaultValue("60")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(GREATER_THAN_ONE_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
+            .build();
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    private static final long TICKET_RENEWAL_PERIOD = 60000;
+
+    protected KerberosProperties kerberosProperties;
+
+    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+    protected volatile UserGroupInformation ugi;
+
+    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    protected HiveOptions options;
+    protected ExecutorService callTimeoutPool;
+    protected transient Timer heartBeatTimer;
+    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
+    protected Map<HiveEndPoint, HiveWriter> allWriters;
+
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        propertyDescriptors = new ArrayList<>();
+        propertyDescriptors.add(METASTORE_URI);
+        propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
+        propertyDescriptors.add(DB_NAME);
+        propertyDescriptors.add(TABLE_NAME);
+        propertyDescriptors.add(PARTITION_COLUMNS);
+        propertyDescriptors.add(AUTOCREATE_PARTITIONS);
+        propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
+        propertyDescriptors.add(HEARTBEAT_INTERVAL);
+        propertyDescriptors.add(TXNS_PER_BATCH);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        kerberosProperties = getKerberosProperties();
+        propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
+        propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        ComponentLog log = getLogger();
+
+        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
+        final String dbName = context.getProperty(DB_NAME).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+        // add any dynamic properties to the Hive configuration
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                hiveConfig.set(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        options = new HiveOptions(metastoreUri, dbName, tableName)
+                .withTxnsPerBatch(txnsPerBatch)
+                .withAutoCreatePartitions(autoCreatePartitions)
+                .withMaxOpenConnections(maxConnections)
+                .withHeartBeatInterval(heartbeatInterval);
+
+        if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+            } catch (AuthenticationFailedException ae) {
+                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
+            }
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+        }
+
+        allWriters = new ConcurrentHashMap<>();
+        String timeoutName = "put-hive-streaming-%d";
+        this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+        sendHeartBeat.set(true);
+        heartBeatTimer = new Timer();
+        setupHeartBeatTimer();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog log = getLogger();
+        try {
+            final List<String> partitionColumnList;
+            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
+            if (StringUtils.isEmpty(partitionColumns)) {
+                partitionColumnList = Collections.emptyList();
+            } else {
+                String[] partitionCols = partitionColumns.split(",");
+                partitionColumnList = new ArrayList<>(partitionCols.length);
+                for (String col : partitionCols) {
+                    partitionColumnList.add(col.trim());
+                }
+            }
+
+            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+            int recordCount = 0;
+            final List<HiveStreamingRecord> records = new LinkedList<>();
+
+            session.read(flowFile, in -> {
+
+                try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                    GenericRecord currRecord;
+                    while (reader.hasNext()) {
+                        currRecord = reader.next();
--- End diff --

Minor point, I think there's an overload of reader.next() that takes in a record instance to reuse, which could save some object creation for a large number of records.
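For illustration only, a minimal, self-contained sketch of the reuse pattern the comment refers to; the class name AvroRecordReuseExample, the countRecords method, and the bare InputStream parameter are hypothetical scaffolding rather than PR code, and only the DataFileStream.next(reuse) call is the point:

// Sketch (not PR code): reuse one GenericRecord across iterations via
// DataFileStream.next(reuse) instead of allocating a new record per row.
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.InputStream;

public class AvroRecordReuseExample {

    // Counts the records in an Avro data stream while reusing a single record instance.
    static long countRecords(InputStream in) throws IOException {
        long count = 0;
        try (DataFileStream<GenericRecord> reader =
                     new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            GenericRecord currRecord = null;
            while (reader.hasNext()) {
                // next(reuse) refills the supplied record where possible rather than
                // creating a fresh GenericRecord for every row
                currRecord = reader.next(currRecord);
                count++;
                // ... map currRecord to a Hive Streaming record here ...
            }
        }
        return count;
    }
}

Whether the saving is noticeable depends on record size and count, but for large Avro flow files it avoids one GenericRecord allocation per record.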
> Add support for Hive Streaming
> ------------------------------
>
>                 Key: NIFI-1868
>                 URL: https://issues.apache.org/jira/browse/NIFI-1868
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> Traditionally, adding new data into Hive requires gathering a large amount of
> data onto HDFS and then periodically adding a new partition. This is
> essentially a "batch insertion", and insertion of new data into an existing
> partition is not permitted. The Hive Streaming API allows data to be pumped
> continuously into Hive: the incoming data can be continuously committed in
> small batches of records into an existing Hive partition or table, and once
> data is committed it becomes immediately visible to all Hive queries
> initiated subsequently.
> This case is to add a PutHiveStreaming processor to NiFi that leverages the
> Hive Streaming API to allow continuous streaming of data into a Hive
> partition/table.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)