This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 17be8671ba317e01ed3abcae704bff82aac41aca Author: bkonold <[email protected]> AuthorDate: Tue Jan 28 15:56:44 2020 -0800 SAMZA-2447: Checkpoint dir removal should only search in valid store dirs (#1261) --- .../TransactionalStateTaskStorageManager.scala | 12 ++++++---- .../TestTransactionalStateTaskStorageManager.java | 27 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala index 20c7271..0335710 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala @@ -90,11 +90,13 @@ class TransactionalStateTaskStorageManager( val fileFilter: FileFilter = new WildcardFileFilter(taskStoreName + "-*") val checkpointDirs = storeDir.listFiles(fileFilter) - checkpointDirs - .filter(!_.getName.contains(latestCheckpointId.toString)) - .foreach(checkpointDir => { - FileUtils.deleteDirectory(checkpointDir) - }) + if (checkpointDirs != null) { + checkpointDirs + .filter(!_.getName.contains(latestCheckpointId.toString)) + .foreach(checkpointDir => { + FileUtils.deleteDirectory(checkpointDir) + }) + } }) } } diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java index f2d4972..244a35b 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java @@ -21,6 +21,7 @@ package org.apache.samza.storage; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.FileFilter; 
import scala.Option; import scala.collection.immutable.Map; @@ -492,6 +493,32 @@ public class TestTransactionalStateTaskStorageManager { fail("Should have thrown an exception if no changelog offset found for checkpointed store"); } + @Test + public void testRemoveOldCheckpointsWhenBaseDirContainsRegularFiles() { + TaskName taskName = new TaskName("Partition 0"); + ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class); + Map<String, SystemStream> changelogSystemStreams = mock(Map.class); + SystemAdmins systemAdmins = mock(SystemAdmins.class); + File loggedStoreBaseDir = mock(File.class); + Partition changelogPartition = new Partition(0); + TaskMode taskMode = TaskMode.Active; + StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class); + + File mockStoreDir = mock(File.class); + String mockStoreDirName = "notDirectory"; + + when(loggedStoreBaseDir.listFiles()).thenReturn(new File[] {mockStoreDir}); + when(mockStoreDir.getName()).thenReturn(mockStoreDirName); + when(storageManagerUtil.getTaskStoreDir(eq(loggedStoreBaseDir), eq(mockStoreDirName), eq(taskName), eq(taskMode))).thenReturn(mockStoreDir); + // null here can happen if listFiles is called on a non-directory + when(mockStoreDir.listFiles(any(FileFilter.class))).thenReturn(null); + + TransactionalStateTaskStorageManager tsm = new TransactionalStateTaskStorageManager(taskName, containerStorageManager, + changelogSystemStreams, systemAdmins, loggedStoreBaseDir, changelogPartition, taskMode, storageManagerUtil); + + tsm.removeOldCheckpoints(CheckpointId.create()); + } + private TransactionalStateTaskStorageManager buildTSM(ContainerStorageManager csm, Partition changelogPartition, StorageManagerUtil smu) { TaskName taskName = new TaskName("Partition 0");
