On Wed, 13 Oct 2010 18:32:15 +0400, Andrei Alexandrescu <seewebsiteforem...@erdani.org> wrote:

On 10/11/2010 07:49 PM, Daniel Gibson wrote:
Andrei Alexandrescu wrote:
Agreed. Maybe this is a good time to start making a requirements list
for streams. What are the essential features/feature groups?

Andrei

Maybe something like the following (I hope it's not too extensive):

* Input-, Output- and InputAndOutput-Streams
- having InputStream and OutputStream as interfaces, like in the old design, may be a good idea
- implementing the standard operations that are mostly independent of the data source/sink (like read/write for basic types, strings, ...) in mixin templates is probably an elegant way to create streams that are both Input and Output (one mixin that implements most of InputStream and one that implements most of OutputStream)
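The mixin idea can be sketched in C++, where the closest analogue of a D mixin template is CRTP. In this sketch a concrete stream supplies only raw `readRaw`/`writeRaw`. The typed `readValue`/`writeValue` come from two reusable bases, so one class can be both Input and Output. All names are hypothetical, and values are copied in native byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <vector>

// "Input mixin": typed reads built on Derived::readRaw(ptr, len).
template <typename Derived>
struct InputMixin {
    template <typename T>
    T readValue() {
        static_assert(std::is_trivially_copyable<T>::value, "plain data only");
        unsigned char bytes[sizeof(T)];
        static_cast<Derived*>(this)->readRaw(bytes, sizeof(T));
        T value{};
        std::memcpy(&value, bytes, sizeof(T));  // native byte order
        return value;
    }
};

// "Output mixin": typed writes built on Derived::writeRaw(ptr, len).
template <typename Derived>
struct OutputMixin {
    template <typename T>
    void writeValue(const T& value) {
        static_assert(std::is_trivially_copyable<T>::value, "plain data only");
        unsigned char bytes[sizeof(T)];
        std::memcpy(bytes, &value, sizeof(T));
        static_cast<Derived*>(this)->writeRaw(bytes, sizeof(T));
    }
};

// Hypothetical in-memory stream: implements only the raw operations, gets both mixins.
class MemoryStream : public InputMixin<MemoryStream>, public OutputMixin<MemoryStream> {
public:
    void readRaw(unsigned char* dst, std::size_t len) {
        if (len > data_.size() - pos_) throw std::runtime_error("unexpected end of stream");
        std::memcpy(dst, data_.data() + pos_, len);
        pos_ += len;
    }
    void writeRaw(const unsigned char* src, std::size_t len) {
        data_.insert(data_.end(), src, src + len);
    }
private:
    std::vector<unsigned char> data_;
    std::size_t pos_ = 0;
};
```

With this layout, a file- or socket-backed class would reuse the same two bases unchanged.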

So far so good. I will point out, however, that the classic read/write routines are not all that good. For example if you want to implement a line-buffered stream on top of a block-buffered stream you'll be forced to write inefficient code.


I've never heard of filesystems that let you read files line by line: they always read in blocks, and that's what streams should do, too. That's because most streams are binary streams, and there is no such thing as a "line" in them (e.g. how often do you need to read a line from a SocketStream?).

I don't think streams should buffer anything either (whatever the underlying OS I/O API caches should suffice). Buffered stream adapters can do that in a stream-independent way (why duplicate code in every stream when an external adapter can do it just as efficiently?).
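Stream-independent buffering can be sketched in C++ as a single adapter over any unbuffered `InputStream`. The adapter refills its buffer with one underlying `read()` per block and hands out bytes from memory. The C++ interface and the "0 means end of stream" convention are assumptions, and `StringSource`/`BufferedInput` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Unbuffered input, modelled on the InputStream interface in this post (C++ rendering).
struct InputStream {
    virtual ~InputStream() = default;
    // Reads up to len bytes; returns the count, 0 meaning end of stream (assumed).
    virtual std::size_t read(unsigned char* buf, std::size_t len) = 0;
};

// Test source that counts how often the underlying read() is called.
class StringSource : public InputStream {
public:
    explicit StringSource(std::string s) : data_(std::move(s)) {}
    std::size_t read(unsigned char* buf, std::size_t len) override {
        ++calls;
        std::size_t n = std::min(len, data_.size() - pos_);
        std::memcpy(buf, data_.data() + pos_, n);
        pos_ += n;
        return n;
    }
    int calls = 0;
private:
    std::string data_;
    std::size_t pos_ = 0;
};

// One buffering adapter for every stream type: the streams themselves stay unbuffered.
class BufferedInput {
public:
    BufferedInput(InputStream& src, std::size_t capacity) : src_(src), buf_(capacity) {
        assert(capacity > 0);
    }
    // Returns the next byte, or -1 at end of stream.
    int readByte() {
        if (pos_ == len_) {
            len_ = src_.read(buf_.data(), buf_.size());  // one underlying call per block
            pos_ = 0;
            if (len_ == 0) return -1;
        }
        return buf_[pos_++];
    }
private:
    InputStream& src_;
    std::vector<unsigned char> buf_;
    std::size_t pos_ = 0, len_ = 0;
};
```

Reading ten bytes one at a time through a 4-byte buffer costs four underlying calls (4, 4, 2, then the end-of-stream read) rather than eleven.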

Besides, as you noted, the buffering is redundant for byChunk/byLine adapter ranges. It means that byChunk/byLine should operate on unbuffered streams.
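The following C++ sketch shows a byLine-style adapter working directly on an unbuffered stream. It pulls fixed-size chunks and keeps only the unfinished tail of the current line, so a buffering layer underneath would add nothing. The test source deliberately returns short reads, as a socket might. Class names and the end-of-stream convention are assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

struct InputStream {
    virtual ~InputStream() = default;
    // Reads up to len bytes; returns the count, 0 meaning end of stream (assumed).
    virtual std::size_t read(unsigned char* buf, std::size_t len) = 0;
};

// Test source returning at most maxChunk bytes per call.
class TrickleSource : public InputStream {
public:
    TrickleSource(std::string s, std::size_t maxChunk) : data_(std::move(s)), max_(maxChunk) {}
    std::size_t read(unsigned char* buf, std::size_t len) override {
        std::size_t n = std::min({len, max_, data_.size() - pos_});
        std::memcpy(buf, data_.data() + pos_, n);
        pos_ += n;
        return n;
    }
private:
    std::string data_;
    std::size_t max_, pos_ = 0;
};

// byLine-style adapter: reads blocks straight from the stream and splits them on '\n'.
class LineReader {
public:
    LineReader(InputStream& src, std::size_t chunkSize) : src_(src), chunk_(chunkSize) {
        assert(chunkSize > 0);
    }
    // Stores the next line (without '\n') in `line`; returns false once the stream is exhausted.
    bool next(std::string& line) {
        for (;;) {
            std::size_t nl = pending_.find('\n');
            if (nl != std::string::npos) {
                line = pending_.substr(0, nl);
                pending_.erase(0, nl + 1);
                return true;
            }
            std::size_t n = src_.read(chunk_.data(), chunk_.size());
            if (n == 0) {  // end of stream: emit a final unterminated line, if any
                if (pending_.empty()) return false;
                line = std::move(pending_);
                pending_.clear();
                return true;
            }
            pending_.append(reinterpret_cast<const char*>(chunk_.data()), n);
        }
    }
private:
    InputStream& src_;
    std::vector<unsigned char> chunk_;
    std::string pending_;  // bytes read but not yet returned as a line
};
```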

I'll explain my I/O streams implementation below in case you didn't read my message (I've changed some stuff a little since then). My Stream interface is very simple:

// A generic stream
interface Stream
{
    @property InputStream input();
    @property OutputStream output();
    @property SeekableStream seekable();
    @property bool endOfStream();
    void close();
}

You may ask, why separate Input and Output streams? Well, that's because you either read from them, write to them, or both. Some streams are read-only (think Stdin), some write-only (Stdout), and some support both, like FileStream. Right?

Not exactly. Does FileStream support writing when you open a file for reading? Does it support reading when you open it for writing? So you may or may not be able to read from a generic stream, and you may or may not be able to write to it. A design that puts read() and write() on every stream lets you make that mistake. With separate interfaces, if a stream isn't readable, you simply have no InputStream reference to invoke read() on.
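Here is a C++ rendering of the separate-facets idea. `input()`/`output()` return `nullptr` when a capability is missing, which is an assumption: the post only says there is no reference. `ReadOnlyStream` and `tryWrite` are hypothetical. The point is that a write attempt has to obtain an `OutputStream` first, and a read-only stream has none to give.

```cpp
#include <cassert>
#include <cstddef>

struct InputStream {
    virtual ~InputStream() = default;
    virtual std::size_t read(unsigned char* buf, std::size_t len) = 0;
};
struct OutputStream {
    virtual ~OutputStream() = default;
    virtual std::size_t write(const unsigned char* buf, std::size_t len) = 0;
};

// Generic stream exposing optional capabilities; nullptr means "not supported" (assumed).
struct Stream {
    virtual ~Stream() = default;
    virtual InputStream* input() = 0;
    virtual OutputStream* output() = 0;
};

// E.g. a file opened for reading: it has an input facet and no output facet.
class ReadOnlyStream : public Stream, private InputStream {
public:
    InputStream* input() override { return this; }
    OutputStream* output() override { return nullptr; }
private:
    // Always at end of stream, to keep the sketch short.
    std::size_t read(unsigned char*, std::size_t) override { return 0; }
};

// Callers must ask for the facet first; a non-writable stream offers nothing to call write() on.
bool tryWrite(Stream& s, const unsigned char* data, std::size_t len) {
    OutputStream* out = s.output();
    if (out == nullptr) return false;
    return out->write(data, len) == len;
}
```

Private inheritance is used so that outside code cannot reach the InputStream facet except through `input()`.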

Similarly, a stream is either seekable or not. SeekableStreams allow stream cursor manipulation:

interface SeekableStream : Stream
{
    long getPosition(Anchor whence = Anchor.begin);
    void setPosition(long position, Anchor whence = Anchor.begin);
}
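A C++ sketch of cursor manipulation in the spirit of SeekableStream follows. Only `Anchor.begin` appears in the post. `Anchor::current` and `Anchor::end` are assumed to mirror `fseek`'s `SEEK_CUR`/`SEEK_END`. `getPosition(whence)` is assumed to report the cursor relative to that anchor. The hypothetical `SeekCursor` class models only the cursor over a stream of known size, not the data.

```cpp
#include <cassert>
#include <stdexcept>

// Assumed anchor set (only `begin` is shown in the post).
enum class Anchor { begin, current, end };

// Cursor of a seekable stream of fixed size; positions are signed, like the post's `long`.
class SeekCursor {
public:
    explicit SeekCursor(long size) : size_(size) {}

    // Cursor position measured from the anchor (negative or zero when measured from the end).
    long getPosition(Anchor whence = Anchor::begin) const {
        return pos_ - origin(whence);
    }

    // Moves the cursor to `position` relative to the anchor; rejects targets outside [0, size].
    void setPosition(long position, Anchor whence = Anchor::begin) {
        long target = origin(whence) + position;
        if (target < 0 || target > size_) throw std::out_of_range("seek outside stream");
        pos_ = target;
    }

private:
    long origin(Anchor whence) const {
        switch (whence) {
            case Anchor::begin: return 0;
            case Anchor::current: return pos_;
            case Anchor::end: return size_;
        }
        return 0;  // not reached for valid enum values
    }
    long size_;
    long pos_ = 0;
};
```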

InputStream doesn't really have many methods:

interface InputStream
{
        // reads up to buffer.length bytes from a stream
        // returns number of bytes read
        // throws on error
        size_t read(ubyte[] buffer);

        // reads from current position
        AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
}
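Because `read()` returns *up to* `buffer.length` bytes, a caller that needs an exact amount (say, a fixed-size header) loops. The following C++ sketch shows a helper doing that on top of the minimal interface, so InputStream itself can stay small. `readExactly` and `ShortReads` are hypothetical names, and "0 means end of stream" is an assumed convention.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>
#include <utility>

struct InputStream {
    virtual ~InputStream() = default;
    // Reads up to len bytes; returns the count, 0 meaning end of stream (assumed).
    virtual std::size_t read(unsigned char* buf, std::size_t len) = 0;
};

// Keeps calling read() until `len` bytes have arrived; throws if the stream ends first.
void readExactly(InputStream& in, unsigned char* buf, std::size_t len) {
    std::size_t done = 0;
    while (done < len) {
        std::size_t n = in.read(buf + done, len - done);
        if (n == 0) throw std::runtime_error("stream ended before buffer was filled");
        done += n;
    }
}

// Test source that hands out at most 2 bytes per call, simulating short reads.
class ShortReads : public InputStream {
public:
    explicit ShortReads(std::string s) : data_(std::move(s)) {}
    std::size_t read(unsigned char* buf, std::size_t len) override {
        std::size_t n = std::min({len, std::size_t{2}, data_.size() - pos_});
        std::memcpy(buf, data_.data() + pos_, n);
        pos_ += n;
        return n;
    }
private:
    std::string data_;
    std::size_t pos_ = 0;
};
```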

Neither does OutputStream:

interface OutputStream
{
        // returns number of bytes written
        // throws on error
        size_t write(const(ubyte)[] buffer);

        // writes from current position
        AsyncWriteRequest writeAsync(const(ubyte)[] buffer, Mailbox* mailbox = null);
}

They basically support only reading and writing in blocks, nothing else. However, they support asynchronous reads/writes, too (think of mailbox as std.concurrency's Tid).
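The synchronous `write()` also returns the number of bytes written, so a "write everything" helper belongs outside the interface. The following C++ sketch shows it. Treating a zero return as an error is an assumption, since the post only says write() throws on error. `writeAll` and `SmallSink` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

struct OutputStream {
    virtual ~OutputStream() = default;
    // Writes up to len bytes; returns how many were accepted.
    virtual std::size_t write(const unsigned char* buf, std::size_t len) = 0;
};

// Repeats write() until every byte is accepted; a call that makes no progress is an error here.
void writeAll(OutputStream& out, const unsigned char* buf, std::size_t len) {
    while (len > 0) {
        std::size_t n = out.write(buf, len);
        if (n == 0) throw std::runtime_error("write made no progress");
        buf += n;
        len -= n;
    }
}

// Test sink that accepts at most 3 bytes per call.
class SmallSink : public OutputStream {
public:
    std::size_t write(const unsigned char* buf, std::size_t len) override {
        std::size_t n = std::min<std::size_t>(len, 3);
        data.append(reinterpret_cast<const char*>(buf), n);
        ++calls;
        return n;
    }
    std::string data;
    int calls = 0;
};
```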

Unlike in Daniel's proposal, read() in my design reads up to buffer.length bytes, for two reasons:
- it avoids potential buffering and multiple syscalls
- it is the only way to go with SocketStreams. You often don't know how many bytes an incoming socket message contains. You either have to read it byte by byte, or your application might stall indefinitely (if the message was shorter than your buffer and no more messages are being sent)

Why do my streams provide async methods? Because that's the modern approach to I/O: blocking I/O (aka one thread per client) doesn't scale. For example, Java adds a second revision of its async I/O API in JDK7 (called NIO2; the first revision appeared in February 2002), and C# has had asynchronous operations as part of its Stream interface since .NET 1.1 (April 2003).

With async I/O you can serve many clients with one thread. Here is an example (pseudo-code, using std.concurrency):

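foreach (connection; networkConnections) {
    // ask each connection to post incoming messages to this thread's mailbox
    connection.receiveMessage(thisTid);
}

for (;;) {
    // a single thread handles messages from all clients as they arrive
    receive( (NetworkMessage message) { /* do stuff */ } );
}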

This still isn't the most performant solution, but it's a lot better than one thread per client.
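The mailbox pattern above can be sketched in C++. A thread-safe queue stands in for a std.concurrency Tid, and one thread drains messages from every connection. In the test, the producer threads only simulate completions arriving; in real async I/O the OS would deliver them. `Mailbox`, `NetworkMessage` and `serveAll` are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal thread-safe mailbox (a stand-in for a std.concurrency Tid).
template <typename T>
class Mailbox {
public:
    void post(T msg) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }
    T receive() {  // blocks until a message is available
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        T msg = std::move(q_.front());
        q_.pop_front();
        return msg;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
};

struct NetworkMessage {
    int connection;
    std::string payload;
};

// One thread serves all connections: handles `expected` messages, counts them per connection.
std::vector<int> serveAll(Mailbox<NetworkMessage>& box, int connections, int expected) {
    std::vector<int> perConnection(connections, 0);
    for (int i = 0; i < expected; ++i) {
        NetworkMessage msg = box.receive();
        ++perConnection[msg.connection];  // "do stuff" with msg.payload here
    }
    return perConnection;
}
```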

Async I/O isn't only needed for network stuff. Here is a code snippet from DMD (comments added):

#define ASYNCREAD 1
#if ASYNCREAD
    AsyncRead *aw = AsyncRead::create(modules.dim);
    for (i = 0; i < modules.dim; i++)
    {
        m = (Module *)modules.data[i];
        aw->addFile(m->srcfile);
    }
    aw->start(); // executes async request, doesn't block
#else
    // Single threaded
    for (i = 0; i < modules.dim; i++)
    {
        m = (Module *)modules.data[i];
        m->read(0); // blocks
    }
#endif

    // Do some other stuff

    for (i = 0; i < modules.dim; i++)
    {
        ...
#if ASYNCREAD
        aw->read(i); // waits until async operation finishes
#endif
        ...
    }

Walter said that this small change gave a significant speed-up in compilation time.
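The same "start every read, do other work, then wait for each result" pattern can be sketched in portable C++ with `std::async` and `std::future`. This stands in for DMD's AsyncRead class and is not its actual implementation. `readFile` and `startReads` are hypothetical helpers.

```cpp
#include <cassert>
#include <fstream>
#include <future>
#include <iterator>
#include <stdexcept>
#include <string>
#include <vector>

// Reads a whole file; throws if it cannot be opened.
std::string readFile(const std::string& path) {
    std::ifstream in(path, std::ios::binary);
    if (!in) throw std::runtime_error("cannot open " + path);
    return std::string(std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>());
}

// Like aw->addFile(...) followed by aw->start(): every read is launched up front, none blocks.
std::vector<std::future<std::string>> startReads(const std::vector<std::string>& paths) {
    std::vector<std::future<std::string>> pending;
    for (const auto& p : paths)
        pending.push_back(std::async(std::launch::async, readFile, p));
    return pending;
}
```

Calling `pending[i].get()` later plays the role of `aw->read(i)`: it blocks only if that particular file hasn't finished loading yet, and rethrows any error from the read.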

Also, my async methods return a reference to an AsyncRequest interface that allows waiting for completion (that's what Walter does in DMD), canceling, querying the status (complete, in progress, failed), reporting errors, etc., and that's very useful, too.
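One way to provide such a request handle is sketched in C++ below: status queries, waiting, cancellation and error reporting, with the operation run on a worker thread. Several details are assumptions: the `Status` values, the rule that cancel() succeeds only before the operation starts, and the use of a thread per request. The real interface might be backed by OS-level async I/O instead.

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

enum class Status { pending, inProgress, complete, failed, cancelled };

// Hypothetical request handle: wait, cancel, status query, error message.
class AsyncRequest {
public:
    explicit AsyncRequest(std::function<void()> op) : worker_([this, op] { run(op); }) {}
    ~AsyncRequest() { worker_.join(); }

    Status status() const { std::lock_guard<std::mutex> l(m_); return status_; }

    // Blocks until the request has completed, failed or been cancelled.
    Status wait() {
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [this] { return finished(status_); });
        return status_;
    }

    // Succeeds only if the operation has not started yet (simplifying assumption).
    bool cancel() {
        std::lock_guard<std::mutex> l(m_);
        if (status_ != Status::pending) return false;
        status_ = Status::cancelled;
        cv_.notify_all();
        return true;
    }

    std::string error() const { std::lock_guard<std::mutex> l(m_); return error_; }

private:
    static bool finished(Status s) {
        return s == Status::complete || s == Status::failed || s == Status::cancelled;
    }
    void run(const std::function<void()>& op) {
        {
            std::lock_guard<std::mutex> l(m_);
            if (status_ == Status::cancelled) return;  // cancelled before it started
            status_ = Status::inProgress;
        }
        Status result = Status::complete;
        std::string err;
        try { op(); } catch (const std::exception& e) { result = Status::failed; err = e.what(); }
        std::lock_guard<std::mutex> l(m_);
        status_ = result;
        error_ = err;
        cv_.notify_all();
    }

    mutable std::mutex m_;
    std::condition_variable cv_;
    Status status_ = Status::pending;
    std::string error_;
    std::thread worker_;  // declared last so it starts only after the members above exist
};
```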

I strongly believe we shouldn't ignore this type of API.

P.S. For threads this deep it's better to fork a new one, especially when changing the subject.
