Author: Armin Rigo <[email protected]>
Branch: py3.5-scandir
Changeset: r86384:7a230ed585cb
Date: 2016-08-21 21:12 +0200
http://bitbucket.org/pypy/pypy/changeset/7a230ed585cb/

Log:    in-progress: is_file() etc.

diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -1,6 +1,5 @@
 from rpython.rlib import rgc
 from rpython.rlib import rposix_scandir
-from rpython.rtyper.lltypesystem import lltype
 
 from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
 from pypy.interpreter.error import OperationError, oefmt, wrap_oserror2
@@ -58,7 +57,7 @@
     def fail(self, err=None):
         dirp = self.dirp
         if dirp:
-            self.dirp = lltype.nullptr(lltype.typeOf(dirp).TO)
+            self.dirp = rposix_scandir.NULL_DIRP
             rposix_scandir.closedir(dirp)
         if err is None:
             raise OperationError(self.space.w_StopIteration, self.space.w_None)
@@ -66,8 +65,8 @@
             raise err
 
     def next_w(self):
-        # XXX not safe against being called on several threads for
-        # the same object, but I think that CPython has the same problem
+        # XXX not safe against being called on several threads for the
+        # same ScandirIterator, but I think that CPython has the same problem
         if not self.dirp:
             self.fail()
         #
@@ -75,20 +74,20 @@
         while True:
             try:
                 entry = rposix_scandir.nextentry(self.dirp)
-            except StopIteration:
-                self.fail()
             except OSError as e:
                 self.fail(wrap_oserror(space, e))
+            if not entry:
+                self.fail()
             assert rposix_scandir.has_name_bytes(entry)
             name = rposix_scandir.get_name_bytes(entry)
             if name != '.' and name != '..':
                 break
         #
+        known_type = rposix_scandir.get_known_type(entry)
         w_name = space.newbytes(name)
-        result_is_bytes = self.result_is_bytes
-        if not result_is_bytes:
+        if not self.result_is_bytes:
             w_name = space.fsdecode(w_name)
-        direntry = W_DirEntry(w_name, self.w_path_prefix, result_is_bytes)
+        direntry = W_DirEntry(w_name, self.w_path_prefix, known_type)
         return space.wrap(direntry)
 
 
@@ -103,10 +102,10 @@
 class W_DirEntry(W_Root):
     w_path = None
 
-    def __init__(self, w_name, w_path_prefix, result_is_bytes):
+    def __init__(self, w_name, w_path_prefix, known_type):
         self.w_name = w_name
         self.w_path_prefix = w_path_prefix
-        self.result_is_bytes = result_is_bytes
+        self.known_type = known_type
 
     def fget_name(self, space):
         return self.w_name
@@ -118,6 +117,44 @@
             self.w_path = w_path
         return w_path
 
+    def is_dir(self, follow_symlinks):
+        known_type = self.known_type
+        if known_type != rposix_scandir.DT_UNKNOWN:
+            if known_type == rposix_scandir.DT_DIR:
+                return True
+            if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+                return False
+        xxxx
+
+    def is_file(self, follow_symlinks):
+        known_type = self.known_type
+        if known_type != rposix_scandir.DT_UNKNOWN:
+            if known_type == rposix_scandir.DT_REG:
+                return True
+            if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+                return False
+        xxxx
+
+    def is_symlink(self):
+        known_type = self.known_type
+        if known_type != rposix_scandir.DT_UNKNOWN:
+            return known_type == rposix_scandir.DT_LNK
+        xxxx
+
+    @unwrap_spec(follow_symlinks=int)
+    def descr_is_dir(self, space, __kwonly__, follow_symlinks=1):
+        """return True if the entry is a directory; cached per entry"""
+        return space.wrap(self.is_dir(follow_symlinks))
+
+    @unwrap_spec(follow_symlinks=int)
+    def descr_is_file(self, space, __kwonly__, follow_symlinks=1):
+        """return True if the entry is a file; cached per entry"""
+        return space.wrap(self.is_file(follow_symlinks))
+
+    def descr_is_symlink(self, space):
+        """return True if the entry is a symbolic link; cached per entry"""
+        return space.wrap(self.is_symlink())
+
 W_DirEntry.typedef = TypeDef(
     'posix.DirEntry',
     name = GetSetProperty(W_DirEntry.fget_name,
@@ -126,5 +163,8 @@
     path = GetSetProperty(W_DirEntry.fget_path,
                           doc="the entry's full path name; equivalent to "
                               "os.path.join(scandir_path, entry.name)"),
+    is_dir = interp2app(W_DirEntry.descr_is_dir),
+    is_file = interp2app(W_DirEntry.descr_is_file),
+    is_symlink = interp2app(W_DirEntry.descr_is_symlink),
 )
 W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -7,7 +7,22 @@
     d = os.path.join(str(udir), dirname)
     os.mkdir(d)
     for key, value in content.items():
-        xxx
+        filename = os.path.join(d, key)
+        if value == 'dir':
+            os.mkdir(filename)
+        elif value == 'file':
+            with open(filename, 'w') as f:
+                pass
+        elif value == 'symlink-file':
+            os.symlink(str(udir.ensure('some_file')), filename)
+        elif value == 'symlink-dir':
+            os.symlink(str(udir), filename)
+        elif value == 'symlink-broken':
+            os.symlink(filename + '-broken', filename)
+        elif value == 'symlink-error':
+            os.symlink(filename, filename)
+        else:
+            raise NotImplementedError(repr(value))
     return d.decode(sys.getfilesystemencoding())
 
 
@@ -18,6 +33,15 @@
         space = cls.space
         cls.w_posix = space.appexec([], test_posix2.GET_POSIX)
         cls.w_dir_empty = space.wrap(_make_dir('empty', {}))
+        cls.w_dir0 = space.wrap(_make_dir('dir0', {'f1': 'file',
+                                                   'f2': 'file',
+                                                   'f3': 'file'}))
+        cls.w_dir1 = space.wrap(_make_dir('dir1', {'file1': 'file'}))
+        cls.w_dir2 = space.wrap(_make_dir('dir2', {'subdir2': 'dir'}))
+        cls.w_dir3 = space.wrap(_make_dir('dir3', {'sfile3': 'symlink-file'}))
+        cls.w_dir4 = space.wrap(_make_dir('dir4', {'sdir4': 'symlink-dir'}))
+        cls.w_dir5 = space.wrap(_make_dir('dir5', {'sbrok5': 'symlink-broken'}))
+        cls.w_dir6 = space.wrap(_make_dir('dir6', {'serr6': 'symlink-error'}))
 
     def test_scandir_empty(self):
         posix = self.posix
@@ -25,6 +49,12 @@
         assert list(sd) == []
         assert list(sd) == []
 
+    def test_scandir_files(self):
+        posix = self.posix
+        sd = posix.scandir(self.dir0)
+        names = [d.name for d in sd]
+        assert sorted(names) == ['f1', 'f2', 'f3']
+
     def test_unicode_versus_bytes(self):
         posix = self.posix
         d = next(posix.scandir())
@@ -47,3 +77,56 @@
         assert type(d.name) is bytes
         assert type(d.path) is bytes
         assert d.path == b'/' + d.name
+
+    def test_dir1(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir1))
+        assert d.name == 'file1'
+        assert     d.is_file()
+        assert not d.is_dir()
+        assert not d.is_symlink()
+        raises(TypeError, d.is_file, True)
+
+    def test_dir2(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir2))
+        assert d.name == 'subdir2'
+        assert not d.is_file()
+        assert     d.is_dir()
+        assert not d.is_symlink()
+
+    def test_dir3(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir3))
+        assert d.name == 'sfile3'
+        assert     d.is_file()
+        assert not d.is_dir()
+        assert     d.is_symlink()
+        assert     d.is_file(follow_symlinks=True)
+        assert not d.is_file(follow_symlinks=False)
+
+    def test_dir4(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir4))
+        assert d.name == 'sdir4'
+        assert not d.is_file()
+        assert     d.is_dir()
+        assert     d.is_symlink()
+        assert     d.is_dir(follow_symlinks=True)
+        assert not d.is_dir(follow_symlinks=False)
+
+    def test_dir5(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir5))
+        assert d.name == 'sbrok5'
+        assert not d.is_file()
+        assert not d.is_dir()
+        assert     d.is_symlink()
+
+    def test_dir6(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir6))
+        assert d.name == 'serr6'
+        raises(OSError, d.is_file)
+        raises(OSError, d.is_dir)
+        assert d.is_symlink()
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to