This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a9e246956c NIFI-12936 ListGCSBucket resets its tracking state after configuration change
a9e246956c is described below

commit a9e246956cee822e407b20e45930e05c3fd72de6
Author: Peter Turcsanyi <turcsa...@apache.org>
AuthorDate: Fri Mar 22 22:32:27 2024 +0100

    NIFI-12936 ListGCSBucket resets its tracking state after configuration change
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8550.
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java |  60 ++-
 .../processors/gcp/storage/ListGCSBucketTest.java  | 451 ++++++++++++---------
 2 files changed, 298 insertions(+), 213 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index aed69ba1ee..97c6783a07 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -278,40 +278,62 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         return RELATIONSHIPS;
     }
 
-    // State tracking
+    private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = 
Set.of(
+            BUCKET,
+            PREFIX,
+            LISTING_STRATEGY
+    );
+
+    // used by Tracking Timestamps tracking strategy
     public static final String CURRENT_TIMESTAMP = "currentTimestamp";
     public static final String CURRENT_KEY_PREFIX = "key-";
     private volatile long currentTimestamp = 0L;
     private final Set<String> currentKeys = Collections.synchronizedSet(new 
HashSet<>());
 
-    private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetEntityTrackingState = false;
+    // used by Tracking Entities tracking strategy
     private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
 
+    private volatile boolean justElectedPrimaryNode = false;
+    private volatile boolean resetTracking = false;
+
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
         justElectedPrimaryNode = (newState == 
PrimaryNodeState.ELECTED_PRIMARY_NODE);
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (isConfigurationRestored() && 
TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+            resetTracking = true;
+        }
+    }
+
     @OnScheduled
