[ 
https://issues.apache.org/jira/browse/NIFI-8727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421589#comment-17421589
 ] 

Mark Payne commented on NIFI-8727:
----------------------------------

Thanks [~wildq] for all of your analysis and for describing exactly what you're 
seeing. We don't want to change the constructor that gets used for 
StandardRepositoryRecord, as that changes the record type from CREATE to 
UPDATE. Depending on the Serializer/Deserializer that gets used in the FlowFile 
Repository, that could cause problems because it's basically going to read an 
UPDATE for a FlowFile that doesn't exist.

I'll also note that {{ProcessSession.clone(originalFlowFile)}} shouldn't really 
be used there; it should be {{ProcessSession.create(originalFlowFile)}}. Calling 
clone() means you want the newly created child to have the exact same content 
as the original, whereas create() indicates that you want a new child with no 
content. Since you plan to overwrite the content anyway, {{create}} makes more 
sense, and it's slightly more efficient. That being said, as you noted, it 
should still work the same!

The underlying issue appears to be in 
{{StandardProcessSession.removeTemporaryClaim}}. Because the clone() event 
results in a RepositoryRecord where the content has not been modified, the call 
to {{ProcessSession.write}} doesn't decrement that claim as it needs to. So an 
easy fix is to update the logic there so that it decrements the claim if 
either the content is modified OR the Repository Record is CREATE (in which 
case the content claim will be non-existent or will be a temporary claim that 
is to be overwritten).
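
To make the counting concrete, here is a minimal, self-contained sketch of the condition described above. It is a toy model only, not the NiFi source and not the code in the Pull Request: {{ClaimCountSketch}}, {{ToyRecord}}, {{releaseOnCreate}} and the claim name are made up for illustration, and a single counter per resource claim is assumed.

```java
import java.util.HashMap;
import java.util.Map;

public class ClaimCountSketch {

    enum RecordType { CREATE, UPDATE }

    /** Toy stand-in for a repository record; NOT NiFi's StandardRepositoryRecord. */
    static final class ToyRecord {
        final RecordType type;
        String resourceClaim;      // claim the FlowFile currently points at
        boolean contentModified;   // true once the session has written new content

        ToyRecord(RecordType type, String resourceClaim) {
            this.type = type;
            this.resourceClaim = resourceClaim;
        }
    }

    private final Map<String, Integer> claimantCounts = new HashMap<>();
    private final boolean releaseOnCreate; // hypothetical switch: old vs. proposed condition

    ClaimCountSketch(boolean releaseOnCreate) {
        this.releaseOnCreate = releaseOnCreate;
    }

    private void increment(String claim) { claimantCounts.merge(claim, 1, Integer::sum); }
    private void decrement(String claim) { claimantCounts.merge(claim, -1, Integer::sum); }

    /** An existing FlowFile that already has content. */
    ToyRecord original(String claim) {
        increment(claim);
        return new ToyRecord(RecordType.UPDATE, claim);
    }

    /** clone(): the child is a CREATE record that shares the parent's claim. */
    ToyRecord cloneOf(ToyRecord parent) {
        increment(parent.resourceClaim);
        return new ToyRecord(RecordType.CREATE, parent.resourceClaim);
    }

    /** write(): release the claim being overwritten, then take the new one. */
    void write(ToyRecord record, String newClaim) {
        boolean release = record.contentModified
                || (releaseOnCreate && record.type == RecordType.CREATE);
        if (release) {
            decrement(record.resourceClaim);
        }
        increment(newClaim);
        record.resourceClaim = newClaim;
        record.contentModified = true;
    }

    void drop(ToyRecord record) { decrement(record.resourceClaim); }

    int count(String claim) { return claimantCounts.getOrDefault(claim, 0); }

    /** Replays the sequence from the issue: clone, overwrite, drop both FlowFiles. */
    public static int finalCount(boolean releaseOnCreate) {
        ClaimCountSketch session = new ClaimCountSketch(releaseOnCreate);
        String claim = "resource-claim-1"; // new content reuses the same resource claim
        ToyRecord original = session.original(claim);
        ToyRecord child = session.cloneOf(original);
        session.write(child, claim);
        session.drop(original);
        session.drop(child);
        return session.count(claim);
    }

    public static void main(String[] args) {
        System.out.println("content-modified check only: " + finalCount(false)); // prints 1
        System.out.println("content-modified OR CREATE:   " + finalCount(true));  // prints 0
    }
}
```

With only the content-modified check, the toy counter goes 1, 2, 3 and ends at 1 after both drops, which matches the numbers in the report. With the CREATE check added, the write releases the cloned claim first and the counter ends at 0.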

I've created a Pull Request that does just that, with supporting unit tests. 
Please give it a try.

Thanks

> claimantCount will never decrement to zero
> ------------------------------------------
>
>                 Key: NIFI-8727
>                 URL: https://issues.apache.org/jira/browse/NIFI-8727
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.12.0, 1.13.0, 1.12.1, 1.13.1, 1.13.2
>         Environment: linux
>            Reporter: wangliqiang
>            Assignee: Mark Bean
>            Priority: Major
>   Original Estimate: 96h
>          Time Spent: 10m
>  Remaining Estimate: 95h 50m
>
> When running my processor code below:
> {code:java}
> // originalFlowFile has content, so ClaimantCount=1
> // clone adds one to the claim count, so ClaimantCount=2
> FlowFile multiFlowFile = session.clone(originalFlowFile);
> multiFlowFile = session.write(multiFlowFile, new OutputStreamCallback() {
>     @Override
>     public void process(OutputStream out) throws IOException {
>         IOUtils.write(tvMultiAlbumJson, out, Charset.forName("UTF-8"));
>     }
> }); // the new content will reuse the same resource claim, so ClaimantCount=3
> // At this point we have two FlowFiles and two content claims, and ClaimantCount=3.
> // When these two FlowFiles are dropped, the ClaimantCount should decrement to 0;
> // however, the result is ClaimantCount=1!
> // If we use "sh nifi.sh diagnostics --verbose dump.log" to get a dump log, we
> // will find some info like this: "default/465/1623853427574-10705, Claimant
> // Count = 1, In Use = true, Awaiting Destruction = false, References (0) = []"
> // The file "default/465/1623853427574-10705" will never be archived and
> // will never be destroyed, so the content_repository will use more storage
> // than it is configured for.
> {code}
> The above describes the symptom. The cause is in the code below:
> {code:java}
> //session.clone
> public FlowFile clone(FlowFile example, final long offset, final long size) {
>     .....................................
>     // here the originalFlowFileRecord of record is null
>     final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
>     .....................................
>     return clone;
> }
> //session.commit
> private void updateClaimCounts(final RepositoryRecord record) {
>     ..........................................................
>     if (record.isContentModified()) {
>         decrementClaimCount(originalClaim); // here the originalClaim is null
>     }
> }
> {code}
> Perhaps we should not use session.clone like that, but without an official 
> note against it, it will sometimes be used this way.
> So I changed "final StandardRepositoryRecord record = new 
> StandardRepositoryRecord(null)" to "final StandardRepositoryRecord record = 
> new StandardRepositoryRecord(null, currRec);"
>  
>  


