This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 954760f CAMEL-16725: camel-jira - New issues consumer should recover
if the JIRA server has deleted the last known issue id since the last poll.
Also fix: when a poll of new issues returns 2 or more entries, the new last
known id should be the last id, and not the first, to avoid polling in
duplicates on the next poll.
954760f is described below
commit 954760fdaa0f33736cedaa8d270f80accedc59af
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 25 10:16:13 2021 +0200
CAMEL-16725: camel-jira - New issues consumer should recover if the JIRA
server has deleted the last known issue id since the last poll. Also fix: when
a poll of new issues returns 2 or more entries, the new last known id should
be the last id, and not the first, to avoid polling in duplicates on the next
poll.
---
.../component/jira/consumer/NewIssuesConsumer.java | 79 ++++++++++++++++------
.../jira/consumer/WatchUpdatesConsumer.java | 3 +-
2 files changed, 60 insertions(+), 22 deletions(-)
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
index 6525e59..6818f80 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
@@ -16,12 +16,16 @@
*/
package org.apache.camel.component.jira.consumer;
+import java.util.Collections;
import java.util.List;
+import com.atlassian.jira.rest.client.api.RestClientException;
import com.atlassian.jira.rest.client.api.domain.Issue;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jira.JiraEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Consumes new JIRA issues.
@@ -31,6 +35,8 @@ import org.apache.camel.component.jira.JiraEndpoint;
*/
public class NewIssuesConsumer extends AbstractJiraConsumer {
+ private static final transient Logger LOG =
LoggerFactory.getLogger(NewIssuesConsumer.class);
+
private final String jql;
private long latestIssueId = -1;
@@ -44,37 +50,70 @@ public class NewIssuesConsumer extends AbstractJiraConsumer
{
super.doStart();
// read the actual issues, the next poll outputs only the new issues
added after the route start
// grab only the top
- List<Issue> issues = getIssues(jql, 0, 1, 1);
- // in case there aren't any issues...
- if (!issues.isEmpty()) {
- latestIssueId = issues.get(0).getId();
+ latestIssueId = findLatestIssueId();
+ }
+
+ protected long findLatestIssueId() {
+ // read the actual issues, the next poll outputs only the new issues
added after the route start
+ // grab only the top
+ try {
+ List<Issue> issues = getIssues(jql, 0, 1, 1);
+ // in case there aren't any issues...
+ if (!issues.isEmpty()) {
+ return issues.get(0).getId();
+ }
+ } catch (Exception e) {
+ // ignore
}
+ return -1;
}
protected int doPoll() throws Exception {
// it may happen the poll() is called while the route is doing the
initial load,
// this way we need to wait for the latestIssueId being associated to
the last indexed issue id
- int nMessages = 0;
- if (latestIssueId > -1) {
- List<Issue> newIssues = getNewIssues();
- // In the end, we want only *new* issues oldest to newest.
- for (int i = newIssues.size() - 1; i > -1; i--) {
- Issue newIssue = newIssues.get(i);
- Exchange e = createExchange(true);
- e.getIn().setBody(newIssue);
- getProcessor().process(e);
- }
- nMessages = newIssues.size();
+ List<Issue> newIssues = getNewIssues();
+ // In the end, we want only *new* issues oldest to newest.
+ for (int i = newIssues.size() - 1; i > -1; i--) {
+ Issue newIssue = newIssues.get(i);
+ Exchange e = createExchange(true);
+ e.getIn().setBody(newIssue);
+ getProcessor().process(e);
}
- return nMessages;
+ return newIssues.size();
}
private List<Issue> getNewIssues() {
- // search only for issues created after the latest id
- String jqlFilter = "id > " + latestIssueId + " AND " + jql;
- List<Issue> issues = getIssues(jqlFilter, 0, 50, ((JiraEndpoint)
getEndpoint()).getMaxResults());
+ String jqlFilter;
+ if (latestIssueId > -1) {
+ // search only for issues created after the latest id
+ jqlFilter = "id > " + latestIssueId + " AND " + jql;
+ } else {
+ jqlFilter = jql;
+ }
+ // the last issue may be deleted, so to recover we re-find it and go
from there
+ List<Issue> issues;
+ try {
+ issues = getIssues(jqlFilter, 0, 50,
getEndpoint().getMaxResults());
+ } catch (RestClientException e) {
+ if (e.getStatusCode().isPresent()) {
+ int code = e.getStatusCode().get();
+ if (code == 400) {
+ String msg = e.getMessage();
+ if (msg != null && msg.contains("does not exist for the
field 'id'")) {
+ LOG.warn("Last issue id: " + latestIssueId + " no
longer exists (could have been deleted)."
+ + " Will recover by fetching last issue id
from JIRA and try again on next poll");
+ latestIssueId = findLatestIssueId();
+ return Collections.EMPTY_LIST;
+ }
+ }
+ }
+ throw e;
+ }
+
if (!issues.isEmpty()) {
- latestIssueId = issues.get(0).getId();
+ // remember last id we have processed
+ int last = issues.size() - 1;
+ latestIssueId = issues.get(last).getId();
}
return issues;
}
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index ba9db3e..f91dc8a 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.jira.consumer;
import java.lang.reflect.Method;
@@ -97,7 +96,7 @@ public class WatchUpdatesConsumer extends
AbstractJiraConsumer {
Object changedField = get.invoke(changed);
if (!Objects.equals(originalField, changedField)) {
- if (!((JiraEndpoint) getEndpoint()).isSendOnlyUpdatedField()) {
+ if (!getEndpoint().isSendOnlyUpdatedField()) {
processExchange(changed, changed.getKey(), fieldName);
} else {
processExchange(changedField, changed.getKey(), fieldName);