I want to retrieve data from a folder, update it, and put it back in its destination, and I want to repeat this operation many times. Here is my code: I try to get the file and update it, but it doesn't roll the file back and can't retrieve data with the same filename again. (Sometimes, when I start this processor for the first time, it retrieves the data and then rolls back the updated file, but after that I guess it remembers state or FlowFile information and doesn't retrieve the same updated file again.) Can someone help me with what I should change to make this code work?
here is error i got :2017-10-08 21:40:55,959 ERROR [Timer-Driven Process Thread-9] Reader.MyProcessor MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67] MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67] failed to process due to java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are created in this Session back to self; rolling back session: {} java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are created in this Session back to self at org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1848) at Reader.MyProcessor.onTrigger(MyProcessor.java:732) public class MyProcessor extends AbstractProcessor { public String start, startDate, endDate, makeVersion, runAs, patch; @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final StopWatch stopWatch = new StopWatch(true); final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); final Integer maxDestinationFiles = 30; final ComponentLog logger = getLogger(); if (fileQueue.size() < 100) { final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) { try { final ArrayList<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue()); queueLock.lock(); try { listing.removeAll(inProcess); if (!keepingSourceFile) { listing.removeAll(recentlyProcessed); } fileQueue.clear(); fileQueue.addAll(listing); queueLastUpdated.set(System.currentTimeMillis()); recentlyProcessed.clear(); if (listing.isEmpty()) { context.yield(); } } finally { queueLock.unlock(); } } finally { 
listingLock.unlock(); } } } final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final List<File> files = new ArrayList<>(batchSize); queueLock.lock(); try { fileQueue.drainTo(files, batchSize); if (files.isEmpty()) { return; } else { inProcess.addAll(files); } } finally { queueLock.unlock(); } //make xml parsing DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); try { dBuilder = dbFactory.newDocumentBuilder(); } catch (ParserConfigurationException e) { e.printStackTrace(); } try { File f = files.get(0); doc = dBuilder.parse(f); } catch (IOException e) { e.printStackTrace(); } catch (org.xml.sax.SAXException e) { e.printStackTrace(); } NodeList nList = doc.getElementsByTagName("localAttributes"); for (int temp = 0; temp < nList.getLength(); temp++) { Node nNode = nList.item(temp); if (nNode.getNodeType() == Node.ELEMENT_NODE) { Element eElement = (Element) nNode; start = eElement.getElementsByTagName("start").item(0).getTextContent(); startDate = eElement.getElementsByTagName("startDate").item(0).getTextContent(); endDate = eElement.getElementsByTagName("endDate").item(0).getTextContent(); patch = eElement.getElementsByTagName("patch").item(0).getTextContent(); runAs = eElement.getElementsByTagName("runAs").item(0).getTextContent(); makeVersion = eElement.getElementsByTagName("makeVersion").item(0).getTextContent(); } } final ListIterator<File> itr = files.listIterator(); FlowFile flowFile = null; try { final Path directoryPath = directory.toPath(); while (itr.hasNext()) { final File file = itr.next(); final Path filePath = file.toPath(); final Path relativePath = directoryPath.relativize(filePath.getParent()); String relativePathString = relativePath.toString() + "/"; if (relativePathString.isEmpty()) { relativePathString = "./"; } final Path absPath = filePath.toAbsolutePath(); final String absPathString = absPath.getParent().toString() + "/"; flowFile = session.create(); final long importStart = System.nanoTime(); flowFile = 
session.importFrom(filePath, keepingSourceFile, flowFile); final long importNanos = System.nanoTime() - importStart; final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName()); flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString); flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString); Map<String, String> attributes = getAttributesFromFile(filePath); if (attributes.size() > 0) { flowFile = session.putAllAttributes(flowFile, attributes); } FlowFile flowFile1 = session.create(); flowFile1 = session.putAttribute(flowFile1, CoreAttributes.FILENAME.key(), file.getName()); flowFile1 = session.putAttribute(flowFile1, CoreAttributes.PATH.key(), relativePathString); flowFile1 = session.putAttribute(flowFile1, CoreAttributes.ABSOLUTE_PATH.key(), absPathString); flowFile1 = session.putAttribute(flowFile1, "start", start); flowFile1 = session.putAttribute(flowFile1, "startDate", startDate); flowFile1 = session.putAttribute(flowFile1, "endDate", endDate); flowFile1 = session.putAttribute(flowFile1, "runAs", runAs); flowFile1 = session.putAttribute(flowFile1, "patch", patch); flowFile1 = session.putAttribute(flowFile1, "makeVersion", makeVersion); flowFile1 = session.putAttribute(flowFile1, "filename", "Configuration"); //session.getProvenanceReporter().receive(flowFile1, file.toURI().toString(), importMillis); InputStream ffStream = session.read(flowFile); DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder builder = builderFactory.newDocumentBuilder(); Document xmlDocument = builder.parse(ffStream); XPath xPath = XPathFactory.newInstance().newXPath(); XPathExpression myNodeList = (XPathExpression) xPath.compile("/localAttributes"); Node nodeGettingChanged = (Node) myNodeList.evaluate(xmlDocument, XPathConstants.NODE); NodeList childNodes = 
nodeGettingChanged.getChildNodes(); for (int i = 0; i != childNodes.getLength(); ++i) { Node child = childNodes.item(i); if (!(child instanceof Element)) continue; if (child.getNodeName().equals("runAs")) child.getFirstChild().setNodeValue("false"); } TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = null; transformer = transformerFactory.newTransformer(); DOMSource source = new DOMSource(xmlDocument); String path = "C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1//conf.xml"; File f = new File(path); StreamResult file1 = new StreamResult(f); try { transformer.transform(source, file1); } catch (TransformerException e) { e.printStackTrace(); } session.write(flowFile, new StreamCallback() { @Override public void process(InputStream inputStream, OutputStream outputStream) throws IOException { TransformerFactory transformerFactory = TransformerFactory.newInstance(); Transformer transformer = null; try { transformer = transformerFactory.newTransformer(); } catch (TransformerConfigurationException e) { e.printStackTrace(); } DOMSource source = new DOMSource(xmlDocument); ffStream.close(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); StreamResult result = new StreamResult(bos); try { transformer.transform(source, result); } catch (TransformerException e) { e.printStackTrace(); } byte[] array = bos.toByteArray(); outputStream.write(array); } }); Path tempDotCopyFile = null; try { final Path rootDirPath = Paths.get("C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1"); final Path tempCopyFile = rootDirPath.resolve("." 
+ flowFile.getAttribute(CoreAttributes.FILENAME.key())); final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key())); if (!Files.exists(rootDirPath)) { if (context.getProperty(CREATE_DIRS).asBoolean()) { Files.createDirectories(rootDirPath); } else { flowFile = session.penalize(flowFile); session.transfer(flowFile); logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is " + "configured not to create missing directories", new Object[]{flowFile1, rootDirPath}); return; } } final Path dotCopyFile = tempCopyFile; tempDotCopyFile = dotCopyFile; Path finalCopyFile = copyFile; final Path finalCopyFileDir = finalCopyFile.getParent(); if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already final int numFiles = finalCopyFileDir.toFile().list().length; if (numFiles >= maxDestinationFiles) { flowFile= session.penalize(flowFile); logger.warn("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the " + "configured maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles}); session.transfer(flowFile); return; } } if (Files.exists(finalCopyFile)) { switch (conflictResponse) { case REPLACE_RESOLUTION: Files.delete(finalCopyFile); logger.info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile}); break; case IGNORE_RESOLUTION: session.transfer(flowFile, REL_SUCCESS); logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile}); return; case FAIL_RESOLUTION: flowFile = session.penalize(flowFile); logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile}); session.transfer(flowFile); return; default: break; } } session.exportTo(flowFile, dotCopyFile, false); final String permissions = 
context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile1).getValue(); if (permissions != null && !permissions.trim().isEmpty()) { try { String perms = stringPermissions(permissions); if (!perms.isEmpty()) { Files.setPosixFilePermissions(dotCopyFile, PosixFilePermissions.fromString(perms)); } } catch (Exception e) { logger.warn("Could not set file permissions to {} because {}", new Object[]{permissions, e}); } } final String owner = context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile1).getValue(); if (owner != null && !owner.trim().isEmpty()) { try { UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService(); Files.setOwner(dotCopyFile, lookupService.lookupPrincipalByName(owner)); } catch (Exception e) { logger.warn("Could not set file owner to {} because {}", new Object[]{owner, e}); } } final String group = context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile1).getValue(); if (group != null && !group.trim().isEmpty()) { try { UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService(); PosixFileAttributeView view = Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class); view.setGroup(lookupService.lookupPrincipalByGroupName(group)); } catch (Exception e) { logger.warn("Could not set file group to {} because {}", new Object[]{group, e}); } } boolean renamed = false; for (int i = 0; i < 10; i++) { // try rename up to 10 times. 
if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) { renamed = true; break;// rename was successful } Thread.sleep(100L);// try waiting a few ms to let whatever might cause rename failure to resolve } if (!renamed) { if (Files.exists(dotCopyFile) && dotCopyFile.toFile().delete()) { logger.debug("Deleted dot copy file {}", new Object[]{dotCopyFile}); } throw new ProcessException("Could not rename: " + dotCopyFile); } else { logger.info("Produced copy of {} at location {}", new Object[]{flowFile1, finalCopyFile}); } /*session.getProvenanceReporter().send(flowFile, finalCopyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS);*/ session.getProvenanceReporter().receive(flowFile1, file.toURI().toString(), importMillis); session.transfer(flowFile1, REL_SUCCESS); session.remove(flowFile); } catch (final Throwable t) { if (tempDotCopyFile != null) { try { Files.deleteIfExists(tempDotCopyFile); } catch (final Exception e) { logger.error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); } } flowFile = session.penalize(flowFile1); logger.error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile1, t}); session.transfer(flowFile1); } } if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue. 
queueLock.lock(); try { while (itr.hasNext()) { final File nextFile = itr.next(); fileQueue.add(nextFile); inProcess.remove(nextFile); } } finally { queueLock.unlock(); } } } catch (IOException e1) { e1.printStackTrace(); } catch (TransformerConfigurationException e1) { e1.printStackTrace(); } catch (ParserConfigurationException e1) { e1.printStackTrace(); } catch (SAXException e1) { e1.printStackTrace(); } catch (XPathExpressionException e1) { e1.printStackTrace(); } session.commit(); } protected String stringPermissions(String perms) { String permissions = ""; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); final Pattern numPattern = Pattern.compile("\\d+"); if (rwxPattern.matcher(perms).matches()) { permissions = perms; } else if (numPattern.matcher(perms).matches()) { try { int number = Integer.parseInt(perms, 8); StringBuilder permBuilder = new StringBuilder(); if ((number & 0x100) > 0) { permBuilder.append('r'); } else { permBuilder.append('-'); } if ((number & 0x80) > 0) { permBuilder.append('w'); } else { permBuilder.append('-'); } if ((number & 0x40) > 0) { permBuilder.append('x'); } else { permBuilder.append('-'); } if ((number & 0x20) > 0) { permBuilder.append('r'); } else { permBuilder.append('-'); } if ((number & 0x10) > 0) { permBuilder.append('w'); } else { permBuilder.append('-'); } if ((number & 0x8) > 0) { permBuilder.append('x'); } else { permBuilder.append('-'); } if ((number & 0x4) > 0) { permBuilder.append('r'); } else { permBuilder.append('-'); } if ((number & 0x2) > 0) { permBuilder.append('w'); } else { permBuilder.append('-'); } if ((number & 0x8) > 0) { permBuilder.append('x'); } else { permBuilder.append('-'); } permissions = permBuilder.toString(); } catch (NumberFormatException ignore) { } } return permissions; } } -- Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/