-    public void initListedEntityTracker(ProcessContext context) {
-        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
-        if (listedEntityTracker != null && (resetEntityTrackingState || 
!isTrackingEntityStrategy)) {
+    public void initTrackingStrategy(ProcessContext context) throws 
IOException {
+        final String listingStrategy = 
context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean isTrackingTimestampsStrategy = 
BY_TIMESTAMPS.getValue().equals(listingStrategy);
+        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(listingStrategy);
+
+        if (resetTracking || !isTrackingTimestampsStrategy) {
+            context.getStateManager().clear(Scope.CLUSTER);
+            currentTimestamp = 0L;
+            currentKeys.clear();
+        }
+
+        if (listedEntityTracker != null && (resetTracking || 
!isTrackingEntityStrategy)) {
             try {
                 listedEntityTracker.clearListedEntities();
+                listedEntityTracker = null;
             } catch (IOException e) {
                 throw new RuntimeException("Failed to reset previously listed 
entities", e);
             }
         }
-        resetEntityTrackingState = false;
 
-        if (isTrackingEntityStrategy) {
-            if (listedEntityTracker == null) {
-                listedEntityTracker = createListedEntityTracker();
-            }
-        } else {
-            listedEntityTracker = null;
+        if (isTrackingEntityStrategy && listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
         }
+
+        resetTracking = false;
     }
 
     protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
@@ -1027,4 +1049,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             return count;
         }
     }
+
+    long getCurrentTimestamp() {
+        return currentTimestamp;
+    }
+
+    ListedEntityTracker<ListableBlob> getListedEntityTracker() {
+        return listedEntityTracker;
+    }
+
+    boolean isResetTracking() {
+        return resetTracking;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 82852acbfa..f220c1be6f 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -17,33 +17,24 @@
 package org.apache.nifi.processors.gcp.storage;
 
 import com.google.api.gax.paging.Page;
+import com.google.cloud.PageImpl;
 import com.google.cloud.storage.Acl;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneId;
-import java.time.ZoneOffset;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.LogMessage;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -51,6 +42,21 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
 import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR;
@@ -75,11 +81,13 @@ import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -87,6 +95,7 @@ import static org.mockito.Mockito.when;
  */
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class ListGCSBucketTest extends AbstractGCSTest {
+
     private static final String PREFIX = "test-prefix";
     private static final Boolean USE_GENERATIONS = true;
 
@@ -113,16 +122,29 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     private static final Long CREATE_TIME = 1234L;
     private static final Long UPDATE_TIME = 4567L;
     private final static Long GENERATION = 5L;
+    private static final long TIMESTAMP = 1234567890;
 
     @Mock
     Storage storage;
 
+    @Mock
+    Page<Blob> mockBlobPage;
+
     @Captor
     ArgumentCaptor<Storage.BlobListOption> argumentCaptor;
 
-    @Override
-    public ListGCSBucket getProcessor() {
-        return new ListGCSBucket() {
+    private TestRunner runner;
+
+    private ListGCSBucket processor;
+
+    private MockStateManager mockStateManager;
+
+    @Mock
+    private DistributedMapCacheClient mockCache;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        processor = new ListGCSBucket() {
             @Override
             protected Storage getCloudService() {
                 return storage;
@@ -133,21 +155,25 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 return storage;
             }
         };
+
+        runner = buildNewRunner(processor);
+        runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
+        runner.assertValid();
+
+        mockStateManager = runner.getStateManager();
+    }
+
+    @Override
+    public ListGCSBucket getProcessor() {
+        return processor;
     }
 
     @Override
     protected void addRequiredPropertiesToRunner(TestRunner runner) {
-        runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
     }
 
     @Test
     public void testRestoreFreshState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         
assertFalse(runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(),
 "Cluster StateMap should be fresh (version -1L)");
         assertTrue(processor.getStateKeys().isEmpty());
 
@@ -157,17 +183,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals(0L, processor.getStateTimestamp());
 
         assertTrue(processor.getStateKeys().isEmpty());
-
     }
 
     @Test
     public void testRestorePreviousState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
@@ -188,12 +207,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testPersistState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         assertFalse(
                 
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(),
                 "Cluster StateMap should be fresh"
@@ -215,13 +228,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testFailedPersistState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testFailedPersistState() {
         runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);
 
         final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0", 
"test-key-1"));
@@ -241,60 +248,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         // We could do more specific things like check the contents of the 
LogMessage,
         // but that seems too nitpicky.
-
-    }
-
-    @Mock
-    Page<Blob> mockBlobPage;
-
-    private Blob buildMockBlob(final String bucket, final String key, final 
long updateTime) {
-        final Blob blob = mock(Blob.class);
-        when(blob.getBucket()).thenReturn(bucket);
-        when(blob.getName()).thenReturn(key);
-        
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
-        
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
-        return blob;
-    }
-
-    private Blob buildMockBlobWithoutBucket(final String bucket, final String 
key, final long updateTime) {
-        final Blob blob = mock(Blob.class);
-        when(blob.getName()).thenReturn(key);
-        
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
-        
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
-        return blob;
-    }
-
-    private OffsetDateTime offsetDateTime(final long value) {
-        final Instant instant = Instant.ofEpochMilli(value);
-        final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, 
ZoneId.of("UTC"));
-        return OffsetDateTime.of(localDateTime, ZoneOffset.UTC);
-    }
-
-    private void verifyConfigVerification(final TestRunner runner, final 
ListGCSBucket processor, final int expectedCount) {
-        final List<ConfigVerificationResult> verificationResults = 
processor.verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
-        assertEquals(3, verificationResults.size());
-        final ConfigVerificationResult cloudServiceResult = 
verificationResults.get(0);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
cloudServiceResult.getOutcome());
-
-        final ConfigVerificationResult iamPermissionsResult = 
verificationResults.get(1);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
iamPermissionsResult.getOutcome());
-
-        final ConfigVerificationResult listingResult = 
verificationResults.get(2);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
listingResult.getOutcome());
-
-        assertTrue(
-                
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", 
expectedCount)),
-                String.format("Expected %s blobs to be counted, but 
explanation was: %s", expectedCount, listingResult.getExplanation()));
     }
 
     @Test
