Author: Armin Rigo <[email protected]>
Branch: py3.5-scandir
Changeset: r86387:cfe6bf314382
Date: 2016-08-22 01:03 +0200
http://bitbucket.org/pypy/pypy/changeset/cfe6bf314382/
Log: Finish the first draft of os.scandir() (missing: Win32)
diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -1,12 +1,14 @@
+import stat
+from errno import ENOENT
from rpython.rlib import rgc
-from rpython.rlib import rposix_scandir
+from rpython.rlib import rposix, rposix_scandir, rposix_stat
from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
from pypy.interpreter.error import OperationError, oefmt, wrap_oserror2
from pypy.interpreter.typedef import TypeDef, GetSetProperty
from pypy.interpreter.baseobjspace import W_Root
-from pypy.module.posix.interp_posix import unwrap_fd
+from pypy.module.posix.interp_posix import unwrap_fd, build_stat_result
@unwrap_spec(w_path=WrappedDefault(u"."))
@@ -36,13 +38,18 @@
w_path_prefix = space.newbytes(path_prefix)
if not result_is_bytes:
w_path_prefix = space.fsdecode(w_path_prefix)
- return W_ScandirIterator(space, dirp, w_path_prefix, result_is_bytes)
+ if rposix.HAVE_FSTATAT:
+ dirfd = rposix.c_dirfd(dirp)
+ else:
+ dirfd = -1
+ return W_ScandirIterator(space, dirp, dirfd, w_path_prefix, result_is_bytes)
class W_ScandirIterator(W_Root):
- def __init__(self, space, dirp, w_path_prefix, result_is_bytes):
+ def __init__(self, space, dirp, dirfd, w_path_prefix, result_is_bytes):
self.space = space
self.dirp = dirp
+ self.dirfd = dirfd
self.w_path_prefix = w_path_prefix
self.result_is_bytes = result_is_bytes
@@ -57,6 +64,7 @@
def fail(self, err=None):
dirp = self.dirp
if dirp:
+ self.dirfd = -1
self.dirp = rposix_scandir.NULL_DIRP
rposix_scandir.closedir(dirp)
if err is None:
@@ -84,10 +92,7 @@
break
#
known_type = rposix_scandir.get_known_type(entry)
- w_name = space.newbytes(name)
- if not self.result_is_bytes:
- w_name = space.fsdecode(w_name)
- direntry = W_DirEntry(w_name, self.w_path_prefix, known_type)
+ direntry = W_DirEntry(self, name, known_type)
return space.wrap(direntry)
@@ -99,13 +104,31 @@
W_ScandirIterator.typedef.acceptable_as_base_class = False
+class FileNotFound(Exception):
+ pass
+
+assert 0 <= rposix_scandir.DT_UNKNOWN <= 255
+assert 0 <= rposix_scandir.DT_REG <= 255
+assert 0 <= rposix_scandir.DT_DIR <= 255
+assert 0 <= rposix_scandir.DT_LNK <= 255
+FLAG_STAT = 256
+FLAG_LSTAT = 512
+
+
class W_DirEntry(W_Root):
w_path = None
- def __init__(self, w_name, w_path_prefix, known_type):
+ def __init__(self, scandir_iterator, name, known_type):
+ self.space = scandir_iterator.space
+ self.scandir_iterator = scandir_iterator
+ self.name = name # always bytes on Posix
+ self.flags = known_type
+ assert known_type == (known_type & 255)
+ #
+ w_name = self.space.newbytes(name)
+ if not scandir_iterator.result_is_bytes:
+ w_name = self.space.fsdecode(w_name)
self.w_name = w_name
- self.w_path_prefix = w_path_prefix
- self.known_type = known_type
def fget_name(self, space):
return self.w_name
@@ -113,33 +136,126 @@
def fget_path(self, space):
w_path = self.w_path
if w_path is None:
- w_path = space.add(self.w_path_prefix, self.w_name)
+ w_path_prefix = self.scandir_iterator.w_path_prefix
+ w_path = space.add(w_path_prefix, self.w_name)
self.w_path = w_path
return w_path
+ # The internal methods, used to implement the public methods at
+ # the end of the class. Every method only calls methods *before*
+ # it in program order, so there is no cycle.
+
+ def get_lstat(self):
+ """Get the lstat() of the direntry."""
+ if (self.flags & FLAG_LSTAT) == 0:
+ # Unlike CPython, try to use fstatat() if possible
+ dirfd = self.scandir_iterator.dirfd
+ if dirfd != -1:
+ st = rposix_stat.fstatat(self.name, dirfd,
+ follow_symlinks=False)
+ else:
+ path = self.space.fsencode_w(self.fget_path(self.space))
+ st = rposix_stat.lstat(path)
+ self.d_lstat = st
+ self.flags |= FLAG_LSTAT
+ return self.d_lstat
+
+ def get_stat(self):
+ """Get the stat() of the direntry. This is implemented in
+ such a way that it won't do both a stat() and a lstat().
+ """
+ if (self.flags & FLAG_STAT) == 0:
+ # We don't have the 'd_stat'. If the known_type says the
+ # direntry is not a DT_LNK, then try to get and cache the
+ # 'd_lstat' instead. Then, or if we already have a
+ # 'd_lstat' from before, *and* if the 'd_lstat' is not a
+ # S_ISLNK, we can reuse it unchanged for 'd_stat'.
+ #
+ # Note how, in the common case where the known_type says
+ # it is a DT_REG or DT_DIR, then we call and cache lstat()
+ # and that's it. Also note that in a d_type-less OS or on
+ # a filesystem that always answers DT_UNKNOWN, this method
+ # will instead only call at most stat(), but not cache it
+ # as 'd_lstat'.
+ known_type = self.flags & 255
+ if (known_type != rposix_scandir.DT_UNKNOWN and
+ known_type != rposix_scandir.DT_LNK):
+ self.get_lstat() # fill the 'd_lstat' cache
+ have_lstat = True
+ else:
+ have_lstat = (self.flags & FLAG_LSTAT) != 0
+
+ if have_lstat:
+ # We have the lstat() but not the stat(). They are
+ # the same, unless the 'd_lstat' is a S_IFLNK.
+ must_call_stat = stat.S_ISLNK(self.d_lstat.st_mode)
+ else:
+ must_call_stat = True
+
+ if must_call_stat:
+ # Must call stat(). Try to use fstatat() if possible
+ dirfd = self.scandir_iterator.dirfd
+ if dirfd != -1:
+ st = rposix_stat.fstatat(self.name, dirfd,
+ follow_symlinks=True)
+ else:
+ path = self.space.fsencode_w(self.fget_path(self.space))
+ st = rposix_stat.stat(path)
+ else:
+ st = self.d_lstat
+
+ self.d_stat = st
+ self.flags |= FLAG_STAT
+ return self.d_stat
+
+ def get_stat_or_lstat(self, follow_symlinks):
+ if follow_symlinks:
+ return self.get_stat()
+ else:
+ return self.get_lstat()
+
+ def check_mode(self, follow_symlinks):
+ """Get the stat() or lstat() of the direntry, and return the
+ S_IFMT. If calling stat()/lstat() gives us ENOENT, return -1
+ instead; it is better to give up and answer "no, not this type"
+ to requests, rather than propagate the error.
+ """
+ try:
+ st = self.get_stat_or_lstat(follow_symlinks)
+ except OSError as e:
+ if e.errno == ENOENT: # not found
+ return -1
+ raise wrap_oserror2(self.space, e, self.fget_path(self.space))
+ return stat.S_IFMT(st.st_mode)
+
def is_dir(self, follow_symlinks):
- known_type = self.known_type
+ known_type = self.flags & 255
if known_type != rposix_scandir.DT_UNKNOWN:
if known_type == rposix_scandir.DT_DIR:
return True
- if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+ elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+ pass # don't know in this case
+ else:
return False
- xxxx
+ return self.check_mode(follow_symlinks) == stat.S_IFDIR
def is_file(self, follow_symlinks):
- known_type = self.known_type
+ known_type = self.flags & 255
if known_type != rposix_scandir.DT_UNKNOWN:
if known_type == rposix_scandir.DT_REG:
return True
- if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+ elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+ pass # don't know in this case
+ else:
return False
- xxxx
+ return self.check_mode(follow_symlinks) == stat.S_IFREG
def is_symlink(self):
- known_type = self.known_type
+ """Check if the direntry is a symlink. May get the lstat()."""
+ known_type = self.flags & 255
if known_type != rposix_scandir.DT_UNKNOWN:
return known_type == rposix_scandir.DT_LNK
- xxxx
+ return self.check_mode(follow_symlinks=False) == stat.S_IFLNK
@unwrap_spec(follow_symlinks=int)
def descr_is_dir(self, space, __kwonly__, follow_symlinks=1):
@@ -155,6 +271,13 @@
"""return True if the entry is a symbolic link; cached per entry"""
return space.wrap(self.is_symlink())
+ @unwrap_spec(follow_symlinks=int)
+ def descr_stat(self, space, __kwonly__, follow_symlinks=1):
+ """return stat_result object for the entry; cached per entry"""
+ st = self.get_stat_or_lstat(follow_symlinks)
+ return build_stat_result(self.space, st)
+
+
W_DirEntry.typedef = TypeDef(
'posix.DirEntry',
name = GetSetProperty(W_DirEntry.fget_name,
@@ -166,5 +289,6 @@
is_dir = interp2app(W_DirEntry.descr_is_dir),
is_file = interp2app(W_DirEntry.descr_is_file),
is_symlink = interp2app(W_DirEntry.descr_is_symlink),
+ stat = interp2app(W_DirEntry.descr_stat),
)
W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -78,6 +78,21 @@
assert type(d.path) is bytes
assert d.path == b'/' + d.name
+ def test_stat1(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir1))
+ assert d.name == 'file1'
+ assert d.stat().st_mode & 0o170000 == 0o100000 # S_IFREG
+ assert d.stat().st_size == 0
+
+ def test_stat4(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir4))
+ assert d.name == 'sdir4'
+ assert d.stat().st_mode & 0o170000 == 0o040000 # S_IFDIR
+ assert d.stat(follow_symlinks=True).st_mode &0o170000 == 0o040000
+ assert d.stat(follow_symlinks=False).st_mode&0o170000 == 0o120000 #IFLNK
+
def test_dir1(self):
posix = self.posix
d = next(posix.scandir(self.dir1))
@@ -86,6 +101,8 @@
assert not d.is_dir()
assert not d.is_symlink()
raises(TypeError, d.is_file, True)
+ assert d.is_file(follow_symlinks=False)
+ assert not d.is_dir(follow_symlinks=False)
def test_dir2(self):
posix = self.posix
@@ -94,6 +111,8 @@
assert not d.is_file()
assert d.is_dir()
assert not d.is_symlink()
+ assert not d.is_file(follow_symlinks=False)
+ assert d.is_dir(follow_symlinks=False)
def test_dir3(self):
posix = self.posix
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit