Yep. Looks legit to me. I'll try a unit test with a mixture of flowFiles with and without associated content.
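One thing worth double-checking while writing that test: in the quoted onTrigger, relativePath is computed as filePath.relativize(filePath.getParent()). Path.relativize builds a path from the receiver to the argument, so that expression resolves to ".." rather than the parent directory expressed relative to the configured input directory (the arguments look reversed compared to the stock GetFile). A quick stdlib check, using made-up paths for illustration:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class RelativizeCheck {
    public static void main(String[] args) {
        // Hypothetical input: a file one level under a watched directory.
        Path inputDir = Paths.get("/data/in");
        Path filePath = Paths.get("/data/in/sub/report.txt");

        // As written in the quoted code: the receiver is the file itself.
        // relativize() walks FROM the receiver TO the argument, so this
        // climbs upward and yields "..".
        Path asWritten = filePath.relativize(filePath.getParent());
        System.out.println(asWritten); // prints: ..

        // What a GetFile-style processor typically wants: the file's parent
        // expressed relative to the configured input directory.
        Path relativeToInput = inputDir.relativize(filePath.getParent());
        System.out.println(relativeToInput); // prints: sub
    }
}
```

It doesn't explain the transfer exception, but it does mean the "path" attribute would come out as "../" for every file.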

On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rbra...@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.w...@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as 
> well?
>
> Yes. In this version, sending (empty) flowfiles for directory entries is bypassed so only files get processed, and setting attributes is disabled (which did not help).
>
> Session.commit() is throwing the exception - no other errors or issues from 
> session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>         final ProcessorLog logger = getLogger();
>
>         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>             try {
>                 final Set<File> filelist = getFileList(context, session);
>
>                 queueLock.lock();
>                 try {
>                     filelist.removeAll(inProcess);
>                     if (!keepingSourceFile) {
>                         filelist.removeAll(recentlyProcessed);
>                     }
>
>                     fileQueue.clear();
>                     fileQueue.addAll(filelist);
>
>                     queueLastUpdated.set(System.currentTimeMillis());
>                     recentlyProcessed.clear();
>
>                     if (filelist.isEmpty()) {
>                         context.yield();
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             } finally {
>                 filelistLock.unlock();
>             }
>         }
>
>         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>         final List<File> files = new ArrayList<>(batchSize);
>         queueLock.lock();
>         try {
>             fileQueue.drainTo(files, batchSize);
>             if (files.isEmpty()) {
>                 return;
>             } else {
>                 inProcess.addAll(files);
>             }
>         } finally {
>             queueLock.unlock();
>         }
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 if (file.isFile()) {
>                     fileType = "file";
>                     flowFile = session.create();
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 } else {
>                     logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                     continue; // ******* SKIP DIRECTORIES FOR NOW ****
>                 }
>
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>               //  Map<String, String> attributes = getAttributesFromFile(filePath);
>               //  if (attributes.size() > 0) {
>               //      flowFile = session.putAllAttributes(flowFile, attributes);
>               //  }
>
>                 final String fileURI = file.toURI().toString();
>                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>             context.yield();
>
>             // anything that we've not already processed needs to be put back on the queue
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
>         } finally {
>             queueLock.lock();
>             try {
>                 inProcess.removeAll(files);
>                 recentlyProcessed.addAll(files);
>             } finally {
>                 queueLock.unlock();
>             }
>         }
>     }
>
> }
>
> Also please confirm which codebase you're running against.  Latest HEAD of 
> master?
>
> I'm using a snapshot from GitHub from August 25th (several weeks old); it's working fine with the original GetFile processor, which this code was derived from.
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rbra...@softnas.com> wrote:
>> So the "transfer relationship not specified" error occurs down in the Provenance processing, where it checks whether there are flowfile records associated with the session/relationship.
>>
>> There are. When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom() and transfer(). This confirms that transfer() was called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>>
>> Not being intimate with the internals of the framework yet, I'm not sure what
>> would cause this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have 
>> stepped through it in the debugger.  I'm only calling importFrom() for 
>> actual files (not directories), as shown below.  This is a modified version 
>> of GetFile processor.
>>
>> Rick
>>
>>         final ListIterator<File> itr = files.listIterator();
>>         FlowFile flowFile = null;
>>         try {
>>             while (itr.hasNext()) {
>>                 final File file = itr.next();
>>                 final Path filePath = file.toPath();
>>                 final Path relativePath = filePath.relativize(filePath.getParent());
>>                 String relativePathString = relativePath.toString() + "/";
>>                 if (relativePathString.isEmpty()) {
>>                     relativePathString = "./";
>>                 }
>>                 final Path absPath = filePath.toAbsolutePath();
>>                 final String absPathString = absPath.getParent().toString() + "/";
>>
>>                 final long importStart = System.nanoTime();
>>                 String fileType = "directory";
>>                 flowFile = session.create();
>>                 if (file.isFile()) {
>>                     fileType = "file";
>>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>                 }
>>                 final long importNanos = System.nanoTime() - importStart;
>>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>>                 if (attributes.size() > 0) {
>>                     flowFile = session.putAllAttributes(flowFile, attributes);
>>                 }
>>
>>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>                 session.transfer(flowFile, REL_SUCCESS);
>>                 logger.info("added {} to flow", new Object[]{flowFile});
>>
>>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>                     queueLock.lock();
>>                     try {
>>                         while (itr.hasNext()) {
>>                             final File nextFile = itr.next();
>>                             fileQueue.add(nextFile);
>>                             inProcess.remove(nextFile);
>>                         }
>>                     } finally {
>>                         queueLock.unlock();
>>                     }
>>                 }
>>             }
>>             session.commit();
>>         } catch (final Exception e) {
>>             logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:marka...@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the
>> Relationship, but rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>> Thanks,
>> -Mark
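The bookkeeping Mark describes can be sketched with a toy model (plain Java, not the real NiFi API): the session tracks every record it creates, and commit() fails for any record that was neither transferred nor removed.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the session bookkeeping (NOT the real NiFi API): commit()
// throws if any record created in this session was never transferred or
// removed, mirroring "transfer relationship not specified".
public class ToySession {
    private final Set<String> created = new HashSet<>();
    private final Set<String> resolved = new HashSet<>(); // transferred or removed

    public String create(String name) { created.add(name); return name; }
    public void transfer(String ff) { resolved.add(ff); }
    public void remove(String ff) { resolved.add(ff); }

    public void commit() {
        for (String ff : created) {
            if (!resolved.contains(ff)) {
                throw new IllegalStateException(ff + " transfer relationship not specified");
            }
        }
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();
        String a = session.create("file-a");
        session.transfer(a);         // resolved: fine at commit
        session.create("dir-entry"); // created but never transferred or removed
        try {
            session.commit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: dir-entry transfer relationship not specified
        }
    }
}
```

In the real framework the check is per-record at commit time, so a FlowFile that is create()'d and then skipped must be session.remove()'d before session.commit().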
>>
>>> From: rbra...@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbra...@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly 
>>> (modified a standard processor), but when it goes to commit() the session, 
>>> an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>
>>> I assume this is indicating there's no connection available to commit the
>>> transfer; however, a "success" relationship is registered during init() in
>>> the same way the original processor did it, and the success relationship is
>>> connected to another processor's input as it should be.
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
>>>
>>>
>>