-    public void testSuccessfulList() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testSuccessfulList() {
         final Iterable<Blob> mockList = Arrays.asList(
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
                 buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
@@ -331,11 +288,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testNoTrackingListing() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
+    public void testNoTrackingListing() {
         runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
         runner.assertValid();
 
@@ -381,12 +334,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testOldValues() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Iterable<Blob> mockList = Collections.singletonList(
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
         );
@@ -409,16 +356,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("2", 
runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
     }
 
-
-
     @Test
     public void testEmptyList() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Iterable<Blob> mockList = Collections.emptyList();
 
         when(mockBlobPage.getValues()).thenReturn(mockList);
@@ -438,12 +377,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndFilesComingInAlphabeticalOrder() throws 
Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
@@ -482,12 +415,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws 
Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -531,12 +458,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndNewFilesComingWithTheSameTimestamp() 
throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -586,12 +507,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() 
throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -639,13 +554,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testAttributesSet() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAttributesSet() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         when(blob.getSize()).thenReturn(SIZE);
         when(blob.getCacheControl()).thenReturn(CACHE_CONTROL);
@@ -705,13 +614,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testAclOwnerUser() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerUser() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.User mockUser = mock(Acl.User.class);
         when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
@@ -734,15 +637,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
     @Test
-    public void testAclOwnerGroup() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerGroup() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Group mockGroup = mock(Acl.Group.class);
         when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
@@ -765,16 +661,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
-
     @Test
-    public void testAclOwnerDomain() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerDomain() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Domain mockDomain = mock(Acl.Domain.class);
         when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
@@ -796,16 +684,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
-
     @Test
-    public void testAclOwnerProject() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerProject() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Project mockProject = mock(Acl.Project.class);
         when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
@@ -828,15 +708,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
     @Test
-    public void testYieldOnBadStateRestore() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testYieldOnBadStateRestore() {
         final Iterable<Blob> mockList = Collections.emptyList();
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@@ -848,12 +721,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testListOptionsPrefix() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-
+    public void testListOptionsPrefix() {
         runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
         runner.assertValid();
 
@@ -869,14 +737,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals(Storage.BlobListOption.prefix(PREFIX), 
argumentCaptor.getValue());
     }
 
-
     @Test
-    public void testListOptionsVersions() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-
+    public void testListOptionsVersions() {
         runner.setProperty(ListGCSBucket.USE_GENERATIONS, 
String.valueOf(USE_GENERATIONS));
         runner.assertValid();
 
@@ -892,4 +754,193 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         Storage.BlobListOption option = argumentCaptor.getValue();
         assertEquals(Storage.BlobListOption.versions(true), option);
     }
+
+    @Test
+    void testResetTimestampTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    private void setUpResetTrackingTest(AllowableValue listingStrategy) throws 
Exception {
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, listingStrategy);
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix1");
+
+        if (listingStrategy == ListGCSBucket.BY_TIMESTAMPS) {
+            mockStateManager.setState(Map.of(ListGCSBucket.CURRENT_TIMESTAMP, 
Long.toString(TIMESTAMP), ListGCSBucket.CURRENT_KEY_PREFIX + "0", "file"), 
Scope.CLUSTER);
+        } else if (listingStrategy == ListGCSBucket.BY_ENTITIES) {
+            String serviceId = "DistributedMapCacheClient";
+            when(mockCache.getIdentifier()).thenReturn(serviceId);
+            runner.addControllerService(serviceId, mockCache);
+            runner.enableControllerService(mockCache);
+            runner.setProperty(ListGCSBucket.TRACKING_STATE_CACHE, serviceId);
+        }
+
+        when(storage.list(anyString(), 
any(Storage.BlobListOption.class))).thenReturn(new PageImpl<>(null, null, 
null));
+    }
+
+    private Blob buildMockBlob(final String bucket, final String key, final 
long updateTime) {
+        final Blob blob = mock(Blob.class);
+        when(blob.getBucket()).thenReturn(bucket);
+        when(blob.getName()).thenReturn(key);
+        
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
+        
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
+        return blob;
+    }
+
+    private Blob buildMockBlobWithoutBucket(final String bucket, final String 
key, final long updateTime) {
+        final Blob blob = mock(Blob.class);
+        when(blob.getName()).thenReturn(key);
+        
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
+        
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
+        return blob;
+    }
+
+    private OffsetDateTime offsetDateTime(final long value) {
+        final Instant instant = Instant.ofEpochMilli(value);
+        final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, 
ZoneId.of("UTC"));
+        return OffsetDateTime.of(localDateTime, ZoneOffset.UTC);
+    }
+
+    private void verifyConfigVerification(final TestRunner runner, final 
ListGCSBucket processor, final int expectedCount) {
+        final List<ConfigVerificationResult> verificationResults = 
processor.verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
+        assertEquals(3, verificationResults.size());
+        final ConfigVerificationResult cloudServiceResult = 
verificationResults.get(0);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
cloudServiceResult.getOutcome());
+
+        final ConfigVerificationResult iamPermissionsResult = 
verificationResults.get(1);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
iamPermissionsResult.getOutcome());
+
+        final ConfigVerificationResult listingResult = 
verificationResults.get(2);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
listingResult.getOutcome());
+
+        assertTrue(
+                
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", 
expectedCount)),
+                String.format("Expected %s blobs to be counted, but 
explanation was: %s", expectedCount, listingResult.getExplanation()));
+    }
 }
\ No newline at end of file

Reply via email to