Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/502#discussion_r66514855 --- Diff: nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.ignite.cache; + +import java.io.IOException; +import java.io.InputStream; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * Put cache processors which pushes the flow file content into Ignite Cache using + * DataStreamer interface + */ +@EventDriven +@SupportsBatching +@Tags({ "Ignite", "insert", "update", "stream", "write", "put", "cache", "key" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Stream the contents of a FlowFile to Ignite Cache using DataStreamer. 
" + + "The processor uses the value of FlowFile attribute " + PutIgniteCache.IGNITE_CACHE_ENTRY_KEY + " as the " + + "cache key and the byte array of the FlowFile as the value of the cache entry value. Both the string key and a " + + " non-empty byte array value are required otherwise the FlowFile is transfered to the failure relation.") +@WritesAttributes({ + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, description = "The total number of FlowFile in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, description = "The item number of FlowFile in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, description = "The successful FlowFile item number"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, description = "The number of successful FlowFiles"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, description = "The failed FlowFile item number"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key") + }) +@ReadsAttributes({ + @ReadsAttribute(attribute = PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, description = "Ignite cache key"), + }) +public class PutIgniteCache extends AbstractIgniteCacheProcessor { + + /** + * The batch size of flow files to be processed on invocation of onTrigger + */ + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size For Entries") + .description("Batch size for entries (1-500).") + .defaultValue("250") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + /** + * Data 
streamer's per node parallelism + */ + public static final PropertyDescriptor DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS = new PropertyDescriptor.Builder() + .name("Data Streamer Per Node Parallel Operations") + .description("Data streamer per node parallelism") + .defaultValue("5") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 10, true)) + .sensitive(false) + .build(); + + /** + * Data streamers per node buffer size + */ + public static final PropertyDescriptor DATA_STREAMER_PER_NODE_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Data Streamer Per Node Buffer Size") + .description("Data streamer per node buffer size (1-500).") + .defaultValue("250") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + /** + * Data streamers auto flush frequency + */ + public static final PropertyDescriptor DATA_STREAMER_AUTO_FLUSH_FREQUENCY = new PropertyDescriptor.Builder() + .name("Data Streamer Auto Flush Frequency in millis") + .description("Data streamer flush interval in millis") + .defaultValue("10") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .sensitive(false) + .build(); + + /** + * Data streamers override values property + */ + public static final PropertyDescriptor DATA_STREAMER_ALLOW_OVERRIDE = new PropertyDescriptor.Builder() + .name("Data Streamer Allow Override") + .description("Whether to override values already in the cache") + .defaultValue("false") + .allowableValues(new AllowableValue("true"), new AllowableValue("false")) + .sensitive(false) + .build(); + + /** Flow file attribute keys and messages */ + public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count"; + public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number"; + public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT = 
"ignite.cache.batch.flow.file.successful.count"; + public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER = "ignite.cache.batch.flow.file.successful.number"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_COUNT = "ignite.cache.batch.flow.file.failed.count"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER = "ignite.cache.batch.flow.file.failed.number"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_FILE_SIZE = "ignite.cache.batch.flow.file.failed.size"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.batch.flow.file.failed.reason"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero"; + + static { + descriptors = new ArrayList<>(); + descriptors.add(IGNITE_CONFIGURATION_FILE); + descriptors.add(CACHE_NAME); + descriptors.add(BATCH_SIZE); + descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS); + descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE); + descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY); + descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE); + } + + /** + * Data streamer instance + */ + private transient IgniteDataStreamer<String, byte[]> igniteDataStreamer; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + /** + * Close data streamer and calls base classes close ignite cache + */ + @OnStopped + public final void closeIgniteDataStreamer() { + if (igniteDataStreamer != null) { + getLogger().info("Closing ignite data streamer"); + igniteDataStreamer.flush(); + igniteDataStreamer.close(); + igniteDataStreamer = null; + } + super.closeIgniteCache(); + } + + /** + * Get data streamer + * @return data streamer instance + */ + protected IgniteDataStreamer<String, byte[]> getIgniteDataStreamer() { + return 
igniteDataStreamer; + } + + /** + * Initialize ignite cache + */ + @OnScheduled + public final void initilizeIgniteDataStreamer(ProcessContext context) throws ProcessException { + super.initializeIgniteCache(context); + + if ( getIgniteDataStreamer() != null ) { + return; + } + + getLogger().info("Creating Ignite Datastreamer"); + try { + int perNodeParallelOperations = context.getProperty(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS).asInteger(); + int perNodeBufferSize = context.getProperty(DATA_STREAMER_PER_NODE_BUFFER_SIZE).asInteger(); + int autoFlushFrequency = context.getProperty(DATA_STREAMER_AUTO_FLUSH_FREQUENCY).asInteger(); + boolean allowOverride = context.getProperty(DATA_STREAMER_ALLOW_OVERRIDE).asBoolean(); + + igniteDataStreamer = getIgnite().dataStreamer(getIgniteCache().getName()); + igniteDataStreamer.perNodeBufferSize(perNodeBufferSize); + igniteDataStreamer.perNodeParallelOperations(perNodeParallelOperations); + igniteDataStreamer.autoFlushFrequency(autoFlushFrequency); + igniteDataStreamer.allowOverwrite(allowOverride); + + } catch (Exception e) { + getLogger().error("Failed to schedule PutIgnite due to {}", new Object[] { e }, e); + throw new ProcessException(e); + } + } + + /** + * Handle flow files + */ + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final List<FlowFile> flowFiles = session.get(batchSize); + + if (flowFiles.isEmpty()) { + return; + } + +// Map<String, byte[]> cacheMap = new java.util.HashMap<>(); + List<Map.Entry<String, byte[]>> cacheItems = new ArrayList<>(); + List<FlowFile> successfulFlowFiles = new ArrayList<>(); + List<FlowFile> failedFlowFiles = new ArrayList<>(); + try { + for (int i = 0; i < flowFiles.size(); i++) { + FlowFile flowFile = null; + try { + flowFile = flowFiles.get(i); + + String key = flowFile.getAttribute(IGNITE_CACHE_ENTRY_KEY); + + if ( 
isFailedFlowFile(flowFile) ) { + failedFlowFiles.add(flowFile); + continue; + } + + final byte[] byteArray = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, byteArray, true); + } + }); + + cacheItems.add(new AbstractMap.SimpleEntry<String,byte[]>(key, byteArray)); + successfulFlowFiles.add(flowFile); + + } catch (Exception e) { + getLogger().error("Failed to insert {} into IgniteDB due to {}", new Object[] { flowFile, e }, e); + session.transfer(flowFile, REL_FAILURE); + } + } + } finally { + if (!cacheItems.isEmpty()) { + IgniteFuture<?> futures = igniteDataStreamer.addData(cacheItems); + Object result = futures.get(); + getLogger().debug("Result {} of addData", new Object [] {result}); + } + + if (!successfulFlowFiles.isEmpty()) { + successfulFlowFiles = updateSuccessfulFlowFileAttributes(flowFiles, successfulFlowFiles, session); + session.transfer(successfulFlowFiles, REL_SUCCESS); --- End diff -- What about provenance events?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---