In my custom processor I need to update a config file (placed in one of the NiFi folders — not the original NiFi config; it is a file I created myself) based on data I receive from the upstream connection, but I get neither an error nor the desired result. What should I do?
1.is there any way i can controll upstream connection flowfile destination i mean when i make debugging i saw that fileQueue.drainTo(file, batchSize); in this part file was null 2.here is one more thing i am interested in, on this line session.commit() i always get errors and whole operation is failed ,should i clean fileQueue and then make session.commit()? FlowFile flowfile; @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); final ArrayList value = new ArrayList<>(); flowfile = session.get(); if (flowfile == null) { return; } value.add(flowfile.getAttribute("filename")); session.remove(flowfile); final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); final ComponentLog logger = getLogger(); if (fileQueue.size() < 100) { final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) { try { final Set<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue()); queueLock.lock(); try { listing.removeAll(inProcess); if (!keepingSourceFile) { listing.removeAll(recentlyProcessed); } fileQueue.clear(); fileQueue.addAll(listing); queueLastUpdated.set(System.currentTimeMillis()); recentlyProcessed.clear(); if (listing.isEmpty()) { context.yield(); } } finally { queueLock.unlock(); } } finally { listingLock.unlock(); } } } final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final List<File> file = new ArrayList<>(batchSize); queueLock.lock(); try { fileQueue.drainTo(file, batchSize); if (file.isEmpty()) { return; } else { inProcess.addAll(file); } } finally { queueLock.unlock(); } //make xml 
parsing DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); try { dBuilder = dbFactory.newDocumentBuilder(); } catch (ParserConfigurationException e) { e.printStackTrace(); } try { File f = file.get(0); doc = dBuilder.parse(f); } catch (IOException e) { e.printStackTrace(); } catch (org.xml.sax.SAXException e) { e.printStackTrace(); } NodeList nList = doc.getElementsByTagName("localAttributes"); for (int temp = 0; temp < nList.getLength(); temp++) { Node nNode = nList.item(temp); if (nNode.getNodeType() == Node.ELEMENT_NODE) { Element eElement = (Element) nNode; start = eElement.getElementsByTagName("start").item(0).getTextContent(); startDate = eElement.getElementsByTagName("startDate").item(0).getTextContent(); endDate = eElement.getElementsByTagName("endDate").item(0).getTextContent(); patch = eElement.getElementsByTagName("patch").item(0).getTextContent(); runAs = eElement.getElementsByTagName("runAs").item(0).getTextContent(); makeVersion = eElement.getElementsByTagName("makeVersion").item(0).getTextContent(); ///parameter = eElement.getElementsByTagName("parameter").item(0).getTextContent(); } } final ListIterator<File> itr = file.listIterator(); FlowFile flowFile1 = null; try { final Path directoryPath = directory.toPath(); while (itr.hasNext()) { final File files = itr.next(); final Path filePath = files.toPath(); final Path relativePath = directoryPath.relativize(filePath.getParent()); String relativePathString = relativePath.toString() + "/"; if (relativePathString.isEmpty()) { relativePathString = "./"; } final Path absPath = filePath.toAbsolutePath(); final String absPathString = absPath.getParent().toString() + "/"; flowFile1 = session.create(); final long importStart = System.nanoTime(); flowFile1 = session.importFrom(filePath, keepingSourceFile, flowFile1); final long importNanos = System.nanoTime() - importStart; final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS); flowFile1 = 
session.putAttribute(flowFile1, CoreAttributes.FILENAME.key(), files.getName()); flowFile1 = session.putAttribute(flowFile1, CoreAttributes.PATH.key(), relativePathString); flowFile1 = session.putAttribute(flowFile1, CoreAttributes.ABSOLUTE_PATH.key(), absPathString); Map<String, String> attributes = getAttributesFromFile(filePath); if (attributes.size() > 0) { flowFile1 = session.putAllAttributes(flowFile1, attributes); } InputStream ffStream = session.read(flowFile1); DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder = builderFactory.newDocumentBuilder(); Document xmlDocument = builder.parse(ffStream); XPath xPath = XPathFactory.newInstance().newXPath(); XPathExpression myNodeList = (XPathExpression) xPath.compile("/localAttributes"); Node nodeGettingChanged = (Node) myNodeList.evaluate(xmlDocument, XPathConstants.NODE); NodeList childNodes = nodeGettingChanged.getChildNodes(); boolean make=false; for (int i = 0; i != childNodes.getLength(); ++i) { Node child = childNodes.item(i); if (!(child instanceof Element)) continue; if(child.getNodeName().equals("start")){ String date; for(int j=0;j<value.size();j++) { if(value.get(j).length()>10){ date=value.get(j).substring(0,10); } else{ date=value.get(j); } if (date == child.getFirstChild().getTextContent()){ child.getFirstChild().setNodeValue(addOneDay(child.getFirstChild().getTextContent())); make=true; } } } if(make){ if(child.getNodeName().equals("runAs")){ child.getFirstChild().setNodeValue("true"); } } } TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = null; transformer = transformerFactory.newTransformer(); DOMSource source = new DOMSource(xmlDocument); String path = "C:/Users/user/Desktop/nifi-1.3.0/nifi-assembly/target/nifi-1.3.0-bin/nifi-1.3.0/1/conf.xml"; File f = new File(path); StreamResult file1 = new StreamResult(f); try { transformer.transform(source, file1); } catch (TransformerException e) { 
e.printStackTrace(); } session.write(flowFile1, new StreamCallback() { @Override public void process(InputStream inputStream, OutputStream outputStream) throws IOException { TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = null; try { transformer = transformerFactory.newTransformer(); } catch (TransformerConfigurationException e) { e.printStackTrace(); } DOMSource source = new DOMSource(xmlDocument); ffStream.close(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); StreamResult result = new StreamResult(bos); try { transformer.transform(source, result); } catch (TransformerException e) { e.printStackTrace(); } byte[] array = bos.toByteArray(); outputStream.write(array); } }); Path tempDotCopyFile = null; try { final Path rootDirPath = Paths.get("C://Users//s.tkhilaishvili//Desktop//try2//nifi-1.3.0//1"); final Path tempCopyFile = rootDirPath.resolve("." + flowFile1.getAttribute(CoreAttributes.FILENAME.key())); final Path copyFile = rootDirPath.resolve(flowFile1.getAttribute(CoreAttributes.FILENAME.key())); if (!Files.exists(rootDirPath)) { if (context.getProperty(CREATE_DIRS).asBoolean()) { Files.createDirectories(rootDirPath); } else { flowFile1 = session.penalize(flowFile1); session.transfer(flowFile1,REL_FAILURE); logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is " + "configured not to create missing directories", new Object[]{flowFile1, rootDirPath}); return; } } final Path dotCopyFile = tempCopyFile; tempDotCopyFile = dotCopyFile; Path finalCopyFile = copyFile; final Path finalCopyFileDir = finalCopyFile.getParent(); if (Files.exists(finalCopyFileDir)) { // check if too many files already final int numFiles = finalCopyFileDir.toFile().list().length; if (numFiles >= 34) { flowFile1 = session.penalize(flowFile1); logger.warn("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the " + 
"configured maximum number of files", new Object[]{flowFile1, finalCopyFileDir, numFiles}); session.transfer(flowFile1,REL_FAILURE); return; } } if (Files.exists(finalCopyFile)) { switch (conflictResponse) { case REPLACE_RESOLUTION: Files.delete(finalCopyFile); logger.info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile1}); break; case IGNORE_RESOLUTION: session.transfer(flowFile1, REL_SUCCESS); logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile1}); return; case FAIL_RESOLUTION: flowFile1 = session.penalize(flowFile1); logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile1}); session.transfer(flowFile1,REL_FAILURE); return; default: break; } } session.exportTo(flowFile1, dotCopyFile, false); final String permissions = "-rwxrwx---"; if (permissions != null && !permissions.trim().isEmpty()) { try { String perms = stringPermissions(permissions); if (!perms.isEmpty()) { Files.setPosixFilePermissions(dotCopyFile, PosixFilePermissions.fromString(perms)); } } catch (Exception e) { logger.warn("Could not set file permissions to {} because {}", new Object[]{permissions, e}); } } boolean renamed = false; for (int i = 0; i < 10; i++) { // try rename up to 10 times. 
if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) { renamed = true; break;// rename was successful } Thread.sleep(100L);// try waiting a few ms to let whatever might cause rename failure to resolve } if (!renamed) { if (Files.exists(dotCopyFile) && dotCopyFile.toFile().delete()) { logger.debug("Deleted dot copy file {}", new Object[]{dotCopyFile}); } throw new ProcessException("Could not rename: " + dotCopyFile); } else { logger.info("Produced copy of {} at location {}", new Object[]{flowFile1, finalCopyFile}); } /*session.getProvenanceReporter().send(flowFile, finalCopyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS);*/ session.getProvenanceReporter().receive(flowFile1, files.toURI().toString(), importMillis); //session.transfer(flowFile1, REL_SUCCESS); session.remove(flowFile1); } catch (final Throwable t) { if (tempDotCopyFile != null) { try { Files.deleteIfExists(tempDotCopyFile); } catch (final Exception e) { logger.error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); } } flowFile1 = session.penalize(flowFile1); logger.error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile1, t}); session.transfer(flowFile1,REL_FAILURE); } } if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue. queueLock.lock(); try { while (itr.hasNext()) { final File nextFile = itr.next(); fileQueue.add(nextFile); inProcess.remove(nextFile); } } finally { queueLock.unlock(); } } } catch (IOException e1) { e1.printStackTrace(); } catch (TransformerConfigurationException e1) { e1.printStackTrace(); } catch (ParserConfigurationException e1) { e1.printStackTrace(); } catch (XPathExpressionException e1) { e1.printStackTrace(); } catch (org.xml.sax.SAXException e) { e.printStackTrace(); } session.commit(); } -- Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/