Author: Armin Rigo <[email protected]>
Branch: py3.5-scandir
Changeset: r86384:7a230ed585cb
Date: 2016-08-21 21:12 +0200
http://bitbucket.org/pypy/pypy/changeset/7a230ed585cb/
Log: in-progress: is_file() etc.
diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -1,6 +1,5 @@
from rpython.rlib import rgc
from rpython.rlib import rposix_scandir
-from rpython.rtyper.lltypesystem import lltype
from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
from pypy.interpreter.error import OperationError, oefmt, wrap_oserror2
@@ -58,7 +57,7 @@
def fail(self, err=None):
dirp = self.dirp
if dirp:
- self.dirp = lltype.nullptr(lltype.typeOf(dirp).TO)
+ self.dirp = rposix_scandir.NULL_DIRP
rposix_scandir.closedir(dirp)
if err is None:
raise OperationError(self.space.w_StopIteration, self.space.w_None)
@@ -66,8 +65,8 @@
raise err
def next_w(self):
- # XXX not safe against being called on several threads for
- # the same object, but I think that CPython has the same problem
+ # XXX not safe against being called on several threads for the
+ # same ScandirIterator, but I think that CPython has the same problem
if not self.dirp:
self.fail()
#
@@ -75,20 +74,20 @@
while True:
try:
entry = rposix_scandir.nextentry(self.dirp)
- except StopIteration:
- self.fail()
except OSError as e:
self.fail(wrap_oserror(space, e))
+ if not entry:
+ self.fail()
assert rposix_scandir.has_name_bytes(entry)
name = rposix_scandir.get_name_bytes(entry)
if name != '.' and name != '..':
break
#
+ known_type = rposix_scandir.get_known_type(entry)
w_name = space.newbytes(name)
- result_is_bytes = self.result_is_bytes
- if not result_is_bytes:
+ if not self.result_is_bytes:
w_name = space.fsdecode(w_name)
- direntry = W_DirEntry(w_name, self.w_path_prefix, result_is_bytes)
+ direntry = W_DirEntry(w_name, self.w_path_prefix, known_type)
return space.wrap(direntry)
@@ -103,10 +102,10 @@
class W_DirEntry(W_Root):
w_path = None
- def __init__(self, w_name, w_path_prefix, result_is_bytes):
+ def __init__(self, w_name, w_path_prefix, known_type):
self.w_name = w_name
self.w_path_prefix = w_path_prefix
- self.result_is_bytes = result_is_bytes
+ self.known_type = known_type
def fget_name(self, space):
return self.w_name
@@ -118,6 +117,44 @@
self.w_path = w_path
return w_path
+ def is_dir(self, follow_symlinks):
+ known_type = self.known_type
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ if known_type == rposix_scandir.DT_DIR:
+ return True
+ if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+ return False
+ xxxx
+
+ def is_file(self, follow_symlinks):
+ known_type = self.known_type
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ if known_type == rposix_scandir.DT_REG:
+ return True
+ if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+ return False
+ xxxx
+
+ def is_symlink(self):
+ known_type = self.known_type
+ if known_type != rposix_scandir.DT_UNKNOWN:
+ return known_type == rposix_scandir.DT_LNK
+ xxxx
+
+ @unwrap_spec(follow_symlinks=int)
+ def descr_is_dir(self, space, __kwonly__, follow_symlinks=1):
+ """return True if the entry is a directory; cached per entry"""
+ return space.wrap(self.is_dir(follow_symlinks))
+
+ @unwrap_spec(follow_symlinks=int)
+ def descr_is_file(self, space, __kwonly__, follow_symlinks=1):
+ """return True if the entry is a file; cached per entry"""
+ return space.wrap(self.is_file(follow_symlinks))
+
+ def descr_is_symlink(self, space):
+ """return True if the entry is a symbolic link; cached per entry"""
+ return space.wrap(self.is_symlink())
+
W_DirEntry.typedef = TypeDef(
'posix.DirEntry',
name = GetSetProperty(W_DirEntry.fget_name,
@@ -126,5 +163,8 @@
path = GetSetProperty(W_DirEntry.fget_path,
doc="the entry's full path name; equivalent to "
"os.path.join(scandir_path, entry.name)"),
+ is_dir = interp2app(W_DirEntry.descr_is_dir),
+ is_file = interp2app(W_DirEntry.descr_is_file),
+ is_symlink = interp2app(W_DirEntry.descr_is_symlink),
)
W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -7,7 +7,22 @@
d = os.path.join(str(udir), dirname)
os.mkdir(d)
for key, value in content.items():
- xxx
+ filename = os.path.join(d, key)
+ if value == 'dir':
+ os.mkdir(filename)
+ elif value == 'file':
+ with open(filename, 'w') as f:
+ pass
+ elif value == 'symlink-file':
+ os.symlink(str(udir.ensure('some_file')), filename)
+ elif value == 'symlink-dir':
+ os.symlink(str(udir), filename)
+ elif value == 'symlink-broken':
+ os.symlink(filename + '-broken', filename)
+ elif value == 'symlink-error':
+ os.symlink(filename, filename)
+ else:
+ raise NotImplementedError(repr(value))
return d.decode(sys.getfilesystemencoding())
@@ -18,6 +33,15 @@
space = cls.space
cls.w_posix = space.appexec([], test_posix2.GET_POSIX)
cls.w_dir_empty = space.wrap(_make_dir('empty', {}))
+ cls.w_dir0 = space.wrap(_make_dir('dir0', {'f1': 'file',
+ 'f2': 'file',
+ 'f3': 'file'}))
+ cls.w_dir1 = space.wrap(_make_dir('dir1', {'file1': 'file'}))
+ cls.w_dir2 = space.wrap(_make_dir('dir2', {'subdir2': 'dir'}))
+ cls.w_dir3 = space.wrap(_make_dir('dir3', {'sfile3': 'symlink-file'}))
+ cls.w_dir4 = space.wrap(_make_dir('dir4', {'sdir4': 'symlink-dir'}))
+ cls.w_dir5 = space.wrap(_make_dir('dir5', {'sbrok5': 'symlink-broken'}))
+ cls.w_dir6 = space.wrap(_make_dir('dir6', {'serr6': 'symlink-error'}))
def test_scandir_empty(self):
posix = self.posix
@@ -25,6 +49,12 @@
assert list(sd) == []
assert list(sd) == []
+ def test_scandir_files(self):
+ posix = self.posix
+ sd = posix.scandir(self.dir0)
+ names = [d.name for d in sd]
+ assert sorted(names) == ['f1', 'f2', 'f3']
+
def test_unicode_versus_bytes(self):
posix = self.posix
d = next(posix.scandir())
@@ -47,3 +77,56 @@
assert type(d.name) is bytes
assert type(d.path) is bytes
assert d.path == b'/' + d.name
+
+ def test_dir1(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir1))
+ assert d.name == 'file1'
+ assert d.is_file()
+ assert not d.is_dir()
+ assert not d.is_symlink()
+ raises(TypeError, d.is_file, True)
+
+ def test_dir2(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir2))
+ assert d.name == 'subdir2'
+ assert not d.is_file()
+ assert d.is_dir()
+ assert not d.is_symlink()
+
+ def test_dir3(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir3))
+ assert d.name == 'sfile3'
+ assert d.is_file()
+ assert not d.is_dir()
+ assert d.is_symlink()
+ assert d.is_file(follow_symlinks=True)
+ assert not d.is_file(follow_symlinks=False)
+
+ def test_dir4(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir4))
+ assert d.name == 'sdir4'
+ assert not d.is_file()
+ assert d.is_dir()
+ assert d.is_symlink()
+ assert d.is_dir(follow_symlinks=True)
+ assert not d.is_dir(follow_symlinks=False)
+
+ def test_dir5(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir5))
+ assert d.name == 'sbrok5'
+ assert not d.is_file()
+ assert not d.is_dir()
+ assert d.is_symlink()
+
+ def test_dir6(self):
+ posix = self.posix
+ d = next(posix.scandir(self.dir6))
+ assert d.name == 'serr6'
+ raises(OSError, d.is_file)
+ raises(OSError, d.is_dir)
+ assert d.is_symlink()
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit