This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 19cd31b1617 NIFI-15657 Fixed 409 Conflict in Azure DevOps and 
Bitbucket for multiple Flows with shared branch (#10948)
19cd31b1617 is described below

commit 19cd31b1617ed635eab69b82ebede51008fd6e6c
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Mar 10 23:32:57 2026 +0100

    NIFI-15657 Fixed 409 Conflict in Azure DevOps and Bitbucket for multiple 
Flows with shared branch (#10948)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../bitbucket/BitbucketRepositoryClient.java       |  80 +++--
 .../bitbucket/BitbucketRepositoryClientTest.java   | 322 +++++++++++++++++++++
 .../azure/devops/AzureDevOpsRepositoryClient.java  |  93 +++---
 .../devops/AzureDevOpsRepositoryClientTest.java    | 306 ++++++++++++++++++++
 4 files changed, 744 insertions(+), 57 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
 
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
index 1779663445a..8a8ea4e8e43 100644
--- 
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
+++ 
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
@@ -90,6 +90,7 @@ public class BitbucketRepositoryClient implements 
GitRepositoryClient {
     private static final String FIELD_DISPLAY_ID = "displayId";
     private static final String FIELD_ID = "id";
     private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TARGET = "target";
     private static final String FIELD_MESSAGE_TEXT = "message";
     private static final String FIELD_AUTHOR = "author";
     private static final String FIELD_AUTHOR_TIMESTAMP = "authorTimestamp";
@@ -101,6 +102,7 @@ public class BitbucketRepositoryClient implements 
GitRepositoryClient {
     private static final String ENTRY_DIRECTORY_CLOUD = "commit_directory";
     private static final String ENTRY_FILE_DATA_CENTER = "FILE";
     private static final String ENTRY_FILE_CLOUD = "commit_file";
+    private static final int MAX_PUSH_ATTEMPTS = 3;
 
     private final ObjectMapper objectMapper = JsonMapper.builder().build();
 
@@ -427,33 +429,75 @@ public class BitbucketRepositoryClient implements 
GitRepositoryClient {
     }
 
     private String createContentCloud(final GitCreateContentRequest request, 
final String resolvedPath, final String branch) throws FlowRegistryException {
-        final StandardMultipartFormDataStreamBuilder multipartBuilder = new 
StandardMultipartFormDataStreamBuilder();
-        multipartBuilder.addPart(resolvedPath, 
StandardHttpContentType.APPLICATION_JSON, 
request.getContent().getBytes(StandardCharsets.UTF_8));
-        multipartBuilder.addPart(FIELD_MESSAGE, 
StandardHttpContentType.TEXT_PLAIN, 
request.getMessage().getBytes(StandardCharsets.UTF_8));
-        multipartBuilder.addPart(FIELD_BRANCH, 
StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
+        final String expectedFileCommit = request.getExpectedCommitSha();
+        final URI uri = 
getRepositoryUriBuilder().addPathSegment("src").build();
 
-        // Add parents parameter for atomic commit - Bitbucket Cloud will 
reject if the branch has moved
-        final String expectedCommitSha = request.getExpectedCommitSha();
-        if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
-            multipartBuilder.addPart(FIELD_PARENTS, 
StandardHttpContentType.TEXT_PLAIN, 
expectedCommitSha.getBytes(StandardCharsets.UTF_8));
+        for (int attempt = 1; attempt <= MAX_PUSH_ATTEMPTS; attempt++) {
+            if (expectedFileCommit != null) {
+                final Optional<String> currentFileCommit = 
getLatestCommit(branch, resolvedPath);
+                if (currentFileCommit.isPresent() && 
!currentFileCommit.get().equals(expectedFileCommit)) {
+                    throw new FlowRegistryException("File [%s] has been 
modified by another commit [%s]".formatted(resolvedPath, 
currentFileCommit.get()));
+                }
+            }
+
+            final String branchHead = getBranchHeadCloud(branch);
+
+            final StandardMultipartFormDataStreamBuilder multipartBuilder = 
new StandardMultipartFormDataStreamBuilder();
+            multipartBuilder.addPart(resolvedPath, 
StandardHttpContentType.APPLICATION_JSON, 
request.getContent().getBytes(StandardCharsets.UTF_8));
+            multipartBuilder.addPart(FIELD_MESSAGE, 
StandardHttpContentType.TEXT_PLAIN, 
request.getMessage().getBytes(StandardCharsets.UTF_8));
+            multipartBuilder.addPart(FIELD_BRANCH, 
StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
+            multipartBuilder.addPart(FIELD_PARENTS, 
StandardHttpContentType.TEXT_PLAIN, 
branchHead.getBytes(StandardCharsets.UTF_8));
+
+            final HttpResponseEntity response = 
this.webClient.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .body(multipartBuilder.build(), OptionalLong.empty())
+                    .header(AUTHORIZATION_HEADER, 
authToken.getAuthzHeaderValue())
+                    .header(CONTENT_TYPE_HEADER, 
multipartBuilder.getHttpContentType().getContentType())
+                    .retrieve();
+
+            if (response.statusCode() == HttpURLConnection.HTTP_CREATED) {
+                closeQuietly(response);
+                return getRequiredLatestCommit(branch, resolvedPath);
+            }
+
+            if (response.statusCode() == HttpURLConnection.HTTP_CONFLICT) {
+                closeQuietly(response);
+                if (attempt == MAX_PUSH_ATTEMPTS) {
+                    throw new FlowRegistryException("Push failed after %d 
attempts due to concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+                }
+                logger.debug("Push attempt {} for path [{}] failed with 409 
(branch HEAD moved), retrying", attempt, resolvedPath);
+                continue;
+            }
+
+            final String errorMessage = "Error while committing content for 
repository [%s] on branch %s at path %s".formatted(repoName, branch, 
resolvedPath);
+            try {
+                throw new FlowRegistryException("%s: 
%s".formatted(errorMessage, getErrorMessage(response)));
+            } finally {
+                closeQuietly(response);
+            }
         }
 
-        final URI uri = 
getRepositoryUriBuilder().addPathSegment("src").build();
-        final String errorMessage = "Error while committing content for 
repository [%s] on branch %s at path %s"
-                .formatted(repoName, branch, resolvedPath);
+        throw new FlowRegistryException("Push failed after %d attempts due to 
concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+    }
+
+    private String getBranchHeadCloud(final String branch) throws 
FlowRegistryException {
+        final HttpUriBuilder builder = 
getRepositoryUriBuilder().addPathSegment("refs").addPathSegment("branches");
+        addPathSegments(builder, branch);
+        final URI uri = builder.build();
+
         try (final HttpResponseEntity response = 
this.webClient.getWebClientService()
-                .post()
+                .get()
                 .uri(uri)
-                .body(multipartBuilder.build(), OptionalLong.empty())
                 .header(AUTHORIZATION_HEADER, authToken.getAuthzHeaderValue())
-                .header(CONTENT_TYPE_HEADER, 
multipartBuilder.getHttpContentType().getContentType())
                 .retrieve()) {
-            verifyStatusCode(response, errorMessage, 
HttpURLConnection.HTTP_CREATED);
+
+            verifyStatusCode(response, "Error while fetching branch HEAD for 
branch [%s] in repository [%s]".formatted(branch, repoName), 
HttpURLConnection.HTTP_OK);
+            final JsonNode jsonResponse = parseResponseBody(response, uri);
+            return jsonResponse.get(FIELD_TARGET).get(FIELD_HASH).asText();
         } catch (final IOException e) {
-            throw new FlowRegistryException("Failed closing Bitbucket create 
content response", e);
+            throw new FlowRegistryException("Failed closing Bitbucket branch 
HEAD response", e);
         }
-
-        return getRequiredLatestCommit(branch, resolvedPath);
     }
 
     private String createContentDataCenter(final GitCreateContentRequest 
request, final String resolvedPath, final String branch) throws 
FlowRegistryException {
diff --git 
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
 
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
new file mode 100644
index 00000000000..0730b4352f9
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/test/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClientTest.java
@@ -0,0 +1,322 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.atlassian.bitbucket;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestHeadersSpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.OptionalLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class BitbucketRepositoryClientTest {
+
+    private static final String BRANCH = "main";
+    private static final String FILE_PATH = "flows/test.json";
+    private static final String CONTENT = "{\"flow\":\"content\"}";
+    private static final String MESSAGE = "test commit";
+    private static final String EXPECTED_FILE_COMMIT = "file-commit-sha";
+    private static final String BRANCH_HEAD_SHA = "branch-head-sha";
+    private static final String RESULT_COMMIT_SHA = "result-commit-sha";
+
+    @Mock
+    private WebClientServiceProvider webClientProvider;
+
+    @Mock
+    private WebClientService webClientService;
+
+    @Mock
+    private ComponentLog logger;
+
+    private HttpUriBuilder uriBuilder;
+
+    private HttpRequestBodySpec getBodySpec;
+    private HttpRequestBodySpec postAfterHeaders;
+
+    @BeforeEach
+    void setUp() {
+        
lenient().when(webClientProvider.getWebClientService()).thenReturn(webClientService);
+        setupUriBuilder();
+    }
+
+    private void setupUriBuilder() {
+        uriBuilder = mock(HttpUriBuilder.class);
+        
lenient().when(webClientProvider.getHttpUriBuilder()).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.scheme(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.host(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.port(any(int.class))).thenReturn(uriBuilder);
+        
lenient().when(uriBuilder.addPathSegment(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.addQueryParameter(anyString(), 
anyString())).thenReturn(uriBuilder);
+        
lenient().when(uriBuilder.build()).thenReturn(URI.create("https://api.bitbucket.org/test"));
+    }
+
+    private BitbucketRepositoryClient buildCloudClient() throws 
FlowRegistryException {
+        return BitbucketRepositoryClient.builder()
+                .clientId("test-client")
+                .apiUrl("https://api.bitbucket.org")
+                .formFactor(BitbucketFormFactor.CLOUD)
+                .authenticationType(BitbucketAuthenticationType.ACCESS_TOKEN)
+                .accessToken("test-token")
+                .workspace("test-workspace")
+                .repoName("test-repo")
+                .webClient(webClientProvider)
+                .logger(logger)
+                .build();
+    }
+
+    private void stubGetChain(final HttpResponseEntity... responses) {
+        final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+        getBodySpec = mock(HttpRequestBodySpec.class);
+        lenient().when(webClientService.get()).thenReturn(getSpec);
+        lenient().when(getSpec.uri(any(URI.class))).thenReturn(getBodySpec);
+        lenient().when(getBodySpec.header(anyString(), 
anyString())).thenReturn(getBodySpec);
+
+        OngoingStubbing<HttpResponseEntity> stubbing = 
when(getBodySpec.retrieve());
+        for (final HttpResponseEntity response : responses) {
+            stubbing = stubbing.thenReturn(response);
+        }
+    }
+
+    private void stubPostChain(final HttpResponseEntity... responses) {
+        final HttpRequestUriSpec postSpec = mock(HttpRequestUriSpec.class);
+        final HttpRequestBodySpec postBodySpec = 
mock(HttpRequestBodySpec.class);
+        final HttpRequestHeadersSpec afterBody = 
mock(HttpRequestHeadersSpec.class);
+        postAfterHeaders = mock(HttpRequestBodySpec.class);
+        lenient().when(webClientService.post()).thenReturn(postSpec);
+        lenient().when(postSpec.uri(any(URI.class))).thenReturn(postBodySpec);
+        lenient().when(postBodySpec.body(any(InputStream.class), 
any(OptionalLong.class))).thenReturn(afterBody);
+        lenient().when(afterBody.header(anyString(), 
anyString())).thenReturn(postAfterHeaders);
+        lenient().when(postAfterHeaders.header(anyString(), 
anyString())).thenReturn(postAfterHeaders);
+
+        OngoingStubbing<HttpResponseEntity> stubbing = 
when(postAfterHeaders.retrieve());
+        for (final HttpResponseEntity response : responses) {
+            stubbing = stubbing.thenReturn(response);
+        }
+    }
+
+    private HttpResponseEntity mockResponse(final int statusCode, final String 
body) {
+        final HttpResponseEntity response = mock(HttpResponseEntity.class);
+        lenient().when(response.statusCode()).thenReturn(statusCode);
+        lenient().when(response.body()).thenReturn(new 
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+        return response;
+    }
+
+    private HttpResponseEntity branchListResponse() {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"values\":[{\"name\":\"main\"}]}");
+    }
+
+    private HttpResponseEntity branchHeadResponse(final String hash) {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"target\":{\"hash\":\"%s\"}}".formatted(hash));
+    }
+
+    private HttpResponseEntity commitsCheckResponse() {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"values\":[{\"hash\":\"some-hash\"}]}");
+    }
+
+    private HttpResponseEntity commitsForPathResponse(final String hash) {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"values\":[{\"hash\":\"%s\"}]}".formatted(hash));
+    }
+
+    private HttpResponseEntity conflictResponse() {
+        return mockResponse(HttpURLConnection.HTTP_CONFLICT, 
"{\"error\":{\"message\":\"conflict\"}}");
+    }
+
+    private HttpResponseEntity serverErrorResponse() {
+        return mockResponse(HttpURLConnection.HTTP_INTERNAL_ERROR, 
"{\"error\":{\"message\":\"internal error\"}}");
+    }
+
+    private HttpResponseEntity createdResponse() {
+        return mockResponse(HttpURLConnection.HTTP_CREATED, "{}");
+    }
+
+    private GitCreateContentRequest createRequest(final String 
existingContentSha, final String expectedCommitSha) {
+        final GitCreateContentRequest.Builder builder = 
GitCreateContentRequest.builder()
+                .branch(BRANCH)
+                .path(FILE_PATH)
+                .content(CONTENT)
+                .message(MESSAGE);
+        if (existingContentSha != null) {
+            builder.existingContentSha(existingContentSha);
+        }
+        if (expectedCommitSha != null) {
+            builder.expectedCommitSha(expectedCommitSha);
+        }
+        return builder.build();
+    }
+
+    @Test
+    void testCreateContentCloudSuccess() throws FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA),
+                commitsCheckResponse(),
+                commitsForPathResponse(RESULT_COMMIT_SHA)
+        );
+        stubPostChain(createdResponse());
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        final String commitSha = 
client.createContent(createRequest("existing-sha", null));
+        assertEquals(RESULT_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testCreateContentCloudRetryOn409() throws FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA),
+                branchHeadResponse("new-head"),
+                commitsCheckResponse(),
+                commitsForPathResponse(RESULT_COMMIT_SHA)
+        );
+        stubPostChain(conflictResponse(), createdResponse());
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        final String commitSha = 
client.createContent(createRequest("existing-sha", null));
+        assertEquals(RESULT_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testCreateContentCloudFileLevelConflict() throws 
FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                commitsCheckResponse(),
+                commitsForPathResponse("different-sha")
+        );
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        final FlowRegistryException exception = 
assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
EXPECTED_FILE_COMMIT)));
+        assertTrue(exception.getMessage().contains("has been modified by 
another commit"));
+    }
+
+    @Test
+    void testCreateContentCloudMaxRetriesExhausted() throws 
FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                branchHeadResponse("head-1"),
+                branchHeadResponse("head-2"),
+                branchHeadResponse("head-3")
+        );
+        stubPostChain(conflictResponse(), conflictResponse(), 
conflictResponse());
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        final FlowRegistryException exception = 
assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
null)));
+        assertTrue(exception.getMessage().contains("Push failed after 3 
attempts"));
+    }
+
+    @Test
+    void testCreateContentCloudNon409ErrorNotRetried() throws 
FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA)
+        );
+        stubPostChain(serverErrorResponse());
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
null)));
+    }
+
+    @Test
+    void testCreateContentCloudNullExpectedCommitSha() throws 
FlowRegistryException {
+        stubGetChain(
+                branchListResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA),
+                commitsCheckResponse(),
+                commitsForPathResponse(RESULT_COMMIT_SHA)
+        );
+        stubPostChain(createdResponse());
+
+        final BitbucketRepositoryClient client = buildCloudClient();
+        final String commitSha = client.createContent(createRequest(null, 
null));
+        assertEquals(RESULT_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testCreateContentDataCenterUnchanged() throws FlowRegistryException {
+        final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+        final HttpRequestBodySpec dcGetBody = mock(HttpRequestBodySpec.class);
+        lenient().when(webClientService.get()).thenReturn(getSpec);
+        lenient().when(getSpec.uri(any(URI.class))).thenReturn(dcGetBody);
+        lenient().when(dcGetBody.header(anyString(), 
anyString())).thenReturn(dcGetBody);
+
+        final HttpResponseEntity branchesResp = 
mockResponse(HttpURLConnection.HTTP_OK,
+                "{\"values\":[{\"displayId\":\"main\"}],\"isLastPage\":true}");
+        final HttpResponseEntity commitsCheckResp = 
mockResponse(HttpURLConnection.HTTP_OK,
+                "{\"values\":[{\"id\":\"c1\"}],\"isLastPage\":true}");
+        final HttpResponseEntity commitsResp = 
mockResponse(HttpURLConnection.HTTP_OK,
+                
"{\"values\":[{\"id\":\"%s\"}],\"isLastPage\":true}".formatted(RESULT_COMMIT_SHA));
+        when(dcGetBody.retrieve()).thenReturn(branchesResp, commitsCheckResp, 
commitsResp);
+
+        final HttpRequestUriSpec putSpec = mock(HttpRequestUriSpec.class);
+        final HttpRequestBodySpec putBodySpec = 
mock(HttpRequestBodySpec.class);
+        final HttpRequestHeadersSpec putAfterBody = 
mock(HttpRequestHeadersSpec.class);
+        final HttpRequestBodySpec putAfterHeaders = 
mock(HttpRequestBodySpec.class);
+        lenient().when(webClientService.put()).thenReturn(putSpec);
+        lenient().when(putSpec.uri(any(URI.class))).thenReturn(putBodySpec);
+        lenient().when(putBodySpec.body(any(InputStream.class), 
any(OptionalLong.class))).thenReturn(putAfterBody);
+        lenient().when(putAfterBody.header(anyString(), 
anyString())).thenReturn(putAfterHeaders);
+        lenient().when(putAfterHeaders.header(anyString(), 
anyString())).thenReturn(putAfterHeaders);
+        final HttpResponseEntity putResponse = 
mockResponse(HttpURLConnection.HTTP_OK, "{}");
+        lenient().when(putAfterHeaders.retrieve()).thenReturn(putResponse);
+
+        final BitbucketRepositoryClient dcClient = 
BitbucketRepositoryClient.builder()
+                .clientId("test-client")
+                .apiUrl("https://bitbucket.example.com")
+                .formFactor(BitbucketFormFactor.DATA_CENTER)
+                .authenticationType(BitbucketAuthenticationType.ACCESS_TOKEN)
+                .accessToken("test-token")
+                .projectKey("TEST")
+                .repoName("test-repo")
+                .webClient(webClientProvider)
+                .logger(logger)
+                .build();
+
+        final GitCreateContentRequest request = createRequest(null, 
EXPECTED_FILE_COMMIT);
+        final String commitSha = dcClient.createContent(request);
+        assertEquals(RESULT_COMMIT_SHA, commitSha);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
index 88091ff978e..dc6859317c0 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
@@ -120,6 +120,7 @@ public class AzureDevOpsRepositoryClient implements 
GitRepositoryClient {
     private static final String CHANGE_TYPE_EDIT = "edit";
     private static final String CHANGE_TYPE_DELETE = "delete";
     private static final String CONTENT_TYPE_BASE64 = "base64encoded";
+    private static final int MAX_PUSH_ATTEMPTS = 3;
 
     // Common query parameter names and values
     private static final String VERSION_DESCRIPTOR_VERSION = 
"versionDescriptor.version";
@@ -360,25 +361,7 @@ public class AzureDevOpsRepositoryClient implements 
GitRepositoryClient {
         final String message = request.getMessage();
         logger.debug("Creating content at path [{}] on branch [{}] in repo 
[{}]", path, branch, repoName);
 
-        // Use expectedCommitSha for atomic commit if provided, otherwise 
fetch current branch HEAD
-        // Azure DevOps will reject the push if oldObjectId doesn't match the 
current branch HEAD
-        final String oldObjectId;
-        final String expectedCommitSha = request.getExpectedCommitSha();
-        if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
-            oldObjectId = expectedCommitSha;
-        } else {
-            // Fall back to fetching current branch commit id
-            final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
-                    .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + 
branch)
-                    .addQueryParameter(API, API_VERSION)
-                    .build();
-            final JsonNode refResponse = executeGet(refUri);
-            oldObjectId = 
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
-        }
-
-        final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
-                .addQueryParameter(API, API_VERSION)
-                .build();
+        final String expectedFileCommit = request.getExpectedCommitSha();
 
         final String changeType;
         if (request.getExistingContentSha() == null) {
@@ -389,11 +372,59 @@ public class AzureDevOpsRepositoryClient implements 
GitRepositoryClient {
         }
 
         final String encoded = 
Base64.getEncoder().encodeToString(request.getContent().getBytes(StandardCharsets.UTF_8));
+        final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
+                .addQueryParameter(API, API_VERSION)
+                .build();
+
+        for (int attempt = 1; attempt <= MAX_PUSH_ATTEMPTS; attempt++) {
+            if (expectedFileCommit != null) {
+                final Optional<String> currentFileCommit = 
getContentSha(request.getPath(), branch);
+                if (currentFileCommit.isPresent() && 
!currentFileCommit.get().equals(expectedFileCommit)) {
+                    throw new FlowRegistryException("File [%s] has been 
modified by another commit [%s]".formatted(path, currentFileCommit.get()));
+                }
+            }
 
+            final String branchHead = fetchBranchHead(branch);
+            final HttpResponseEntity response = executePush(pushUri, branch, 
branchHead, encoded, message, changeType, path);
+
+            if (response.statusCode() == HttpURLConnection.HTTP_CREATED) {
+                try {
+                    final JsonNode pushResponse = 
MAPPER.readTree(response.body());
+                    return 
pushResponse.get(SEGMENT_COMMITS).get(0).get(JSON_FIELD_COMMIT_ID).asText();
+                } catch (final IOException e) {
+                    throw new FlowRegistryException("Failed to parse push 
response from [%s]".formatted(pushUri), e);
+                }
+            }
+
+            if (response.statusCode() == HttpURLConnection.HTTP_CONFLICT) {
+                if (attempt == MAX_PUSH_ATTEMPTS) {
+                    throw new FlowRegistryException("Push failed after %d 
attempts due to concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+                }
+                logger.debug("Push attempt {} for path [{}] failed with 409 
(branch HEAD moved), retrying", attempt, path);
+                continue;
+            }
+
+            throw new FlowRegistryException("Request to %s failed - 
%s".formatted(pushUri, getErrorMessage(response)));
+        }
+
+        throw new FlowRegistryException("Push failed after %d attempts due to 
concurrent branch modifications".formatted(MAX_PUSH_ATTEMPTS));
+    }
+
+    private String fetchBranchHead(final String branch) throws 
FlowRegistryException {
+        final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
+                .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branch)
+                .addQueryParameter(API, API_VERSION)
+                .build();
+        final JsonNode refResponse = executeGet(refUri);
+        return 
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+    }
+
+    private HttpResponseEntity executePush(final URI pushUri, final String 
branch, final String oldObjectId,
+                                           final String encodedContent, final 
String message,
+                                           final String changeType, final 
String path) throws FlowRegistryException {
         final PushRequest pushRequest = new PushRequest(
                 List.of(new RefUpdate(REFS_HEADS_PREFIX + branch, 
oldObjectId)),
-                List.of(new Commit(message,
-                        List.of(new Change(changeType, new Item(path), new 
NewContent(encoded, CONTENT_TYPE_BASE64)))))
+                List.of(new Commit(message, List.of(new Change(changeType, new 
Item(path), new NewContent(encodedContent, CONTENT_TYPE_BASE64)))))
         );
 
         final String json;
@@ -403,36 +434,20 @@ public class AzureDevOpsRepositoryClient implements GitRepositoryClient {
             throw new FlowRegistryException("Failed to serialize push request", e);
         }
 
-        final HttpResponseEntity response = 
this.webClient.getWebClientService()
+        return this.webClient.getWebClientService()
                 .post()
                 .uri(pushUri)
                 .header(AUTHORIZATION_HEADER, bearerToken())
                 .header(CONTENT_TYPE_HEADER, 
MediaType.APPLICATION_JSON.getMediaType())
                 .body(json)
                 .retrieve();
-
-        if (response.statusCode() != HttpURLConnection.HTTP_CREATED) {
-            throw new FlowRegistryException("Request to %s failed - %s".formatted(pushUri, getErrorMessage(response)));
-        }
-
-        try {
-            final JsonNode pushResponse = MAPPER.readTree(response.body());
-            return 
pushResponse.get(SEGMENT_COMMITS).get(0).get(JSON_FIELD_COMMIT_ID).asText();
-        } catch (IOException e) {
-            throw new FlowRegistryException("Failed to create content", e);
-        }
     }
 
     @Override
     public InputStream deleteContent(final String filePath, final String 
commitMessage, final String branch) throws FlowRegistryException, IOException {
         final String path = getResolvedPath(filePath);
         logger.debug("Deleting file [{}] in repo [{}] on branch [{}]", path, 
repoName, branch);
-        final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
-                .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branch)
-                .addQueryParameter(API, API_VERSION)
-                .build();
-        final JsonNode refResponse = executeGet(refUri);
-        final String oldObjectId = 
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+        final String oldObjectId = fetchBranchHead(branch);
 
         final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
                 .addQueryParameter(API, API_VERSION)
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java
new file mode 100644
index 00000000000..ba6c1831de8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/test/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClientTest.java
@@ -0,0 +1,306 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.nifi.azure.devops;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestHeadersSpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AzureDevOpsRepositoryClientTest {
+
+    private static final String BRANCH = "main";
+    private static final String FILE_PATH = "flows/test.json";
+    private static final String CONTENT = "{\"flow\":\"content\"}";
+    private static final String MESSAGE = "test commit";
+    private static final String EXPECTED_FILE_COMMIT = "file-commit-sha";
+    private static final String BRANCH_HEAD_SHA = "branch-head-sha";
+    private static final String NEW_COMMIT_SHA = "new-commit-sha";
+
+    @Mock
+    private WebClientServiceProvider webClientProvider;
+
+    @Mock
+    private WebClientService webClientService;
+
+    @Mock
+    private OAuth2AccessTokenProvider tokenProvider;
+
+    @Mock
+    private ComponentLog logger;
+
+    private HttpUriBuilder uriBuilder;
+    private OngoingStubbing<HttpResponseEntity> getStubbing;
+    private OngoingStubbing<HttpResponseEntity> postStubbing;
+
+    @BeforeEach
+    void setUp() {
+        final AccessToken accessToken = new AccessToken("test-token", null, 
"Bearer", 3600L, null);
+        
lenient().when(tokenProvider.getAccessDetails()).thenReturn(accessToken);
+        
lenient().when(webClientProvider.getWebClientService()).thenReturn(webClientService);
+    }
+
+    private AzureDevOpsRepositoryClient buildClient() throws 
FlowRegistryException {
+        setupUriBuilder();
+        return AzureDevOpsRepositoryClient.builder()
+                .clientId("test-client")
+                .apiUrl("https://dev.azure.com")
+                .organization("test-org")
+                .project("test-project")
+                .repoName("test-repo")
+                .oauthService(tokenProvider)
+                .webClient(webClientProvider)
+                .logger(logger)
+                .build();
+    }
+
+    private void setupUriBuilder() {
+        uriBuilder = mock(HttpUriBuilder.class);
+        
lenient().when(webClientProvider.getHttpUriBuilder()).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.scheme(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.host(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.port(any(int.class))).thenReturn(uriBuilder);
+        
lenient().when(uriBuilder.addPathSegment(anyString())).thenReturn(uriBuilder);
+        lenient().when(uriBuilder.addQueryParameter(anyString(), 
anyString())).thenReturn(uriBuilder);
+        
lenient().when(uriBuilder.build()).thenReturn(URI.create("https://dev.azure.com/test"));
+    }
+
+    private void stubGetChain(final HttpResponseEntity... responses) {
+        final HttpRequestUriSpec getSpec = mock(HttpRequestUriSpec.class);
+        final HttpRequestBodySpec bodySpec = mock(HttpRequestBodySpec.class);
+        lenient().when(webClientService.get()).thenReturn(getSpec);
+        lenient().when(getSpec.uri(any(URI.class))).thenReturn(bodySpec);
+        lenient().when(bodySpec.header(anyString(), 
anyString())).thenReturn(bodySpec);
+
+        getStubbing = when(bodySpec.retrieve());
+        for (final HttpResponseEntity response : responses) {
+            getStubbing = getStubbing.thenReturn(response);
+        }
+    }
+
+    private void stubPostChain(final HttpResponseEntity... responses) {
+        final HttpRequestUriSpec postSpec = mock(HttpRequestUriSpec.class);
+        final HttpRequestBodySpec postBodySpec = 
mock(HttpRequestBodySpec.class);
+        final HttpRequestHeadersSpec afterBody = 
mock(HttpRequestHeadersSpec.class);
+        lenient().when(webClientService.post()).thenReturn(postSpec);
+        lenient().when(postSpec.uri(any(URI.class))).thenReturn(postBodySpec);
+        lenient().when(postBodySpec.header(anyString(), 
anyString())).thenReturn(postBodySpec);
+        lenient().when(postBodySpec.body(anyString())).thenReturn(afterBody);
+        lenient().when(afterBody.header(anyString(), 
anyString())).thenReturn(postBodySpec);
+
+        postStubbing = when(afterBody.retrieve());
+        for (final HttpResponseEntity response : responses) {
+            postStubbing = postStubbing.thenReturn(response);
+        }
+    }
+
+    private HttpResponseEntity mockResponse(final int statusCode, final String 
body) {
+        final HttpResponseEntity response = mock(HttpResponseEntity.class);
+        lenient().when(response.statusCode()).thenReturn(statusCode);
+        lenient().when(response.body()).thenReturn(new 
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+        return response;
+    }
+
+    private HttpResponseEntity repoInfoResponse() {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"project\":{\"id\":\"proj-id\"},\"id\":\"repo-id\"}");
+    }
+
+    private HttpResponseEntity permissionsResponse() {
+        return mockResponse(HttpURLConnection.HTTP_OK, "{\"value\":[true]}");
+    }
+
+    private HttpResponseEntity contentShaResponse(final String commitId) {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"value\":[{\"commitId\":\"%s\"}]}".formatted(commitId));
+    }
+
+    private HttpResponseEntity contentShaNotFoundResponse() {
+        return mockResponse(HttpURLConnection.HTTP_NOT_FOUND, "{}");
+    }
+
+    private HttpResponseEntity branchHeadResponse(final String objectId) {
+        return mockResponse(HttpURLConnection.HTTP_OK, 
"{\"value\":[{\"objectId\":\"%s\"}]}".formatted(objectId));
+    }
+
+    private HttpResponseEntity pushSuccessResponse(final String commitId) {
+        return mockResponse(HttpURLConnection.HTTP_CREATED, 
"{\"commits\":[{\"commitId\":\"%s\"}]}".formatted(commitId));
+    }
+
+    private HttpResponseEntity conflictResponse() {
+        return mockResponse(HttpURLConnection.HTTP_CONFLICT, 
"{\"message\":\"conflict\"}");
+    }
+
+    private HttpResponseEntity serverErrorResponse() {
+        return mockResponse(HttpURLConnection.HTTP_INTERNAL_ERROR, 
"{\"message\":\"internal error\"}");
+    }
+
+    private GitCreateContentRequest createRequest(final String 
existingContentSha, final String expectedCommitSha) {
+        final GitCreateContentRequest.Builder builder = 
GitCreateContentRequest.builder()
+                .branch(BRANCH)
+                .path(FILE_PATH)
+                .content(CONTENT)
+                .message(MESSAGE);
+        if (existingContentSha != null) {
+            builder.existingContentSha(existingContentSha);
+        }
+        if (expectedCommitSha != null) {
+            builder.expectedCommitSha(expectedCommitSha);
+        }
+        return builder.build();
+    }
+
+    @Test
+    void testCreateContentSuccess() throws FlowRegistryException {
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA)
+        );
+        stubPostChain(pushSuccessResponse(NEW_COMMIT_SHA));
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final String commitSha = 
client.createContent(createRequest("existing-sha", null));
+        assertEquals(NEW_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testCreateContentRetryOn409ThenSuccess() throws FlowRegistryException 
{
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                contentShaResponse(EXPECTED_FILE_COMMIT),
+                branchHeadResponse(BRANCH_HEAD_SHA),
+                contentShaResponse(EXPECTED_FILE_COMMIT),
+                branchHeadResponse("new-branch-head")
+        );
+        stubPostChain(conflictResponse(), pushSuccessResponse(NEW_COMMIT_SHA));
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final String commitSha = 
client.createContent(createRequest("existing-sha", EXPECTED_FILE_COMMIT));
+        assertEquals(NEW_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testCreateContentFileLevelConflict() throws FlowRegistryException {
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                contentShaResponse("different-commit-sha")
+        );
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final FlowRegistryException exception = 
assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
EXPECTED_FILE_COMMIT)));
+        assertTrue(exception.getMessage().contains("has been modified by another commit"));
+    }
+
+    @Test
+    void testCreateContentMaxRetriesExhausted() throws FlowRegistryException {
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA),
+                branchHeadResponse("head-2"),
+                branchHeadResponse("head-3")
+        );
+        stubPostChain(conflictResponse(), conflictResponse(), 
conflictResponse());
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final FlowRegistryException exception = 
assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
null)));
+        assertTrue(exception.getMessage().contains("Push failed after 3 attempts"));
+    }
+
+    @Test
+    void testCreateContentNon409ErrorNotRetried() throws FlowRegistryException 
{
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA)
+        );
+        stubPostChain(serverErrorResponse());
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final FlowRegistryException exception = 
assertThrows(FlowRegistryException.class,
+                () -> client.createContent(createRequest("existing-sha", 
null)));
+        assertTrue(exception.getMessage().contains("failed"));
+    }
+
+    @Test
+    void testCreateContentNullExpectedCommitSha() throws FlowRegistryException 
{
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                contentShaNotFoundResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA)
+        );
+        stubPostChain(pushSuccessResponse(NEW_COMMIT_SHA));
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        final String commitSha = client.createContent(createRequest(null, 
null));
+        assertEquals(NEW_COMMIT_SHA, commitSha);
+    }
+
+    @Test
+    void testDeleteContentUsesFetchBranchHead() throws FlowRegistryException, 
IOException {
+        stubGetChain(
+                repoInfoResponse(),
+                permissionsResponse(),
+                branchHeadResponse(BRANCH_HEAD_SHA)
+        );
+        final HttpResponseEntity deleteResponse = 
mockResponse(HttpURLConnection.HTTP_CREATED,
+                "{\"commits\":[{\"commitId\":\"delete-commit\"}]}");
+        stubPostChain(deleteResponse);
+
+        final AzureDevOpsRepositoryClient client = buildClient();
+        try (final InputStream result = client.deleteContent(FILE_PATH, 
MESSAGE, BRANCH)) {
+            final String body = new String(result.readAllBytes(), 
StandardCharsets.UTF_8);
+            assertTrue(body.contains("delete-commit"));
+        }
+    }
+}


Reply via email to