This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new a9e246956c NIFI-12936 ListGCSBucket resets its tracking state after configuration change a9e246956c is described below commit a9e246956cee822e407b20e45930e05c3fd72de6 Author: Peter Turcsanyi <turcsa...@apache.org> AuthorDate: Fri Mar 22 22:32:27 2024 +0100 NIFI-12936 ListGCSBucket resets its tracking state after configuration change Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8550. --- .../nifi/processors/gcp/storage/ListGCSBucket.java | 60 ++- .../processors/gcp/storage/ListGCSBucketTest.java | 451 ++++++++++++--------- 2 files changed, 298 insertions(+), 213 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java index aed69ba1ee..97c6783a07 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java @@ -278,40 +278,62 @@ public class ListGCSBucket extends AbstractGCSProcessor { return RELATIONSHIPS; } - // State tracking + private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = Set.of( + BUCKET, + PREFIX, + LISTING_STRATEGY + ); + + // used by Tracking Timestamps tracking strategy public static final String CURRENT_TIMESTAMP = "currentTimestamp"; public static final String CURRENT_KEY_PREFIX = "key-"; private volatile long currentTimestamp = 0L; private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>()); - private volatile boolean justElectedPrimaryNode = false; - private volatile boolean resetEntityTrackingState = false; + // used by Tracking Entities tracking strategy private volatile ListedEntityTracker<ListableBlob> listedEntityTracker; + private volatile boolean justElectedPrimaryNode = false; + private volatile boolean resetTracking = false; + @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); } + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (isConfigurationRestored() && TRACKING_RESET_PROPERTIES.contains(descriptor)) { + resetTracking = true; + } + } + @OnScheduled - public void initListedEntityTracker(ProcessContext context) { - final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue()); - if (listedEntityTracker != null && (resetEntityTrackingState || !isTrackingEntityStrategy)) { + public void initTrackingStrategy(ProcessContext context) throws IOException { + final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue(); + final boolean isTrackingTimestampsStrategy = BY_TIMESTAMPS.getValue().equals(listingStrategy); + final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(listingStrategy); + + if (resetTracking || !isTrackingTimestampsStrategy) { + context.getStateManager().clear(Scope.CLUSTER); + currentTimestamp = 0L; + currentKeys.clear(); + } + + if (listedEntityTracker != null && (resetTracking || !isTrackingEntityStrategy)) { try { listedEntityTracker.clearListedEntities(); + listedEntityTracker = null; } catch (IOException e) { throw new RuntimeException("Failed to reset previously listed entities", e); } } - resetEntityTrackingState = false; - if (isTrackingEntityStrategy) { - if (listedEntityTracker == null) { - listedEntityTracker = createListedEntityTracker(); - } - } else { - listedEntityTracker = null; + if (isTrackingEntityStrategy && listedEntityTracker == null) { + listedEntityTracker = createListedEntityTracker(); } + + resetTracking = false; } protected ListedEntityTracker<ListableBlob> createListedEntityTracker() { @@ -1027,4 +1049,16 @@ public class ListGCSBucket extends AbstractGCSProcessor { return count; } } + + long getCurrentTimestamp() { + return currentTimestamp; + } + + ListedEntityTracker<ListableBlob> getListedEntityTracker() { + return listedEntityTracker; + } + + boolean isResetTracking() { + return resetTracking; + } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java index 82852acbfa..f220c1be6f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java @@ -17,33 +17,24 @@ package org.apache.nifi.processors.gcp.storage; import com.google.api.gax.paging.Page; +import com.google.cloud.PageImpl; import com.google.cloud.storage.Acl; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -51,6 +42,21 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR; @@ -75,11 +81,13 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -87,6 +95,7 @@ import static org.mockito.Mockito.when; */ @MockitoSettings(strictness = Strictness.LENIENT) public class ListGCSBucketTest extends AbstractGCSTest { + private static final String PREFIX = "test-prefix"; private static final Boolean USE_GENERATIONS = true; @@ -113,16 +122,29 @@ public class ListGCSBucketTest extends AbstractGCSTest { private static final Long CREATE_TIME = 1234L; private static final Long UPDATE_TIME = 4567L; private final static Long GENERATION = 5L; + private static final long TIMESTAMP = 1234567890; @Mock Storage storage; + @Mock + Page<Blob> mockBlobPage; + @Captor ArgumentCaptor<Storage.BlobListOption> argumentCaptor; - @Override - public ListGCSBucket getProcessor() { - return new ListGCSBucket() { + private TestRunner runner; + + private ListGCSBucket processor; + + private MockStateManager mockStateManager; + + @Mock + private DistributedMapCacheClient mockCache; + + @BeforeEach + public void beforeEach() throws Exception { + processor = new ListGCSBucket() { @Override protected Storage getCloudService() { return storage; @@ -133,21 +155,25 @@ public class ListGCSBucketTest extends AbstractGCSTest { return storage; } }; + + runner = buildNewRunner(processor); + runner.setProperty(ListGCSBucket.BUCKET, BUCKET); + runner.assertValid(); + + mockStateManager = runner.getStateManager(); + } + + @Override + public ListGCSBucket getProcessor() { + return processor; } @Override protected void addRequiredPropertiesToRunner(TestRunner runner) { - runner.setProperty(ListGCSBucket.BUCKET, BUCKET); } @Test public void testRestoreFreshState() throws Exception { - reset(storage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - assertFalse(runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(), "Cluster StateMap should be fresh (version -1L)"); assertTrue(processor.getStateKeys().isEmpty()); @@ -157,17 +183,10 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals(0L, processor.getStateTimestamp()); assertTrue(processor.getStateKeys().isEmpty()); - } @Test public void testRestorePreviousState() throws Exception { - reset(storage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Map<String, String> state = new LinkedHashMap<>(); state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L)); state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0"); @@ -188,12 +207,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testPersistState() throws Exception { - reset(storage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - assertFalse( runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(), "Cluster StateMap should be fresh" @@ -215,13 +228,7 @@ public class ListGCSBucketTest extends AbstractGCSTest { } @Test - public void testFailedPersistState() throws Exception { - reset(storage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testFailedPersistState() { runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true); final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0", "test-key-1")); @@ -241,60 +248,10 @@ public class ListGCSBucketTest extends AbstractGCSTest { // We could do more specific things like check the contents of the LogMessage, // but that seems too nitpicky. - - } - - @Mock - Page<Blob> mockBlobPage; - - private Blob buildMockBlob(final String bucket, final String key, final long updateTime) { - final Blob blob = mock(Blob.class); - when(blob.getBucket()).thenReturn(bucket); - when(blob.getName()).thenReturn(key); - when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); - when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); - return blob; - } - - private Blob buildMockBlobWithoutBucket(final String bucket, final String key, final long updateTime) { - final Blob blob = mock(Blob.class); - when(blob.getName()).thenReturn(key); - when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); - when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); - return blob; - } - - private OffsetDateTime offsetDateTime(final long value) { - final Instant instant = Instant.ofEpochMilli(value); - final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC")); - return OffsetDateTime.of(localDateTime, ZoneOffset.UTC); - } - - private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) { - final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); - assertEquals(3, verificationResults.size()); - final ConfigVerificationResult cloudServiceResult = verificationResults.get(0); - assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome()); - - final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1); - assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome()); - - final ConfigVerificationResult listingResult = verificationResults.get(2); - assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome()); - - assertTrue( - listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)), - String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation())); } @Test - public void testSuccessfulList() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testSuccessfulList() { final Iterable<Blob> mockList = Arrays.asList( buildMockBlob("blob-bucket-1", "blob-key-1", 2L), buildMockBlob("blob-bucket-2", "blob-key-2", 3L) @@ -331,11 +288,7 @@ public class ListGCSBucketTest extends AbstractGCSTest { } @Test - public void testNoTrackingListing() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); + public void testNoTrackingListing() { runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING); runner.assertValid(); @@ -381,12 +334,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testOldValues() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Iterable<Blob> mockList = Collections.singletonList( buildMockBlob("blob-bucket-1", "blob-key-1", 2L) ); @@ -409,16 +356,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals("2", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP)); } - - @Test public void testEmptyList() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Iterable<Blob> mockList = Collections.emptyList(); when(mockBlobPage.getValues()).thenReturn(mockList); @@ -438,12 +377,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testListWithStateAndFilesComingInAlphabeticalOrder() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Map<String, String> state = new LinkedHashMap<>(); state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L)); state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1"); @@ -482,12 +415,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Map<String, String> state = new LinkedHashMap<>(); state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L)); state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"); @@ -531,12 +458,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testListWithStateAndNewFilesComingWithTheSameTimestamp() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Map<String, String> state = new LinkedHashMap<>(); state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L)); state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"); @@ -586,12 +507,6 @@ public class ListGCSBucketTest extends AbstractGCSTest { @Test public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - final Map<String, String> state = new LinkedHashMap<>(); state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L)); state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"); @@ -639,13 +554,7 @@ public class ListGCSBucketTest extends AbstractGCSTest { } @Test - public void testAttributesSet() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testAttributesSet() { final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); when(blob.getSize()).thenReturn(SIZE); when(blob.getCacheControl()).thenReturn(CACHE_CONTROL); @@ -705,13 +614,7 @@ public class ListGCSBucketTest extends AbstractGCSTest { } @Test - public void testAclOwnerUser() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testAclOwnerUser() { final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); final Acl.User mockUser = mock(Acl.User.class); when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL); @@ -734,15 +637,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR)); } - @Test - public void testAclOwnerGroup() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testAclOwnerGroup() { final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); final Acl.Group mockGroup = mock(Acl.Group.class); when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL); @@ -765,16 +661,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR)); } - - @Test - public void testAclOwnerDomain() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testAclOwnerDomain() { final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); final Acl.Domain mockDomain = mock(Acl.Domain.class); when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN); @@ -796,16 +684,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR)); } - - @Test - public void testAclOwnerProject() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testAclOwnerProject() { final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); final Acl.Project mockProject = mock(Acl.Project.class); when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID); @@ -828,15 +708,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR)); } - @Test - public void testYieldOnBadStateRestore() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - + public void testYieldOnBadStateRestore() { final Iterable<Blob> mockList = Collections.emptyList(); runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true); @@ -848,12 +721,7 @@ public class ListGCSBucketTest extends AbstractGCSTest { } @Test - public void testListOptionsPrefix() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - + public void testListOptionsPrefix() { runner.setProperty(ListGCSBucket.PREFIX, PREFIX); runner.assertValid(); @@ -869,14 +737,8 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals(Storage.BlobListOption.prefix(PREFIX), argumentCaptor.getValue()); } - @Test - public void testListOptionsVersions() throws Exception { - reset(storage, mockBlobPage); - final ListGCSBucket processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - + public void testListOptionsVersions() { runner.setProperty(ListGCSBucket.USE_GENERATIONS, String.valueOf(USE_GENERATIONS)); runner.assertValid(); @@ -892,4 +754,193 @@ public class ListGCSBucketTest extends AbstractGCSTest { Storage.BlobListOption option = argumentCaptor.getValue(); assertEquals(Storage.BlobListOption.versions(true), option); } + + @Test + void testResetTimestampTrackingWhenBucketModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertEquals(TIMESTAMP, processor.getCurrentTimestamp()); + + runner.setProperty(ListGCSBucket.BUCKET, "otherBucket"); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertEquals(0, processor.getCurrentTimestamp()); + mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER); + + assertFalse(processor.isResetTracking()); + } + + @Test + void testResetTimestampTrackingWhenPrefixModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertEquals(TIMESTAMP, processor.getCurrentTimestamp()); + + runner.setProperty(ListGCSBucket.PREFIX, "prefix2"); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertEquals(0, processor.getCurrentTimestamp()); + mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER); + + assertFalse(processor.isResetTracking()); + } + + @Test + void testResetTimestampTrackingWhenStrategyModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertEquals(TIMESTAMP, processor.getCurrentTimestamp()); + + runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertEquals(0, processor.getCurrentTimestamp()); + mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER); + + assertFalse(processor.isResetTracking()); + } + + @Test + void testResetEntityTrackingWhenBucketModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertNotNull(processor.getListedEntityTracker()); + + runner.setProperty(ListGCSBucket.BUCKET, "otherBucket"); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertNotNull(processor.getListedEntityTracker()); + verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any()); + + assertFalse(processor.isResetTracking()); + } + + @Test + void testResetEntityTrackingWhenPrefixModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertNotNull(processor.getListedEntityTracker()); + + runner.setProperty(ListGCSBucket.PREFIX, "prefix2"); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertNotNull(processor.getListedEntityTracker()); + verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any()); + + assertFalse(processor.isResetTracking()); + } + + @Test + void testResetEntityTrackingWhenStrategyModified() throws Exception { + setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES); + + assertFalse(processor.isResetTracking()); + + runner.run(); + + assertNotNull(processor.getListedEntityTracker()); + + runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING); + + assertTrue(processor.isResetTracking()); + + runner.run(); + + assertNull(processor.getListedEntityTracker()); + verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any()); + + assertFalse(processor.isResetTracking()); + } + + private void setUpResetTrackingTest(AllowableValue listingStrategy) throws Exception { + runner.setProperty(ListGCSBucket.LISTING_STRATEGY, listingStrategy); + runner.setProperty(ListGCSBucket.PREFIX, "prefix1"); + + if (listingStrategy == ListGCSBucket.BY_TIMESTAMPS) { + mockStateManager.setState(Map.of(ListGCSBucket.CURRENT_TIMESTAMP, Long.toString(TIMESTAMP), ListGCSBucket.CURRENT_KEY_PREFIX + "0", "file"), Scope.CLUSTER); + } else if (listingStrategy == ListGCSBucket.BY_ENTITIES) { + String serviceId = "DistributedMapCacheClient"; + when(mockCache.getIdentifier()).thenReturn(serviceId); + runner.addControllerService(serviceId, mockCache); + runner.enableControllerService(mockCache); + runner.setProperty(ListGCSBucket.TRACKING_STATE_CACHE, serviceId); + } + + when(storage.list(anyString(), any(Storage.BlobListOption.class))).thenReturn(new PageImpl<>(null, null, null)); + } + + private Blob buildMockBlob(final String bucket, final String key, final long updateTime) { + final Blob blob = mock(Blob.class); + when(blob.getBucket()).thenReturn(bucket); + when(blob.getName()).thenReturn(key); + when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); + when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); + return blob; + } + + private Blob buildMockBlobWithoutBucket(final String bucket, final String key, final long updateTime) { + final Blob blob = mock(Blob.class); + when(blob.getName()).thenReturn(key); + when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); + when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime)); + return blob; + } + + private OffsetDateTime offsetDateTime(final long value) { + final Instant instant = Instant.ofEpochMilli(value); + final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC")); + return OffsetDateTime.of(localDateTime, ZoneOffset.UTC); + } + + private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) { + final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertEquals(3, verificationResults.size()); + final ConfigVerificationResult cloudServiceResult = verificationResults.get(0); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome()); + + final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome()); + + final ConfigVerificationResult listingResult = verificationResults.get(2); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome()); + + assertTrue( + listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)), + String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation())); + } } \ No newline at end of file