This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 19cd31b1617 NIFI-15657 Fixed 409 Conflict in Azure DevOps and
Bitbucket for multiple Flows with shared branch (#10948)
19cd31b1617 is described below
commit 19cd31b1617ed635eab69b82ebede51008fd6e6c
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Mar 10 23:32:57 2026 +0100
NIFI-15657 Fixed 409 Conflict in Azure DevOps and Bitbucket for multiple
Flows with shared branch (#10948)
Signed-off-by: David Handermann <[email protected]>
---
.../bitbucket/BitbucketRepositoryClient.java | 80 +++--
.../bitbucket/BitbucketRepositoryClientTest.java | 322 +++++++++++++++++++++
.../azure/devops/AzureDevOpsRepositoryClient.java | 93 +++---
.../devops/AzureDevOpsRepositoryClientTest.java | 306 ++++++++++++++++++++
4 files changed, 744 insertions(+), 57 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
index 1779663445a..8a8ea4e8e43 100644
---
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
@@ -90,6 +90,7 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
private static final String FIELD_DISPLAY_ID = "displayId";
private static final String FIELD_ID = "id";
private static final String FIELD_HASH = "hash";
+ private static final String FIELD_TARGET = "target";
private static final String FIELD_MESSAGE_TEXT = "message";
private static final String FIELD_AUTHOR = "author";
private static final String FIELD_AUTHOR_TIMESTAMP = "authorTimestamp";
@@ -101,6 +102,7 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
private static final String ENTRY_DIRECTORY_CLOUD = "commit_directory";
private static final String ENTRY_FILE_DATA_CENTER = "FILE";
private static final String ENTRY_FILE_CLOUD = "commit_file";
+ private static final int MAX_PUSH_ATTEMPTS = 3;
private final ObjectMapper objectMapper = JsonMapper.builder().build();
@@ -427,33 +429,75 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
}
private String createContentCloud(final GitCreateContentRequest request,
final String resolvedPath, final String branch) throws FlowRegistryException {
- final StandardMultipartFormDataStreamBuilder multipartBuilder = new
StandardMultipartFormDataStreamBuilder();
- multipartBuilder.addPart(resolvedPath,
StandardHttpContentType.APPLICATION_JSON,
request.getContent().getBytes(StandardCharsets.UTF_8));
- multipartBuilder.addPart(FIELD_MESSAGE,
StandardHttpContentType.TEXT_PLAIN,
request.getMessage().getBytes(StandardCharsets.UTF_8));
- multipartBuilder.addPart(FIELD_BRANCH,
StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
+ final String expectedFileCommit = request.getExpectedCommitSha();
+ final URI uri =
getRepositoryUriBuilder().addPathSegment("src").build();
- // Add parents parameter for atomic commit - Bitbucket Cloud will
reject if the branch has moved
- final String expectedCommitSha = request.getExpectedCommitSha();
- if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
- multipartBuilder.addPart(FIELD_PARENTS,
StandardHttpContentType.TEXT_PLAIN,
expectedCommitSha.getBytes(StandardCharsets.UTF_8));
+ for (int attempt = 1; attempt <= MAX_PUSH_ATTEMPTS; attempt++) {
+ if (expectedFileCommit != null) {
+ final Optional<String> currentFileCommit =
getLatestCommit(branch, resolvedPath);
+ if (currentFileCommit.isPresent() &&
!currentFileCommit.get().equals(expectedFileCommit)) {
+ throw new FlowRegistryException("File [%s] has been
modified by another commit [%s]".formatted(resolvedPath,
currentFileCommit.get()));
+ }
+ }
+
+ final String branchHead = getBranchHeadCloud(branch);
+
+ final StandardMultipartFormDataStreamBuilder multipartBuilder =
new StandardMultipartFormDataStreamBuilder();
+ multipartBuilder.addPart(resolvedPath,
StandardHttpContentType.APPLICATION_JSON,
request.getContent().getBytes(StandardCharsets.UTF_8));
+ multipartBuilder.addPart(FIELD_MESSAGE,
StandardHttpContentType.TEXT_PLAIN,
request.getMessage().getBytes(StandardCharsets.UTF_8));
+ multipartBuilder.addPart(FIELD_BRANCH,
StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
+ multipartBuilder.addPart(FIELD_PARENTS,
StandardHttpContentType.TEXT_PLAIN,
branchHead.getBytes(StandardCharsets.UTF_8));
+
+ final HttpResponseEntity response =
this.webClient.getWebClientService()
+ .post()
+ .uri(uri)
+ .body(multipartBuilder.build(), OptionalLong.empty())
+ .header(AUTHORIZATION_HEADER,
authToken.getAuthzHeaderValue())
+ .header(CONTENT_TYPE_HEADER,
multipartBuilder.getHttpContentType().getContentType())
+ .retrieve();
+
+ if (response.statusCode() == HttpURLConnection.HTTP_CREATED) {
+ closeQuietly(response);
+ return getRequiredLatestCommit(branch, resolvedPath);
+ }
+
+ if (response.statusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ closeQuietly(response);
+ if (attempt == MAX_PUSH_ATTEMPTS) {
+ throw new FlowRegistryException("Push failed after %d
attempts due to concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+ }
+ logger.debug("Push attempt {} for path [{}] failed with 409
(branch HEAD moved), retrying", attempt, resolvedPath);
+ continue;
+ }
+
+ final String errorMessage = "Error while committing content for
repository [%s] on branch %s at path %s".formatted(repoName, branch,
resolvedPath);
+ try {
+ throw new FlowRegistryException("%s:
%s".formatted(errorMessage, getErrorMessage(response)));
+ } finally {
+ closeQuietly(response);
+ }
}
- final URI uri =
getRepositoryUriBuilder().addPathSegment("src").build();
- final String errorMessage = "Error while committing content for
repository [%s] on branch %s at path %s"
- .formatted(repoName, branch, resolvedPath);
+ throw new FlowRegistryException("Push failed after %d attempts due to
concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+ }
+
+ private String getBranchHeadCloud(final String branch) throws
FlowRegistryException {
+ final HttpUriBuilder builder =
getRepositoryUriBuilder().addPathSegment("refs").addPathSegment("branches");
+ addPathSegments(builder, branch);
+ final URI uri = builder.build();
+
try (final HttpResponseEntity response =
this.webClient.getWebClientService()
- .post()
+ .get()
.uri(uri)
- .body(multipartBuilder.build(), OptionalLong.empty())
.header(AUTHORIZATION_HEADER, authToken.getAuthzHeaderValue())
- .header(CONTENT_TYPE_HEADER,
multipartBuilder.getHttpContentType().getContentType())
.retrieve()) {
- verifyStatusCode(response, errorMessage,
HttpURLConnection.HTTP_CREATED);
+
+ verifyStatusCode(response, "Error while fetching branch HEAD for
branch [%s] in repository [%s]".formatted(branch, repoName),
HttpURLConnection.HTTP_OK);
+ final JsonNode jsonResponse = parseResponseBody(response, uri);
+ return jsonResponse.get(FIELD_TARGET).get(FIELD_HASH).asText();
} catch (final IOException e) {
- throw new FlowRegistryException("Failed closing Bitbucket create
content response", e);
+ throw new FlowRegistryException("Failed closing Bitbucket branch
HEAD response", e);
}
-
- return getRequiredLatestCommit(branch, resolvedPath);
}
private String createContentDataCenter(final GitCreateContentRequest
request, final String resolvedPath, final String branch) throws
FlowRegistryException {
diff --git
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
new file mode 100644
index 00000000000..0730b4352f9
--- /dev/null
+++
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.atlassian.bitbucket;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestHeadersSpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class BitbucketRepositoryClientTest {
+
+ private static final String BRANCH = "main";
+ private static final String FILE_PATH = "flows/test.json";
+ private static final String CONTENT = "{\"flow\":\"content\"}";
+ private static final String MESSAGE = "test commit";
+ private static final String EXPECTED_FILE_COMMIT = "file-commit-sha";
+ private static final String BRANCH_HEAD_SHA = "branch-head-sha";
+ private static final String RESULT_COMMIT_SHA = "result-commit-sha";
+
+ @Mock
+ private WebClientServiceProvider webClientProvider;
+
+ @Mock
+ private WebClientService webClientService;
+
+ @Mock
+ private ComponentLog logger;
+
+ private HttpUriBuilder uriBuilder;
+
+ private HttpRequestBodySpec getBodySpec;
+ private HttpRequestBodySpec postAfterHeaders;
+
+ @BeforeEach
+ void setUp() {
+
lenient().when(webClientProvider.getWebClientService()).thenReturn(webClientService);
+ setupUriBuilder();
+ }
+
+ private void setupUriBuilder() {
+ uriBuilder = mock(HttpUriBuilder.class);
+
lenient().when(webClientProvider.getHttpUriBuilder()).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.scheme(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.host(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.port(any(int.class))).thenReturn(uriBuilder);
+
lenient().when(uriBuilder.addPathSegment(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.addQueryParameter(anyString(),
anyString())).thenReturn(uriBuilder);
+
lenient().when(uriBuilder.build()).thenReturn(URI.create("https://api.bitbucket.org/test"));
+ }
+
+ private BitbucketRepositoryClient buildCloudClient() throws
FlowRegistryException {
+ return BitbucketRepositoryClient.builder()
+ .clientId("test-client")
+ .apiUrl("https://api.bitbucket.org")
+ .formFactor(BitbucketFormFactor.CLOUD)
+ .authenticationType(BitbucketAuthenticationType.ACCESS_TOKEN)
+ .accessToken("test-token")
+ .workspace("test-workspace")
+ .repoName("test-repo")
+ .webClient(webClientProvider)
+ .logger(logger)
+ .build();
+ }
+
+ private void stubGetChain(final HttpResponseEntity... responses) {
+ final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+ getBodySpec = mock(HttpRequestBodySpec.class);
+ lenient().when(webClientService.get()).thenReturn(getSpec);
+ lenient().when(getSpec.uri(any(URI.class))).thenReturn(getBodySpec);
+ lenient().when(getBodySpec.header(anyString(),
anyString())).thenReturn(getBodySpec);
+
+ OngoingStubbing<HttpResponseEntity> stubbing =
when(getBodySpec.retrieve());
+ for (final HttpResponseEntity response : responses) {
+ stubbing = stubbing.thenReturn(response);
+ }
+ }
+
+ private void stubPostChain(final HttpResponseEntity... responses) {
+ final HttpRequestUriSpec postSpec = mock(HttpRequestUriSpec.class);
+ final HttpRequestBodySpec postBodySpec =
mock(HttpRequestBodySpec.class);
+ final HttpRequestHeadersSpec afterBody =
mock(HttpRequestHeadersSpec.class);
+ postAfterHeaders = mock(HttpRequestBodySpec.class);
+ lenient().when(webClientService.post()).thenReturn(postSpec);
+ lenient().when(postSpec.uri(any(URI.class))).thenReturn(postBodySpec);
+ lenient().when(postBodySpec.body(any(InputStream.class),
any(OptionalLong.class))).thenReturn(afterBody);
+ lenient().when(afterBody.header(anyString(),
anyString())).thenReturn(postAfterHeaders);
+ lenient().when(postAfterHeaders.header(anyString(),
anyString())).thenReturn(postAfterHeaders);
+
+ OngoingStubbing<HttpResponseEntity> stubbing =
when(postAfterHeaders.retrieve());
+ for (final HttpResponseEntity response : responses) {
+ stubbing = stubbing.thenReturn(response);
+ }
+ }
+
+ private HttpResponseEntity mockResponse(final int statusCode, final String
body) {
+ final HttpResponseEntity response = mock(HttpResponseEntity.class);
+ lenient().when(response.statusCode()).thenReturn(statusCode);
+ lenient().when(response.body()).thenReturn(new
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+ return response;
+ }
+
+ private HttpResponseEntity branchListResponse() {
+ return mockResponse(HttpURLConnection.HTTP_OK,
"{\"values\":[{\"name\":\"main\"}]}");
+ }
+
+ private HttpResponseEntity branchHeadResponse(final String hash) {
+ return mockResponse(HttpURLConnection.HTTP_OK,
"{\"target\":{\"hash\":\"%s\"}}".formatted(hash));
+ }
+
+ private HttpResponseEntity commitsCheckResponse() {
+ return mockResponse(HttpURLConnection.HTTP_OK,
"{\"values\":[{\"hash\":\"some-hash\"}]}");
+ }
+
+ private HttpResponseEntity commitsForPathResponse(final String hash) {
+ return mockResponse(HttpURLConnection.HTTP_OK,
"{\"values\":[{\"hash\":\"%s\"}]}".formatted(hash));
+ }
+
+ private HttpResponseEntity conflictResponse() {
+ return mockResponse(HttpURLConnection.HTTP_CONFLICT,
"{\"error\":{\"message\":\"conflict\"}}");
+ }
+
+ private HttpResponseEntity serverErrorResponse() {
+ return mockResponse(HttpURLConnection.HTTP_INTERNAL_ERROR,
"{\"error\":{\"message\":\"internal error\"}}");
+ }
+
+ private HttpResponseEntity createdResponse() {
+ return mockResponse(HttpURLConnection.HTTP_CREATED, "{}");
+ }
+
+ private GitCreateContentRequest createRequest(final String
existingContentSha, final String expectedCommitSha) {
+ final GitCreateContentRequest.Builder builder =
GitCreateContentRequest.builder()
+ .branch(BRANCH)
+ .path(FILE_PATH)
+ .content(CONTENT)
+ .message(MESSAGE);
+ if (existingContentSha != null) {
+ builder.existingContentSha(existingContentSha);
+ }
+ if (expectedCommitSha != null) {
+ builder.expectedCommitSha(expectedCommitSha);
+ }
+ return builder.build();
+ }
+
+ @Test
+ void testCreateContentCloudSuccess() throws FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA),
+ commitsCheckResponse(),
+ commitsForPathResponse(RESULT_COMMIT_SHA)
+ );
+ stubPostChain(createdResponse());
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ final String commitSha =
client.createContent(createRequest("existing-sha", null));
+ assertEquals(RESULT_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testCreateContentCloudRetryOn409() throws FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA),
+ branchHeadResponse("new-head"),
+ commitsCheckResponse(),
+ commitsForPathResponse(RESULT_COMMIT_SHA)
+ );
+ stubPostChain(conflictResponse(), createdResponse());
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ final String commitSha =
client.createContent(createRequest("existing-sha", null));
+ assertEquals(RESULT_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testCreateContentCloudFileLevelConflict() throws
FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ commitsCheckResponse(),
+ commitsForPathResponse("different-sha")
+ );
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ final FlowRegistryException exception =
assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha",
EXPECTED_FILE_COMMIT)));
+ assertTrue(exception.getMessage().contains("has been modified by
another commit"));
+ }
+
+ @Test
+ void testCreateContentCloudMaxRetriesExhausted() throws
FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ branchHeadResponse("head-1"),
+ branchHeadResponse("head-2"),
+ branchHeadResponse("head-3")
+ );
+ stubPostChain(conflictResponse(), conflictResponse(),
conflictResponse());
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ final FlowRegistryException exception =
assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha",
null)));
+ assertTrue(exception.getMessage().contains("Push failed after 3
attempts"));
+ }
+
+ @Test
+ void testCreateContentCloudNon409ErrorNotRetried() throws
FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA)
+ );
+ stubPostChain(serverErrorResponse());
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha",
null)));
+ }
+
+ @Test
+ void testCreateContentCloudNullExpectedCommitSha() throws
FlowRegistryException {
+ stubGetChain(
+ branchListResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA),
+ commitsCheckResponse(),
+ commitsForPathResponse(RESULT_COMMIT_SHA)
+ );
+ stubPostChain(createdResponse());
+
+ final BitbucketRepositoryClient client = buildCloudClient();
+ final String commitSha = client.createContent(createRequest(null,
null));
+ assertEquals(RESULT_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testCreateContentDataCenterUnchanged() throws FlowRegistryException {
+ final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+ final HttpRequestBodySpec dcGetBody = mock(HttpRequestBodySpec.class);
+ lenient().when(webClientService.get()).thenReturn(getSpec);
+ lenient().when(getSpec.uri(any(URI.class))).thenReturn(dcGetBody);
+ lenient().when(dcGetBody.header(anyString(),
anyString())).thenReturn(dcGetBody);
+
+ final HttpResponseEntity branchesResp =
mockResponse(HttpURLConnection.HTTP_OK,
+ "{\"values\":[{\"displayId\":\"main\"}],\"isLastPage\":true}");
+ final HttpResponseEntity commitsCheckResp =
mockResponse(HttpURLConnection.HTTP_OK,
+ "{\"values\":[{\"id\":\"c1\"}],\"isLastPage\":true}");
+ final HttpResponseEntity commitsResp =
mockResponse(HttpURLConnection.HTTP_OK,
+
"{\"values\":[{\"id\":\"%s\"}],\"isLastPage\":true}".formatted(RESULT_COMMIT_SHA));
+ when(dcGetBody.retrieve()).thenReturn(branchesResp, commitsCheckResp,
commitsResp);
+
+ final HttpRequestUriSpec putSpec = mock(HttpRequestUriSpec.class);
+ final HttpRequestBodySpec putBodySpec =
mock(HttpRequestBodySpec.class);
+ final HttpRequestHeadersSpec putAfterBody =
mock(HttpRequestHeadersSpec.class);
+ final HttpRequestBodySpec putAfterHeaders =
mock(HttpRequestBodySpec.class);
+ lenient().when(webClientService.put()).thenReturn(putSpec);
+ lenient().when(putSpec.uri(any(URI.class))).thenReturn(putBodySpec);
+ lenient().when(putBodySpec.body(any(InputStream.class),
any(OptionalLong.class))).thenReturn(putAfterBody);
+ lenient().when(putAfterBody.header(anyString(),
anyString())).thenReturn(putAfterHeaders);
+ lenient().when(putAfterHeaders.header(anyString(),
anyString())).thenReturn(putAfterHeaders);
+ final HttpResponseEntity putResponse =
mockResponse(HttpURLConnection.HTTP_OK, "{}");
+ lenient().when(putAfterHeaders.retrieve()).thenReturn(putResponse);
+
+ final BitbucketRepositoryClient dcClient =
BitbucketRepositoryClient.builder()
+ .clientId("test-client")
+ .apiUrl("https://bitbucket.example.com")
+ .formFactor(BitbucketFormFactor.DATA_CENTER)
+ .authenticationType(BitbucketAuthenticationType.ACCESS_TOKEN)
+ .accessToken("test-token")
+ .projectKey("TEST")
+ .repoName("test-repo")
+ .webClient(webClientProvider)
+ .logger(logger)
+ .build();
+
+ final GitCreateContentRequest request = createRequest(null,
EXPECTED_FILE_COMMIT);
+ final String commitSha = dcClient.createContent(request);
+ assertEquals(RESULT_COMMIT_SHA, commitSha);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
index 88091ff978e..dc6859317c0 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
@@ -120,6 +120,7 @@ public class AzureDevOpsRepositoryClient implements
GitRepositoryClient {
private static final String CHANGE_TYPE_EDIT = "edit";
private static final String CHANGE_TYPE_DELETE = "delete";
private static final String CONTENT_TYPE_BASE64 = "base64encoded";
+ private static final int MAX_PUSH_ATTEMPTS = 3;
// Common query parameter names and values
private static final String VERSION_DESCRIPTOR_VERSION =
"versionDescriptor.version";
@@ -360,25 +361,7 @@ public class AzureDevOpsRepositoryClient implements
GitRepositoryClient {
final String message = request.getMessage();
logger.debug("Creating content at path [{}] on branch [{}] in repo
[{}]", path, branch, repoName);
- // Use expectedCommitSha for atomic commit if provided, otherwise
fetch current branch HEAD
- // Azure DevOps will reject the push if oldObjectId doesn't match the
current branch HEAD
- final String oldObjectId;
- final String expectedCommitSha = request.getExpectedCommitSha();
- if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
- oldObjectId = expectedCommitSha;
- } else {
- // Fall back to fetching current branch commit id
- final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
- .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX +
branch)
- .addQueryParameter(API, API_VERSION)
- .build();
- final JsonNode refResponse = executeGet(refUri);
- oldObjectId =
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
- }
-
- final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
- .addQueryParameter(API, API_VERSION)
- .build();
+ final String expectedFileCommit = request.getExpectedCommitSha();
final String changeType;
if (request.getExistingContentSha() == null) {
@@ -389,11 +372,59 @@ public class AzureDevOpsRepositoryClient implements
GitRepositoryClient {
}
final String encoded =
Base64.getEncoder().encodeToString(request.getContent().getBytes(StandardCharsets.UTF_8));
+ final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
+ .addQueryParameter(API, API_VERSION)
+ .build();
+
+ for (int attempt = 1; attempt <= MAX_PUSH_ATTEMPTS; attempt++) {
+ if (expectedFileCommit != null) {
+ final Optional<String> currentFileCommit =
getContentSha(request.getPath(), branch);
+ if (currentFileCommit.isPresent() &&
!currentFileCommit.get().equals(expectedFileCommit)) {
+ throw new FlowRegistryException("File [%s] has been
modified by another commit [%s]".formatted(path, currentFileCommit.get()));
+ }
+ }
+ final String branchHead = fetchBranchHead(branch);
+ final HttpResponseEntity response = executePush(pushUri, branch,
branchHead, encoded, message, changeType, path);
+
+ if (response.statusCode() == HttpURLConnection.HTTP_CREATED) {
+ try {
+ final JsonNode pushResponse =
MAPPER.readTree(response.body());
+ return
pushResponse.get(SEGMENT_COMMITS).get(0).get(JSON_FIELD_COMMIT_ID).asText();
+ } catch (final IOException e) {
+ throw new FlowRegistryException("Failed to parse push
response from [%s]".formatted(pushUri), e);
+ }
+ }
+
+ if (response.statusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ if (attempt == MAX_PUSH_ATTEMPTS) {
+ throw new FlowRegistryException("Push failed after %d
attempts due to concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+ }
+ logger.debug("Push attempt {} for path [{}] failed with 409
(branch HEAD moved), retrying", attempt, path);
+ continue;
+ }
+
+ throw new FlowRegistryException("Request to %s failed -
%s".formatted(pushUri, getErrorMessage(response)));
+ }
+
+ throw new FlowRegistryException("Push failed after %d attempts due to
concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+ }
+
+ private String fetchBranchHead(final String branch) throws
FlowRegistryException {
+ final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
+ .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branch)
+ .addQueryParameter(API, API_VERSION)
+ .build();
+ final JsonNode refResponse = executeGet(refUri);
+ return
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+ }
+
+ private HttpResponseEntity executePush(final URI pushUri, final String
branch, final String oldObjectId,
+ final String encodedContent, final
String message,
+ final String changeType, final
String path) throws FlowRegistryException {
final PushRequest pushRequest = new PushRequest(
List.of(new RefUpdate(REFS_HEADS_PREFIX + branch,
oldObjectId)),
- List.of(new Commit(message,
- List.of(new Change(changeType, new Item(path), new
NewContent(encoded, CONTENT_TYPE_BASE64)))))
+ List.of(new Commit(message, List.of(new Change(changeType, new
Item(path), new NewContent(encodedContent, CONTENT_TYPE_BASE64)))))
);
final String json;
@@ -403,36 +434,20 @@ public class AzureDevOpsRepositoryClient implements
GitRepositoryClient {
throw new FlowRegistryException("Failed to serialize push
request", e);
}
- final HttpResponseEntity response =
this.webClient.getWebClientService()
+ return this.webClient.getWebClientService()
.post()
.uri(pushUri)
.header(AUTHORIZATION_HEADER, bearerToken())
.header(CONTENT_TYPE_HEADER,
MediaType.APPLICATION_JSON.getMediaType())
.body(json)
.retrieve();
-
- if (response.statusCode() != HttpURLConnection.HTTP_CREATED) {
- throw new FlowRegistryException("Request to %s failed -
%s".formatted(pushUri, getErrorMessage(response)));
- }
-
- try {
- final JsonNode pushResponse = MAPPER.readTree(response.body());
- return
pushResponse.get(SEGMENT_COMMITS).get(0).get(JSON_FIELD_COMMIT_ID).asText();
- } catch (IOException e) {
- throw new FlowRegistryException("Failed to create content", e);
- }
}
@Override
public InputStream deleteContent(final String filePath, final String
commitMessage, final String branch) throws FlowRegistryException, IOException {
final String path = getResolvedPath(filePath);
logger.debug("Deleting file [{}] in repo [{}] on branch [{}]", path,
repoName, branch);
- final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
- .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branch)
- .addQueryParameter(API, API_VERSION)
- .build();
- final JsonNode refResponse = executeGet(refUri);
- final String oldObjectId =
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+ final String oldObjectId = fetchBranchHead(branch);
final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
.addQueryParameter(API, API_VERSION)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java
new file mode 100644
index 00000000000..ba6c1831de8
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.azure.devops;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestHeadersSpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AzureDevOpsRepositoryClientTest {
+
+ private static final String BRANCH = "main";
+ private static final String FILE_PATH = "flows/test.json";
+ private static final String CONTENT = "{\"flow\":\"content\"}";
+ private static final String MESSAGE = "test commit";
+ private static final String EXPECTED_FILE_COMMIT = "file-commit-sha";
+ private static final String BRANCH_HEAD_SHA = "branch-head-sha";
+ private static final String NEW_COMMIT_SHA = "new-commit-sha";
+
+ @Mock
+ private WebClientServiceProvider webClientProvider;
+
+ @Mock
+ private WebClientService webClientService;
+
+ @Mock
+ private OAuth2AccessTokenProvider tokenProvider;
+
+ @Mock
+ private ComponentLog logger;
+
+ private HttpUriBuilder uriBuilder;
+ private OngoingStubbing<HttpResponseEntity> getStubbing;
+ private OngoingStubbing<HttpResponseEntity> postStubbing;
+
+ @BeforeEach
+ void setUp() {
+ final AccessToken accessToken = new AccessToken("test-token", null, "Bearer", 3600L, null);
+ lenient().when(tokenProvider.getAccessDetails()).thenReturn(accessToken);
+ lenient().when(webClientProvider.getWebClientService()).thenReturn(webClientService);
+ }
+
+ private AzureDevOpsRepositoryClient buildClient() throws FlowRegistryException {
+ setupUriBuilder();
+ return AzureDevOpsRepositoryClient.builder()
+ .clientId("test-client")
+ .apiUrl("https://dev.azure.com")
+ .organization("test-org")
+ .project("test-project")
+ .repoName("test-repo")
+ .oauthService(tokenProvider)
+ .webClient(webClientProvider)
+ .logger(logger)
+ .build();
+ }
+
+ private void setupUriBuilder() {
+ uriBuilder = mock(HttpUriBuilder.class);
+ lenient().when(webClientProvider.getHttpUriBuilder()).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.scheme(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.host(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.port(any(int.class))).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.addPathSegment(anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.addQueryParameter(anyString(), anyString())).thenReturn(uriBuilder);
+ lenient().when(uriBuilder.build()).thenReturn(URI.create("https://dev.azure.com/test"));
+ }
+
+ private void stubGetChain(final HttpResponseEntity... responses) {
+ final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+ final HttpRequestBodySpec bodySpec = mock(HttpRequestBodySpec.class);
+ lenient().when(webClientService.get()).thenReturn(getSpec);
+ lenient().when(getSpec.uri(any(URI.class))).thenReturn(bodySpec);
+ lenient().when(bodySpec.header(anyString(), anyString())).thenReturn(bodySpec);
+
+ getStubbing = when(bodySpec.retrieve());
+ for (final HttpResponseEntity response : responses) {
+ getStubbing = getStubbing.thenReturn(response);
+ }
+ }
+
+ private void stubPostChain(final HttpResponseEntity... responses) {
+ final HttpRequestUriSpec postSpec = mock(HttpRequestUriSpec.class);
+ final HttpRequestBodySpec postBodySpec = mock(HttpRequestBodySpec.class);
+ final HttpRequestHeadersSpec afterBody = mock(HttpRequestHeadersSpec.class);
+ lenient().when(webClientService.post()).thenReturn(postSpec);
+ lenient().when(postSpec.uri(any(URI.class))).thenReturn(postBodySpec);
+ lenient().when(postBodySpec.header(anyString(), anyString())).thenReturn(postBodySpec);
+ lenient().when(postBodySpec.body(anyString())).thenReturn(afterBody);
+ lenient().when(afterBody.header(anyString(), anyString())).thenReturn(postBodySpec);
+
+ postStubbing = when(afterBody.retrieve());
+ for (final HttpResponseEntity response : responses) {
+ postStubbing = postStubbing.thenReturn(response);
+ }
+ }
+
+ private HttpResponseEntity mockResponse(final int statusCode, final String body) {
+ final HttpResponseEntity response = mock(HttpResponseEntity.class);
+ lenient().when(response.statusCode()).thenReturn(statusCode);
+ lenient().when(response.body()).thenReturn(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+ return response;
+ }
+
+ private HttpResponseEntity repoInfoResponse() {
+ return mockResponse(HttpURLConnection.HTTP_OK, "{\"project\":{\"id\":\"proj-id\"},\"id\":\"repo-id\"}");
+ }
+
+ private HttpResponseEntity permissionsResponse() {
+ return mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[true]}");
+ }
+
+ private HttpResponseEntity contentShaResponse(final String commitId) {
+ return mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[{\"commitId\":\"%s\"}]}".formatted(commitId));
+ }
+
+ private HttpResponseEntity contentShaNotFoundResponse() {
+ return mockResponse(HttpURLConnection.HTTP_NOT_FOUND, "{}");
+ }
+
+ private HttpResponseEntity branchHeadResponse(final String objectId) {
+ return mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[{\"objectId\":\"%s\"}]}".formatted(objectId));
+ }
+
+ private HttpResponseEntity pushSuccessResponse(final String commitId) {
+ return mockResponse(HttpURLConnection.HTTP_CREATED, "{\"commits\":[{\"commitId\":\"%s\"}]}".formatted(commitId));
+ }
+
+ private HttpResponseEntity conflictResponse() {
+ return mockResponse(HttpURLConnection.HTTP_CONFLICT, "{\"message\":\"conflict\"}");
+ }
+
+ private HttpResponseEntity serverErrorResponse() {
+ return mockResponse(HttpURLConnection.HTTP_INTERNAL_ERROR, "{\"message\":\"internal error\"}");
+ }
+
+ private GitCreateContentRequest createRequest(final String existingContentSha, final String expectedCommitSha) {
+ final GitCreateContentRequest.Builder builder = GitCreateContentRequest.builder()
+ .branch(BRANCH)
+ .path(FILE_PATH)
+ .content(CONTENT)
+ .message(MESSAGE);
+ if (existingContentSha != null) {
+ builder.existingContentSha(existingContentSha);
+ }
+ if (expectedCommitSha != null) {
+ builder.expectedCommitSha(expectedCommitSha);
+ }
+ return builder.build();
+ }
+
+ @Test
+ void testCreateContentSuccess() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA)
+ );
+ stubPostChain(pushSuccessResponse(NEW_COMMIT_SHA));
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final String commitSha = client.createContent(createRequest("existing-sha", null));
+ assertEquals(NEW_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testCreateContentRetryOn409ThenSuccess() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ contentShaResponse(EXPECTED_FILE_COMMIT),
+ branchHeadResponse(BRANCH_HEAD_SHA),
+ contentShaResponse(EXPECTED_FILE_COMMIT),
+ branchHeadResponse("new-branch-head")
+ );
+ stubPostChain(conflictResponse(), pushSuccessResponse(NEW_COMMIT_SHA));
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final String commitSha = client.createContent(createRequest("existing-sha", EXPECTED_FILE_COMMIT));
+ assertEquals(NEW_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testCreateContentFileLevelConflict() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ contentShaResponse("different-commit-sha")
+ );
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final FlowRegistryException exception = assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha", EXPECTED_FILE_COMMIT)));
+ assertTrue(exception.getMessage().contains("has been modified by another commit"));
+ }
+
+ @Test
+ void testCreateContentMaxRetriesExhausted() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA),
+ branchHeadResponse("head-2"),
+ branchHeadResponse("head-3")
+ );
+ stubPostChain(conflictResponse(), conflictResponse(), conflictResponse());
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final FlowRegistryException exception = assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha", null)));
+ assertTrue(exception.getMessage().contains("Push failed after 3 attempts"));
+ }
+
+ @Test
+ void testCreateContentNon409ErrorNotRetried() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA)
+ );
+ stubPostChain(serverErrorResponse());
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final FlowRegistryException exception = assertThrows(FlowRegistryException.class,
+ () -> client.createContent(createRequest("existing-sha", null)));
+ assertTrue(exception.getMessage().contains("failed"));
+ }
+
+ @Test
+ void testCreateContentNullExpectedCommitSha() throws FlowRegistryException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ contentShaNotFoundResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA)
+ );
+ stubPostChain(pushSuccessResponse(NEW_COMMIT_SHA));
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ final String commitSha = client.createContent(createRequest(null, null));
+ assertEquals(NEW_COMMIT_SHA, commitSha);
+ }
+
+ @Test
+ void testDeleteContentUsesFetchBranchHead() throws FlowRegistryException, IOException {
+ stubGetChain(
+ repoInfoResponse(),
+ permissionsResponse(),
+ branchHeadResponse(BRANCH_HEAD_SHA)
+ );
+ final HttpResponseEntity deleteResponse = mockResponse(HttpURLConnection.HTTP_CREATED,
+ "{\"commits\":[{\"commitId\":\"delete-commit\"}]}");
+ stubPostChain(deleteResponse);
+
+ final AzureDevOpsRepositoryClient client = buildClient();
+ try (final InputStream result = client.deleteContent(FILE_PATH, MESSAGE, BRANCH)) {
+ final String body = new String(result.readAllBytes(), StandardCharsets.UTF_8);
+ assertTrue(body.contains("delete-commit"));
+ }
+ }
+}