[ https://issues.apache.org/jira/browse/NIFI-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ZhangCheng updated NIFI-7403: ----------------------------- Description: PutSQL processor support <Support Fragmented Transactions>,if we set this property true, I think it means The PutSQL processor will excute these sqls of one transaction Transactionally!! But we find that when we set the <Rollback On Failure> false, those sqls of one transaction do not excute transactionally,some sucess and some failure. I think it's wrong. I think, if we set <Support Fragmented Transactions> true, it should be executed Transactionally, no matter <Rollback On Failure> is true or false. I see the code, only PutSQL has the <Support Fragmented Transactions>, it maybe improve this feature at a small cost. modify code design: step1: at account of that maybe Other Processors support the <Support Fragmented Transactions>(such as PutDatabaseRecord), we should move the <Support Fragmented Transactions> from PutSQL.java to Put.java {code:java} public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") ... {code} step2: in Put.java onTrigger, after the `putFlowFiles` and before the `onCompleted.apply`, we try to get the value of <Rollback On Failure>, if true , check the `transferredFlowFiles` , if there are flowfiles don't route to `Success`, we should reroute these `transferredFlowFiles`(retry > failure),and do `onFailed`(if it's not null) {code:java} try { putFlowFiles(context, session, functionContext, connection, flowFiles, result); } catch (DiscontinuedException e) { // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue. // So, no logging is needed here. } ... if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){ //TODO do sth } // OnCompleted processing. if (onCompleted != null) { onCompleted.apply(context, session, functionContext, connection); } // Transfer FlowFiles. transferFlowFiles.apply(context, session, functionContext, result); {code} step3 Additionally, I think the Put.java can extract the RelationShips of the processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We can see that these processors who use the Put.java have the same Relationships, I this this is the `Put`'s common feature) {code:java} static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after the database is successfully updated") .build(); static final Relationship REL_RETRY = new Relationship.Builder() .name("retry") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .build(); static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + "such as an invalid query or an integrity constraint violation") .build(); {code} was: PutSQL processor support <Support Fragmented Transactions>,if we set this property true, I think it means The PutSQL processor will excute these sqls of one transaction Transactionally!! But we find that when we set the <Rollback On Failure> false, those sqls of one transaction do not excute transactionally,some sucess and some failure. I think it's wrong. I think, if we set <Support Fragmented Transactions> true, it should be executed Transactionally, no matter <Rollback On Failure> is true or false. I see the code, only PutSQL has the <Support Fragmented Transactions>, it should be improve this feature at a small cost. modify code design: step1: at account of that maybe Other Processors support the <Support Fragmented Transactions>(such as PutDatabaseRecord), we should move the <Support Fragmented Transactions> from PutSQL.java to Put.java {code:java} public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") ... {code} step2: in Put.java onTrigger, after the `putFlowFiles` and before the `onCompleted.apply`, we try to get the value of <Rollback On Failure>, if true , check the `transferredFlowFiles` , if there are flowfiles don't route to `Success`, we should reroute these `transferredFlowFiles`(retry > failure),and do `onFailed`(if it's not null) {code:java} try { putFlowFiles(context, session, functionContext, connection, flowFiles, result); } catch (DiscontinuedException e) { // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue. // So, no logging is needed here. } ... if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){ //TODO do sth } // OnCompleted processing. if (onCompleted != null) { onCompleted.apply(context, session, functionContext, connection); } // Transfer FlowFiles. transferFlowFiles.apply(context, session, functionContext, result); {code} step3 Additionally, I think the Put.java can extract the RelationShips of the processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We can see that these processors who use the Put.java have the same Relationships, I this this is the `Put`'s common feature) {code:java} static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after the database is successfully updated") .build(); static final Relationship REL_RETRY = new Relationship.Builder() .name("retry") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .build(); static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + "such as an invalid query or an integrity constraint violation") .build(); {code} > Put.java improvement(PutSQL's transactions support) > --------------------------------------------------- > > Key: NIFI-7403 > URL: https://issues.apache.org/jira/browse/NIFI-7403 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Affects Versions: 1.11.4 > Reporter: ZhangCheng > Priority: Major > > PutSQL processor support <Support Fragmented Transactions>,if we set this > property true, I think it means The PutSQL processor will excute these sqls > of one transaction Transactionally!! > But we find that when we set the <Rollback On Failure> false, those sqls of > one transaction do not excute transactionally,some sucess and some failure. I > think it's wrong. > I think, if we set <Support Fragmented Transactions> true, it should be > executed Transactionally, no matter <Rollback On Failure> is true or false. > I see the code, only PutSQL has the <Support Fragmented Transactions>, it > maybe improve this feature at a small cost. > modify code design: > step1: at account of that maybe Other Processors support the <Support > Fragmented Transactions>(such as PutDatabaseRecord), we should move the > <Support Fragmented Transactions> from PutSQL.java to Put.java > {code:java} > public static final PropertyDescriptor SUPPORT_TRANSACTIONS = new > PropertyDescriptor.Builder() > .name("Support Fragmented Transactions") > ... > {code} > step2: in Put.java onTrigger, after the `putFlowFiles` and before the > `onCompleted.apply`, we try to get the value of <Rollback On Failure>, if > true , check the `transferredFlowFiles` , if there are flowfiles don't route > to `Success`, we should reroute these `transferredFlowFiles`(retry > > failure),and do `onFailed`(if it's not null) > {code:java} > try { > putFlowFiles(context, session, functionContext, > connection, flowFiles, result); > } catch (DiscontinuedException e) { > // Whether it was an error or semi normal is depends on > the implementation and reason why it wanted to discontinue. > // So, no logging is needed here. > } > ... > > if(context.getProperty(SUPPORT_TRANSACTIONS).asBoolean()){ > //TODO do sth > } > // OnCompleted processing. > if (onCompleted != null) { > onCompleted.apply(context, session, functionContext, > connection); > } > // Transfer FlowFiles. > transferFlowFiles.apply(context, session, functionContext, > result); > {code} > step3 Additionally, I think the Put.java can extract the RelationShips of the > processors those use the Put.java(PutSQL PutDatabaseRecord, PutHiveQL...We > can see that these processors who use the Put.java have the same > Relationships, I this this is the `Put`'s common feature) > {code:java} > static final Relationship REL_SUCCESS = new Relationship.Builder() > .name("success") > .description("A FlowFile is routed to this relationship after the > database is successfully updated") > .build(); > static final Relationship REL_RETRY = new Relationship.Builder() > .name("retry") > .description("A FlowFile is routed to this relationship if the > database cannot be updated but attempting the operation again may succeed") > .build(); > static final Relationship REL_FAILURE = new Relationship.Builder() > .name("failure") > .description("A FlowFile is routed to this relationship if the > database cannot be updated and retrying the operation will also fail, " > + "such as an invalid query or an integrity constraint > violation") > .build(); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)