Author: Armin Rigo <[email protected]>
Branch: py3.5
Changeset: r86402:8927e9200840
Date: 2016-08-22 11:01 +0200
http://bitbucket.org/pypy/pypy/changeset/8927e9200840/
Log: hg merge py3.5-scandir
diff --git a/pypy/module/posix/interp_scandir.py
b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -1,30 +1,26 @@
+import stat
+from errno import ENOENT
from rpython.rlib import rgc
-from rpython.rlib import rposix_scandir
-from rpython.rtyper.lltypesystem import lltype
+from rpython.rlib import rposix, rposix_scandir, rposix_stat
from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
from pypy.interpreter.error import OperationError, oefmt, wrap_oserror2
from pypy.interpreter.typedef import TypeDef, GetSetProperty
from pypy.interpreter.baseobjspace import W_Root
-from pypy.module.posix.interp_posix import unwrap_fd
+from pypy.module.posix.interp_posix import unwrap_fd, build_stat_result
-@unwrap_spec(w_path=WrappedDefault(u"."))
-def scandir(space, w_path):
+def scandir(space, w_path=None):
"scandir(path='.') -> iterator of DirEntry objects for given path"
+ if space.is_none(w_path):
+ w_path = space.newunicode(u".")
if space.isinstance_w(w_path, space.w_bytes):
path_bytes = space.str0_w(w_path)
result_is_bytes = True
else:
- try:
- path_bytes = space.fsencode_w(w_path)
- except OperationError as operr:
- if operr.async(space):
- raise
- fd = unwrap_fd(space, w_path, "string, bytes or integer")
- XXXX
+ path_bytes = space.fsencode_w(w_path)
result_is_bytes = False
try:
@@ -37,13 +33,20 @@
w_path_prefix = space.newbytes(path_prefix)
if not result_is_bytes:
w_path_prefix = space.fsdecode(w_path_prefix)
- return W_ScandirIterator(space, dirp, w_path_prefix, result_is_bytes)
+ if rposix.HAVE_FSTATAT:
+ dirfd = rposix.c_dirfd(dirp)
+ else:
+ dirfd = -1
+ return W_ScandirIterator(space, dirp, dirfd, w_path_prefix,
result_is_bytes)
class W_ScandirIterator(W_Root):
- def __init__(self, space, dirp, w_path_prefix, result_is_bytes):
+ _in_next = False
+
+ def __init__(self, space, dirp, dirfd, w_path_prefix, result_is_bytes):
self.space = space
self.dirp = dirp
+ self.dirfd = dirfd
self.w_path_prefix = w_path_prefix
self.result_is_bytes = result_is_bytes
@@ -58,7 +61,8 @@
def fail(self, err=None):
dirp = self.dirp
if dirp:
- self.dirp = lltype.nullptr(lltype.typeOf(dirp).TO)
+ self.dirfd = -1
+ self.dirp = rposix_scandir.NULL_DIRP
rposix_scandir.closedir(dirp)
if err is None:
raise OperationError(self.space.w_StopIteration, self.space.w_None)
@@ -66,29 +70,31 @@
raise err
def next_w(self):
- # XXX not safe against being called on several threads for
- # the same object, but I think that CPython has the same problem
if not self.dirp:
self.fail()
- #
- space = self.space
- while True:
- try:
- entry = rposix_scandir.nextentry(self.dirp)
- except StopIteration:
- self.fail()
- except OSError as e:
- self.fail(wrap_oserror(space, e))
- assert rposix_scandir.has_name_bytes(entry)
- name = rposix_scandir.get_name_bytes(entry)
- if name != '.' and name != '..':
- break
- #
- w_name = space.newbytes(name)
- result_is_bytes = self.result_is_bytes
- if not result_is_bytes:
- w_name = space.fsdecode(w_name)
- direntry = W_DirEntry(w_name, self.w_path_prefix, result_is_bytes)
+ if self._in_next:
+ self.fail(oefmt(self.space.w_RuntimeError,
+ "cannot use ScandirIterator from multiple threads
concurrently"))
+ self._in_next = True
+ try:
+ #
+ space = self.space
+ while True:
+ try:
+ entry = rposix_scandir.nextentry(self.dirp)
+ except OSError as e:
+ self.fail(wrap_oserror(space, e))
+ if not entry:
+ self.fail()
+ assert rposix_scandir.has_name_bytes(entry)
+ name = rposix_scandir.get_name_bytes(entry)
+ if name != '.' and name != '..':
+ break
+ #
+ known_type = rposix_scandir.get_known_type(entry)
+ finally:
+ self._in_next = False
+ direntry = W_DirEntry(self, name, known_type)
return space.wrap(direntry)
@@ -100,13 +106,31 @@
W_ScandirIterator.typedef.acceptable_as_base_class = False
+class FileNotFound(Exception):
+ pass
+
+assert 0 <= rposix_scandir.DT_UNKNOWN <= 255
+assert 0 <= rposix_scandir.DT_REG <= 255
+assert 0 <= rposix_scandir.DT_DIR <= 255
+assert 0 <= rposix_scandir.DT_LNK <= 255
+FLAG_STAT = 256
+FLAG_LSTAT = 512
+
+
class W_DirEntry(W_Root):
w_path = None
- def __init__(self, w_name, w_path_prefix, result_is_bytes):
+ def __init__(self, scandir_iterator, name, known_type):
+ self.space = scandir_iterator.space
+ self.scandir_iterator = scandir_iterator
+ self.name = name # always bytes on Posix
+ self.flags = known_type
+ assert known_type == (known_type & 255)
+ #
+ w_name = self.space.newbytes(name)
+ if not scandir_iterator.result_is_bytes:
+ w_name = self.space.fsdecode(w_name)
self.w_name = w_name
- self.w_path_prefix = w_path_prefix
- self.result_is_bytes = result_is_bytes
def fget_name(self, space):
return self.w_name
@@ -114,10 +138,148 @@
def fget_path(self, space):
w_path = self.w_path
if w_path is None:
- w_path = space.add(self.w_path_prefix, self.w_name)
+ w_path_prefix = self.scandir_iterator.w_path_prefix
+ w_path = space.add(w_path_prefix, self.w_name)
self.w_path = w_path
return w_path
+ # The internal methods, used to implement the public methods at
+ # the end of the class. Every method only calls methods *before*
+ # it in program order, so there is no cycle.
+
+ def get_lstat(self):
+ """Get the lstat() of the direntry."""
+ if (self.flags & FLAG_LSTAT) == 0:
+ # Unlike CPython, try to use fstatat() if possible
+ dirfd = self.scandir_iterator.dirfd
+ if dirfd != -1:
+ st = rposix_stat.fstatat(self.name, dirfd,
+ follow_symlinks=False)
+ else:
+ path = self.space.fsencode_w(self.fget_path(self.space))
+ st = rposix_stat.lstat(path)
+ self.d_lstat = st
+ self.flags |= FLAG_LSTAT
+ return self.d_lstat
+
+ def get_stat(self):
+ """Get the stat() of the direntry. This is implemented in
+ such a way that it won't do both a stat() and a lstat().
+ """
+ if (self.flags & FLAG_STAT) == 0:
+ # We don't have the 'd_stat'. If the known_type says the
+ # direntry is not a DT_LNK, then try to get and cache the
+ # 'd_lstat' instead. Then, or if we already have a
+ # 'd_lstat' from before, *and* if the 'd_lstat' is not a
+ # S_ISLNK, we can reuse it unchanged for 'd_stat'.
+ #
+ # Note how, in the common case where the known_type says
+ # it is a DT_REG or DT_DIR, then we call and cache lstat()
+ # and that's it. Also note that in a d_type-less OS or on
+ # a filesystem that always answer DT_UNKNOWN, this method
+ # will instead only call at most stat(), but not cache it
+ # as 'd_lstat'.
+ known_type = self.flags & 255
+ if (known_type != rposix_scandir.DT_UNKNOWN and
+ known_type != rposix_scandir.DT_LNK):
+ self.get_lstat() # fill the 'd_lstat' cache
+ have_lstat = True
+ else:
+ have_lstat = (self.flags & FLAG_LSTAT) != 0
+
+ if have_lstat:
+ # We have the lstat() but not the stat(). They are
+ # the same, unless the 'd_lstat' is a S_IFLNK.
+ must_call_stat = stat.S_ISLNK(self.d_lstat.st_mode)
+ else:
+ must_call_stat = True
+
+ if must_call_stat:
+ # Must call stat(). Try to use fstatat() if possible
+ dirfd = self.scandir_iterator.dirfd
+ if dirfd != -1:
+ st = rposix_stat.fstatat(self.name, dirfd,
+ follow_symlinks=True)
+ else:
+ path = self.space.fsencode_w(self.fget_path(self.space))
+ st = rposix_stat.stat(path)
+ else:
+ st = self.d_lstat
+
+ self.d_stat = st
+ self.flags |= FLAG_STAT
+ return self.d_stat
+
+ def get_stat_or_lstat(self, follow_symlinks):
+ if follow_symlinks:
+ return self.get_stat()
+ else:
+ return self.get_lstat()
+
+ def check_mode(self, follow_symlinks):
+ """Get the stat() or lstat() of the direntry, and return the
+ S_IFMT. If calling stat()/lstat() gives us ENOENT, return -1
+ instead; it is better to give up and answer "no, not this type"
+ to requests, rather than propagate the error.
+ """
+ try:
+ st = self.get_stat_or_lstat(follow_symlinks)
+ except OSError as e:
+ if e.errno == ENOENT: # not found
+ return -1
+ raise wrap_oserror2(self.space, e, self.fget_path(self.space))
+ return stat.S_IFMT(st.st_mode)
+
+ def is_dir(self, follow_symlinks):
+ known_type = self.flags & 255
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ if known_type == rposix_scandir.DT_DIR:
+ return True
+ elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+ pass # don't know in this case
+ else:
+ return False
+ return self.check_mode(follow_symlinks) == stat.S_IFDIR
+
+ def is_file(self, follow_symlinks):
+ known_type = self.flags & 255
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ if known_type == rposix_scandir.DT_REG:
+ return True
+ elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+ pass # don't know in this case
+ else:
+ return False
+ return self.check_mode(follow_symlinks) == stat.S_IFREG
+
+ def is_symlink(self):
+ """Check if the direntry is a symlink. May get the lstat()."""
+ known_type = self.flags & 255
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ return known_type == rposix_scandir.DT_LNK
+ return self.check_mode(follow_symlinks=False) == stat.S_IFLNK
+
+ @unwrap_spec(follow_symlinks=int)
+ def descr_is_dir(self, space, __kwonly__, follow_symlinks=1):
+ """return True if the entry is a directory; cached per entry"""
+ return space.wrap(self.is_dir(follow_symlinks))
+
+ @unwrap_spec(follow_symlinks=int)
+ def descr_is_file(self, space, __kwonly__, follow_symlinks=1):
+ """return True if the entry is a file; cached per entry"""
+ return space.wrap(self.is_file(follow_symlinks))
+
+ def descr_is_symlink(self, space):
+ """return True if the entry is a symbolic link; cached per entry"""
+ return space.wrap(self.is_symlink())
+
+ @unwrap_spec(follow_symlinks=int)
+ def descr_stat(self, space, __kwonly__, follow_symlinks=1):
+ """return stat_result object for the entry; cached per entry"""
+ st = self.get_stat_or_lstat(follow_symlinks)
+ return build_stat_result(self.space, st)
+
+
W_DirEntry.typedef = TypeDef(
'posix.DirEntry',
name = GetSetProperty(W_DirEntry.fget_name,
@@ -126,5 +288,9 @@
path = GetSetProperty(W_DirEntry.fget_path,
doc="the entry's full path name; equivalent to "
"os.path.join(scandir_path, entry.name)"),
+ is_dir = interp2app(W_DirEntry.descr_is_dir),
+ is_file = interp2app(W_DirEntry.descr_is_file),
+ is_symlink = interp2app(W_DirEntry.descr_is_symlink),
+ stat = interp2app(W_DirEntry.descr_stat),
)
W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py
b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -7,7 +7,22 @@
d = os.path.join(str(udir), dirname)
os.mkdir(d)
for key, value in content.items():
- xxx
+ filename = os.path.join(d, key)
+ if value == 'dir':
+ os.mkdir(filename)
+ elif value == 'file':
+ with open(filename, 'w') as f:
+ pass
+ elif value == 'symlink-file':
+ os.symlink(str(udir.ensure('some_file')), filename)
+ elif value == 'symlink-dir':
+ os.symlink(str(udir), filename)
+ elif value == 'symlink-broken':
+ os.symlink(filename + '-broken', filename)
+ elif value == 'symlink-error':
+ os.symlink(filename, filename)
+ else:
+ raise NotImplementedError(repr(value))
return d.decode(sys.getfilesystemencoding())
@@ -18,6 +33,15 @@
space = cls.space
cls.w_posix = space.appexec([], test_posix2.GET_POSIX)
cls.w_dir_empty = space.wrap(_make_dir('empty', {}))
+ cls.w_dir0 = space.wrap(_make_dir('dir0', {'f1': 'file',
+ 'f2': 'file',
+ 'f3': 'file'}))
+ cls.w_dir1 = space.wrap(_make_dir('dir1', {'file1': 'file'}))
+ cls.w_dir2 = space.wrap(_make_dir('dir2', {'subdir2': 'dir'}))
+ cls.w_dir3 = space.wrap(_make_dir('dir3', {'sfile3': 'symlink-file'}))
+ cls.w_dir4 = space.wrap(_make_dir('dir4', {'sdir4': 'symlink-dir'}))
+ cls.w_dir5 = space.wrap(_make_dir('dir5', {'sbrok5':
'symlink-broken'}))
+ cls.w_dir6 = space.wrap(_make_dir('dir6', {'serr6': 'symlink-error'}))
def test_scandir_empty(self):
posix = self.posix
@@ -25,12 +49,22 @@
assert list(sd) == []
assert list(sd) == []
+ def test_scandir_files(self):
+ posix = self.posix
+ sd = posix.scandir(self.dir0)
+ names = [d.name for d in sd]
+ assert sorted(names) == ['f1', 'f2', 'f3']
+
def test_unicode_versus_bytes(self):
posix = self.posix
d = next(posix.scandir())
assert type(d.name) is str
assert type(d.path) is str
assert d.path == './' + d.name
+ d = next(posix.scandir(None))
+ assert type(d.name) is str
+ assert type(d.path) is str
+ assert d.path == './' + d.name
d = next(posix.scandir(u'.'))
assert type(d.name) is str
assert type(d.path) is str
@@ -47,3 +81,79 @@
assert type(d.name) is bytes
assert type(d.path) is bytes
assert d.path == b'/' + d.name
+
+ def test_stat1(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir1))
+ assert d.name == 'file1'
+ assert d.stat().st_mode & 0o170000 == 0o100000 # S_IFREG
+ assert d.stat().st_size == 0
+
+ def test_stat4(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir4))
+ assert d.name == 'sdir4'
+ assert d.stat().st_mode & 0o170000 == 0o040000 # S_IFDIR
+ assert d.stat(follow_symlinks=True).st_mode &0o170000 == 0o040000
+ assert d.stat(follow_symlinks=False).st_mode&0o170000 == 0o120000
#IFLNK
+
+ def test_dir1(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir1))
+ assert d.name == 'file1'
+ assert d.is_file()
+ assert not d.is_dir()
+ assert not d.is_symlink()
+ raises(TypeError, d.is_file, True)
+ assert d.is_file(follow_symlinks=False)
+ assert not d.is_dir(follow_symlinks=False)
+
+ def test_dir2(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir2))
+ assert d.name == 'subdir2'
+ assert not d.is_file()
+ assert d.is_dir()
+ assert not d.is_symlink()
+ assert not d.is_file(follow_symlinks=False)
+ assert d.is_dir(follow_symlinks=False)
+
+ def test_dir3(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir3))
+ assert d.name == 'sfile3'
+ assert d.is_file()
+ assert not d.is_dir()
+ assert d.is_symlink()
+ assert d.is_file(follow_symlinks=True)
+ assert not d.is_file(follow_symlinks=False)
+
+ def test_dir4(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir4))
+ assert d.name == 'sdir4'
+ assert not d.is_file()
+ assert d.is_dir()
+ assert d.is_symlink()
+ assert d.is_dir(follow_symlinks=True)
+ assert not d.is_dir(follow_symlinks=False)
+
+ def test_dir5(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir5))
+ assert d.name == 'sbrok5'
+ assert not d.is_file()
+ assert not d.is_dir()
+ assert d.is_symlink()
+
+ def test_dir6(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir6))
+ assert d.name == 'serr6'
+ raises(OSError, d.is_file)
+ raises(OSError, d.is_dir)
+ assert d.is_symlink()
+
+ def test_fdopendir_unsupported(self):
+ posix = self.posix
+ raises(TypeError, posix.scandir, 1234)
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit