We test the ability to create new files in the filesystem, this is done by
adding an entry in the desired directory list.
The file will also be created in the host filesystem with matching filename.

Signed-off-by: Amjad Alsharafi <amjadsharaf...@gmail.com>
---
 tests/qemu-iotests/fat16.py        | 124 +++++++++++++++++++++++++++--
 tests/qemu-iotests/tests/vvfat     |  29 +++++--
 tests/qemu-iotests/tests/vvfat.out |   4 +-
 3 files changed, 144 insertions(+), 13 deletions(-)

diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py
index 6ac5508d8d..e86bdd0b10 100644
--- a/tests/qemu-iotests/fat16.py
+++ b/tests/qemu-iotests/fat16.py
@@ -16,9 +16,11 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from typing import List
+import string
 
 SECTOR_SIZE = 512
 DIRENTRY_SIZE = 32
+ALLOWED_FILE_CHARS = set("!#$%&'()-@^_`{}~" + string.digits + 
string.ascii_uppercase)
 
 
 class MBR:
@@ -265,7 +267,7 @@ def write_fat_entry(self, cluster: int, value: int):
             + self.fats[fat_offset + 2 :]
         )
         self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
-    
+
     def flush_fats(self):
         """
         Write the FATs back to the disk.
@@ -293,7 +295,7 @@ def next_cluster(self, cluster: int) -> int | None:
             raise Exception("Invalid FAT entry")
         else:
             return fat_entry
-    
+
     def next_free_cluster(self) -> int:
         """
         Find the next free cluster.
@@ -338,6 +340,67 @@ def read_directory(self, cluster: int) -> 
List[FatDirectoryEntry]:
             cluster = self.next_cluster(cluster)
         return entries
 
