Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2294#discussion_r167307609 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; + +@WritesAttribute( attribute = "restart.index", description = "If a delete batch fails, it will restart from restart.index" ) +@Tags({ "delete", "hbase" }) +@CapabilityDescription( + "Delete HBase records individually or in batches. The input can be a single row ID in the body, one ID per line, " + + "row IDs separated by commas or a combination of the two. ") +public class DeleteHBaseRow extends AbstractDeleteHBase { + static final AllowableValue ROW_ID_BODY = new AllowableValue("body", "FlowFile content", "Get the row key(s) from the flowfile content."); + static final AllowableValue ROW_ID_ATTR = new AllowableValue("attr", "FlowFile attributes", "Get the row key from an expression language statement."); + + static final PropertyDescriptor ROW_ID_LOCATION = new PropertyDescriptor.Builder() + .name("delete-hb-row-id-location") + .displayName("Row ID Location") + .description("The location of the row ID to use for building the delete. Can be from the content or an expression language statement.") + .required(true) + .defaultValue(ROW_ID_BODY.getValue()) + .allowableValues(ROW_ID_BODY, ROW_ID_ATTR) + .addValidator(Validator.VALID) + .build(); + + static final PropertyDescriptor FLOWFILE_FETCH_COUNT = new PropertyDescriptor.Builder() + .name("delete-hb-flowfile-fetch-count") + .displayName("Flowfile Fetch Count") + .description("The number of flowfiles to fetch per run.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("delete-hb-row-ff-count") + .displayName("Batch Size") + .description("The number of deletes to send per batch.") + .required(true) + .defaultValue("50") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor KEY_SEPARATOR = new PropertyDescriptor.Builder() + .name("delete-hb-separator") + .displayName("Delete Row Key Separator") + .description("The separator character(s) that separate multiple row keys " + + "when multiple row keys are provided in the flowfile body") + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The character set used to encode the row key for HBase.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = super.getSupportedPropertyDescriptors(); + properties.add(ROW_ID_LOCATION); + properties.add(FLOWFILE_FETCH_COUNT); + properties.add(BATCH_SIZE); + properties.add(KEY_SEPARATOR); + properties.add(CHARSET); + + return properties; + } + + @Override + protected void doDelete(ProcessContext context, ProcessSession session) throws Exception { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String location = context.getProperty(ROW_ID_LOCATION).getValue(); + final int flowFileCount = context.getProperty(FLOWFILE_FETCH_COUNT).asInteger(); + final String charset = context.getProperty(CHARSET).getValue(); + List<FlowFile> flowFiles = session.get(flowFileCount); + + if (flowFiles != null && flowFiles.size() > 0) { + for (int index = 0; index < flowFiles.size(); index++) { + FlowFile flowFile = flowFiles.get(index); + try { + if (location.equals(ROW_ID_BODY.getValue())) { + flowFile = doDeleteFromBody(flowFile, context, session, batchSize, charset); + Relationship rel = flowFile.getAttribute("restart.index") != null ? REL_FAILURE : REL_SUCCESS; + session.transfer(flowFile, rel); + } else { + doDeleteFromAttribute(flowFile, context, charset); + session.transfer(flowFile, REL_SUCCESS); + } --- End diff -- Done
---