Yep. Looks legit to me. Will try a unit test with a mixture of flowFiles associated with content and without.
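A hypothetical sketch of the bookkeeping Mark describes further down the thread. A session tracks each FlowFile it creates, and commit() fails if any of them was neither transferred nor removed. ToySession, its long ids and its String relationship names are invented for illustration. This is not NiFi's StandardProcessSession and makes no claim about its internals. The model ignores content entirely, so FlowFiles with and without content are handled alike, which is the behavior a mixed-content unit test would expect from the real session.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, NOT NiFi's StandardProcessSession: it only checks that
// every FlowFile created in the session gets a disposition before commit().
final class ToySession {
    // FlowFile id -> relationship name; null means "created but not yet transferred".
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    long create() {
        long id = nextId++;
        pending.put(id, null);
        return id;
    }

    void transfer(long id, String relationship) {
        requireKnown(id);
        pending.put(id, relationship);
    }

    void remove(long id) {
        requireKnown(id);
        pending.remove(id);
    }

    // Throws if any FlowFile still lacks a relationship; otherwise clears the
    // session and returns how many FlowFiles were committed.
    int commit() {
        List<Long> untransferred = new ArrayList<>();
        for (Map.Entry<Long, String> entry : pending.entrySet()) {
            if (entry.getValue() == null) {
                untransferred.add(entry.getKey());
            }
        }
        if (!untransferred.isEmpty()) {
            throw new IllegalStateException("transfer relationship not specified for " + untransferred);
        }
        int committed = pending.size();
        pending.clear();
        return committed;
    }

    // Rejects ids this session never created (or has already committed/removed).
    private void requireKnown(long id) {
        if (!pending.containsKey(id)) {
            throw new IllegalStateException("FlowFile " + id + " is not known in this session");
        }
    }
}
```

In a batch of two FlowFiles (say one with imported content and one empty), transferring only the first makes commit() throw. Once the second is also transferred (or removed), commit() succeeds. Passing an id the session never created fails with a "not known in this session" message, mirroring the wording of the real error quoted below.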
On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rbra...@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.w...@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as
> well?
>
> Yes. This version bypasses sending (empty) FlowFiles for directory entries, so
> only files get processed, and it also has the attribute code disabled (which
> did not help).
>
> session.commit() is throwing the exception - no other errors or issues from
> session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
> @Override
> public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>     final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>     final ProcessorLog logger = getLogger();
>
>     final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>     if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>         try {
>             final Set<File> filelist = getFileList(context, session);
>
>             queueLock.lock();
>             try {
>                 filelist.removeAll(inProcess);
>                 if (!keepingSourceFile) {
>                     filelist.removeAll(recentlyProcessed);
>                 }
>
>                 fileQueue.clear();
>                 fileQueue.addAll(filelist);
>
>                 queueLastUpdated.set(System.currentTimeMillis());
>                 recentlyProcessed.clear();
>
>                 if (filelist.isEmpty()) {
>                     context.yield();
>                 }
>             } finally {
>                 queueLock.unlock();
>             }
>         } finally {
>             filelistLock.unlock();
>         }
>     }
>
>     final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>     final List<File> files = new ArrayList<>(batchSize);
>     queueLock.lock();
>     try {
>         fileQueue.drainTo(files, batchSize);
>         if (files.isEmpty()) {
>             return;
>         } else {
>             inProcess.addAll(files);
>         }
>     } finally {
>         queueLock.unlock();
>     }
>
>     final ListIterator<File> itr = files.listIterator();
>     FlowFile flowFile = null;
>     try {
>         while (itr.hasNext()) {
>             final File file = itr.next();
>             final Path filePath = file.toPath();
>             final Path relativePath = filePath.relativize(filePath.getParent());
>             String relativePathString = relativePath.toString() + "/";
>             if (relativePathString.isEmpty()) {
>                 relativePathString = "./";
>             }
>             final Path absPath = filePath.toAbsolutePath();
>             final String absPathString = absPath.getParent().toString() + "/";
>
>             final long importStart = System.nanoTime();
>             String fileType = "directory";
>             if (file.isFile()) {
>                 fileType = "file";
>                 flowFile = session.create();
>                 flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>             } else {
>                 logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                 continue; // ******* SKIP DIRECTORIES FOR NOW ****
>             }
>
>             final long importNanos = System.nanoTime() - importStart;
>             final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>             // flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>             // flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>             // flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>             // flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>             // Map<String, String> attributes = getAttributesFromFile(filePath);
>             // if (attributes.size() > 0) {
>             //     flowFile = session.putAllAttributes(flowFile, attributes);
>             // }
>
>             final String fileURI = file.toURI().toString();
>             session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>             session.transfer(flowFile, REL_SUCCESS);
>             logger.info("added {} to flow", new Object[]{flowFile});
>
>             if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue.
>                 queueLock.lock();
>                 try {
>                     while (itr.hasNext()) {
>                         final File nextFile = itr.next();
>                         fileQueue.add(nextFile);
>                         inProcess.remove(nextFile);
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             }
>         }
>         session.commit();
>     } catch (final Exception e) {
>         logger.error("Failed to transfer files due to {}", e);
>         context.yield();
>
>
>         // anything that we've not already processed needs to be put back on the queue
>         if (flowFile != null) {
>             session.remove(flowFile);
>         }
>     } finally {
>         queueLock.lock();
>         try {
>             inProcess.removeAll(files);
>             recentlyProcessed.addAll(files);
>         } finally {
>             queueLock.unlock();
>         }
>     }
> }
>
> }
>
> Also please confirm which codebase you're running against. Latest HEAD of
> master?
>
> I'm using a snapshot from GitHub that's several weeks old (from August 25th).
> It works fine with the original GetFile processor, which this code was
> derived from.
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rbra...@softnas.com> wrote:
>> So the "transfer relationship not specified" occurs down in the Provenance
>> processing, where it checks to see if there are FlowFile records associated
>> with the session/relationship.
>>
>> There are. When I inspect session.flowfilesout it's equal to 6, which is
>> the correct number of calls to importFrom() and transfer(), so this confirms
>> that transfer() is called and did record the outbound FlowFiles, yet when
>> the provenance subsystem looks for these records it does not find them.
>>
>> Not being intimate with the internals of the framework yet, I'm not sure
>> what would cause this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have
>> stepped through it in the debugger.
>> I'm only calling importFrom() for actual files (not directories), as shown
>> below. This is a modified version of the GetFile processor.
>>
>> Rick
>>
>>     final ListIterator<File> itr = files.listIterator();
>>     FlowFile flowFile = null;
>>     try {
>>         while (itr.hasNext()) {
>>             final File file = itr.next();
>>             final Path filePath = file.toPath();
>>             final Path relativePath = filePath.relativize(filePath.getParent());
>>             String relativePathString = relativePath.toString() + "/";
>>             if (relativePathString.isEmpty()) {
>>                 relativePathString = "./";
>>             }
>>             final Path absPath = filePath.toAbsolutePath();
>>             final String absPathString = absPath.getParent().toString() + "/";
>>
>>             final long importStart = System.nanoTime();
>>             String fileType = "directory";
>>             flowFile = session.create();
>>             if (file.isFile()) {
>>                 fileType = "file";
>>                 flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>             }
>>             final long importNanos = System.nanoTime() - importStart;
>>             final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>             flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>             flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>             flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>             flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>             Map<String, String> attributes = getAttributesFromFile(filePath);
>>             if (attributes.size() > 0) {
>>                 flowFile = session.putAllAttributes(flowFile, attributes);
>>             }
>>
>>             session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>             session.transfer(flowFile, REL_SUCCESS);
>>             logger.info("added {} to flow", new Object[]{flowFile});
>>
>>             if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue.
>>                 queueLock.lock();
>>                 try {
>>                     while (itr.hasNext()) {
>>                         final File nextFile = itr.next();
>>                         fileQueue.add(nextFile);
>>                         inProcess.remove(nextFile);
>>                     }
>>                 } finally {
>>                     queueLock.unlock();
>>                 }
>>             }
>>         }
>>         session.commit();
>>     } catch (final Exception e) {
>>         logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:marka...@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the
>> Relationship, but rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>> Thanks,
>> -Mark
>>
>>> From: rbra...@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190]
>>> is not known in this session (StandardProcessSession[id=6967]):
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190]
>>> is not known in this session (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>>> StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418]
>>> to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>>> StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0]
>>> to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbra...@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly
>>> (a modified standard processor), but when it goes to commit() the session,
>>> an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData
>>> [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42]
>>> transfer relationship not specified
>>>
>>> I'm assuming this is supposed to be indicating there's no connection
>>> available to commit the transfer; however, there is a "success"
>>> relationship registered during init() in the same way the original
>>> processor did it, and the outbound success relationship is connected to
>>> another processor's input as it should be.
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
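Unrelated to the exception, but visible in both quoted versions of onTrigger: relativePath is computed as filePath.relativize(filePath.getParent()), which relativizes a file against its own parent directory. java.nio.file.Path.relativize returns ".." in that case, so relativePathString is "../" for every file, and the isEmpty() fallback to "./" can never run because the string always ends in "/". The sketch below contrasts that with relativizing the parent against the input directory. The RelativePaths helper and its inputDir parameter are hypothetical, and both paths must be of the same kind (both absolute or both relative), otherwise relativize throws IllegalArgumentException.

```java
import java.nio.file.Path;

final class RelativePaths {
    private RelativePaths() {}

    // What the quoted code computes: the file relativized against its own parent.
    // For any file that has a parent this is "..", so the result is always "../".
    static String asQuoted(Path filePath) {
        Path relativePath = filePath.relativize(filePath.getParent());
        return relativePath.toString() + "/";
    }

    // Hypothetical alternative: relativize the file's parent against the input
    // directory, and test for emptiness before appending the separator.
    static String relativeTo(Path inputDir, Path filePath) {
        String rel = inputDir.relativize(filePath.getParent()).toString();
        return rel.isEmpty() ? "./" : rel + "/";
    }
}
```

With inputDir set to data/in, a file data/in/sub/file.txt maps to "sub/" and data/in/top.txt maps to "./", whereas the quoted expression yields "../" for both.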