+    def add_direntry(self, cluster: int | None, name: str, ext: str, 
attributes: int):
+        """
+        Add a new directory entry to the given cluster.
+        If the cluster is `None`, then it will be added to the root directory.
+        """
+
+        def find_free_entry(data: bytes):
+            for i in range(0, len(data), DIRENTRY_SIZE):
+                entry = data[i : i + DIRENTRY_SIZE]
+                if entry[0] == 0 or entry[0] == 0xE5:
+                    return i
+            return None
+
+        assert len(name) <= 8, "Name must be 8 characters or less"
+        assert len(ext) <= 3, "Ext must be 3 characters or less"
+        assert attributes % 0x15 != 0x15, "Invalid attributes"
+
+        # initial dummy data
+        new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
+        new_entry.name = name.ljust(8, " ")
+        new_entry.ext = ext.ljust(3, " ")
+        new_entry.attributes = attributes
+        new_entry.reserved = 0
+        new_entry.create_time_tenth = 0
+        new_entry.create_time = 0
+        new_entry.create_date = 0
+        new_entry.last_access_date = 0
+        new_entry.last_mod_time = 0
+        new_entry.last_mod_date = 0
+        new_entry.cluster = self.next_free_cluster()
+        new_entry.size_bytes = 0
+
+        # mark as EOF
+        self.write_fat_entry(new_entry.cluster, 0xFFFF)
+
+        if cluster is None:
+            for i in range(self.boot_sector.root_dir_size()):
+                sector_data = self.read_sectors(
+                    self.boot_sector.root_dir_start() + i, 1
+                )
+                offset = find_free_entry(sector_data)
+                if offset is not None:
+                    new_entry.sector = self.boot_sector.root_dir_start() + i
+                    new_entry.offset = offset
+                    self.update_direntry(new_entry)
+                    return new_entry
+        else:
+            while cluster is not None:
+                data = self.read_cluster(cluster)
+                offset = find_free_entry(data)
+                if offset is not None:
+                    new_entry.sector = 
self.boot_sector.first_sector_of_cluster(
+                        cluster
+                    ) + (offset // SECTOR_SIZE)
+                    new_entry.offset = offset % SECTOR_SIZE
+                    self.update_direntry(new_entry)
+                    return new_entry
+                cluster = self.next_cluster(cluster)
+
+        raise Exception("No free directory entries")
+
     def update_direntry(self, entry: FatDirectoryEntry):
         """
         Write the directory entry back to the disk.
@@ -406,9 +469,10 @@ def truncate_file(self, entry: FatDirectoryEntry, 
new_size: int):
             raise Exception(f"{entry.whole_name()} is a directory")
 
         def clusters_from_size(size: int):
-            return (size + self.boot_sector.cluster_bytes() - 1) // 
self.boot_sector.cluster_bytes()
+            return (
+                size + self.boot_sector.cluster_bytes() - 1
+            ) // self.boot_sector.cluster_bytes()
 
-        
         # First, allocate new FATs if we need to
         required_clusters = clusters_from_size(new_size)
         current_clusters = clusters_from_size(entry.size_bytes)
@@ -438,7 +502,7 @@ def clusters_from_size(size: int):
                 self.write_fat_entry(cluster, new_cluster)
                 self.write_fat_entry(new_cluster, 0xFFFF)
                 cluster = new_cluster
-            
+
         elif required_clusters < current_clusters:
             # Truncate the file
             cluster = entry.cluster
@@ -464,7 +528,9 @@ def clusters_from_size(size: int):
             count += 1
             affected_clusters.add(cluster)
             cluster = self.next_cluster(cluster)
-        assert count == required_clusters, f"Expected {required_clusters} 
clusters, got {count}"
+        assert (
+            count == required_clusters
+        ), f"Expected {required_clusters} clusters, got {count}"
 
         # update the size
         entry.size_bytes = new_size
@@ -505,3 +571,49 @@ def write_file(self, entry: FatDirectoryEntry, data: 
bytes):
             cluster = self.next_cluster(cluster)
 
         assert len(data) == 0, "Data was not written completely, clusters 
missing"
+
+    def create_file(self, path: str):
+        """
+        Create a new file at the given path.
+        """
+        assert path[0] == "/", "Path must start with /"
+
+        path = path[1:]  # remove the leading /
+
+        parts = path.split("/")
+
+        directory_cluster = None
+        directory = self.read_root_directory()
+
+        parts, filename = parts[:-1], parts[-1]
+
+        for i, part in enumerate(parts):
+            current_entry = None
+            for entry in directory:
+                if entry.whole_name() == part:
+                    current_entry = entry
+                    break
+            if current_entry is None:
+                return None
+
+            if current_entry.attributes & 0x10 == 0:
+                raise Exception(f"{current_entry.whole_name()} is not a 
directory")
+            else:
+                directory = self.read_directory(current_entry.cluster)
+                directory_cluster = current_entry.cluster
+
+        # add new entry to the directory
+
+        filename, ext = filename.split(".")
+
+        if len(ext) > 3:
+            raise Exception("Ext must be 3 characters or less")
+        if len(filename) > 8:
+            raise Exception("Name must be 8 characters or less")
+
+        for c in filename + ext:
+
+            if c not in ALLOWED_FILE_CHARS:
+                raise Exception("Invalid character in filename")
+
+        return self.add_direntry(directory_cluster, filename, ext, 0)
diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat
index e0e23d1ab8..d8d802589d 100755
--- a/tests/qemu-iotests/tests/vvfat
+++ b/tests/qemu-iotests/tests/vvfat
@@ -323,7 +323,7 @@ class TestVVFatDriver(QMPTestCase):
 
         with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
             self.assertEqual(f.read(), new_content)
-    
+
     def test_write_large_file(self):
         """
         Test writing a large file
@@ -342,7 +342,7 @@ class TestVVFatDriver(QMPTestCase):
 
         with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
             self.assertEqual(f.read(), new_content)
-    
+
     def test_truncate_file_change_clusters_less(self):
         """
         Test truncating a file by reducing the number of clusters
@@ -359,7 +359,6 @@ class TestVVFatDriver(QMPTestCase):
         with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
             self.assertEqual(f.read(), b"A")
 
-    
     def test_write_file_change_clusters_less(self):
         """
         Test truncating a file by reducing the number of clusters
@@ -376,7 +375,7 @@ class TestVVFatDriver(QMPTestCase):
 
         with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
             self.assertEqual(f.read(), new_content)
-    
+
     def test_write_file_change_clusters_more(self):
         """
         Test truncating a file by increasing the number of clusters
@@ -392,7 +391,27 @@ class TestVVFatDriver(QMPTestCase):
 
         with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
             self.assertEqual(f.read(), new_content)
-    
+
+    def test_create_file(self):
+        """
+        Test creating a new file
+        """
+        fat16 = self.init_fat16()
+
+        new_file = fat16.create_file("/NEWFILE.TXT")
+
+        self.assertIsNotNone(new_file)
+        self.assertEqual(new_file.size_bytes, 0)
+
+        new_content = b"Hello, world! New file\n"
+        fat16.write_file(new_file, new_content)
+
+        self.assertEqual(fat16.read_file(new_file), new_content)
+
+        with open(os.path.join(filesystem, "newfile.txt"), "rb") as f:
+            self.assertEqual(f.read(), new_content)
+
+    # TODO: support deleting files
 
 
 if __name__ == "__main__":
diff --git a/tests/qemu-iotests/tests/vvfat.out 
b/tests/qemu-iotests/tests/vvfat.out
index fa16b5ccef..6323079e08 100755
--- a/tests/qemu-iotests/tests/vvfat.out
+++ b/tests/qemu-iotests/tests/vvfat.out
@@ -1,5 +1,5 @@
-.............
+..............
 ----------------------------------------------------------------------
-Ran 13 tests
+Ran 14 tests
 
 OK
-- 
2.45.0


Reply via email to