I have a question on this class. Why not use ConcurrentHashMap for this instead of synchronizing? Or CopyOnWriteMap?
On Sat, Jul 19, 2008 at 7:19 PM, <[EMAIL PROTECTED]> wrote: > Author: edeoliveira > Date: Sat Jul 19 16:19:14 2008 > New Revision: 678238 > > URL: http://svn.apache.org/viewvc?rev=678238&view=rev > Log: > New IoFilter that implements DIRMINA-519 > > Added: > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/ > > > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > (with props) > > Added: > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > URL: > http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=678238&view=auto > > ============================================================================== > --- > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > (added) > +++ > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > Sat Jul 19 16:19:14 2008 > @@ -0,0 +1,243 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + * > + */ > +package org.apache.mina.filter.buffer; > + > +import java.io.BufferedOutputStream; > +import java.util.HashMap; > +import java.util.Map; > + > +import org.apache.mina.core.buffer.IoBuffer; > +import org.apache.mina.core.filterchain.IoFilter; > +import org.apache.mina.core.filterchain.IoFilterAdapter; > +import org.apache.mina.core.session.IoSession; > +import org.apache.mina.core.write.DefaultWriteRequest; > +import org.apache.mina.core.write.WriteRequest; > +import org.apache.mina.filter.codec.ProtocolCodecFilter; > + > +/** > + * An [EMAIL PROTECTED] IoFilter} implementation used to buffer outgoing > [EMAIL PROTECTED] > WriteRequest} almost > + * like what [EMAIL PROTECTED] BufferedOutputStream} does. Using this filter > allows > to be less dependent > + * from network latency. It is also useful when a session is generating > very small messages > + * too frequently and consequently generating unnecessary traffic > overhead. > + * > + * Please note that it should always be placed before the [EMAIL PROTECTED] > ProtocolCodecFilter} > + * as it only handles [EMAIL PROTECTED] WriteRequest}'s carrying [EMAIL > PROTECTED] IoBuffer} > objects. > + * > + * @author The Apache MINA Project (dev@mina.apache.org) > + * @version $Rev: $, $Date: $ > + * @since MINA 2.0.0-M2 > + */ > +public final class BufferedWriteFilter extends IoFilterAdapter { > + > + /** > + * Default buffer size value in bytes. > + */ > + public final static int DEFAULT_BUFFER_SIZE = 8192; > + > + /** > + * The buffer size allocated for each new session's buffer. > + */ > + private int bufferSize = DEFAULT_BUFFER_SIZE; > + > + /** > + * The map that matches an [EMAIL PROTECTED] IoSession} and it's [EMAIL > PROTECTED] IoBuffer} > + * buffer. > + */ > + protected Map<IoSession, IoBuffer> buffersMap = new HashMap<IoSession, > IoBuffer>(); > + > + /** > + * Default constructor. Sets buffer size to [EMAIL PROTECTED] > #DEFAULT_BUFFER_SIZE} > + * bytes. > + */ > + public BufferedWriteFilter() { > + this(DEFAULT_BUFFER_SIZE); > + } > + > + /** > + * Constructor which sets buffer size to <code>bufferSize</code>. > + * > + * @param bufferSize the new buffer size > + */ > + public BufferedWriteFilter(int bufferSize) { > + super(); > + this.bufferSize = bufferSize; > + } > + > + /** > + * Returns buffer size. > + */ > + public int getBufferSize() { > + return bufferSize; > + } > + > + /** > + * Sets the buffer size but only for the newly created buffers. > + * > + * @param bufferSize the new buffer size > + */ > + public void setBufferSize(int bufferSize) { > + this.bufferSize = bufferSize; > + } > + > + /** > + * [EMAIL PROTECTED] > + * > + * @throws Exception if <code>writeRequest.message</code> isn't an > + * [EMAIL PROTECTED] IoBuffer} instance. > + */ > + @Override > + public void filterWrite(NextFilter nextFilter, IoSession session, > + WriteRequest writeRequest) throws Exception { > + > + Object data = writeRequest.getMessage(); > + > + if (data instanceof IoBuffer) { > + write(session, (IoBuffer) data); > + } else { > + throw new IllegalArgumentException( > + "This filter should only buffer IoBuffer objects"); > + } > + } > + > + /** > + * Writes an [EMAIL PROTECTED] IoBuffer} to the session's buffer. > + * > + * @param session the session to which a write is requested > + * @param data the data to buffer > + */ > + private void write(IoSession session, IoBuffer data) { > + IoBuffer dest = null; > + synchronized (buffersMap) { > + dest = buffersMap.get(session); > + if (dest == null) { > + // Enforce the creation of a non-expandable buffer > + dest = IoBuffer.allocate(bufferSize).setAutoExpand(false); > + buffersMap.put(session, dest); > + } > + } > + > + write(session, data, dest); > + } > + > + /** > + * Writes <code>data</code> [EMAIL PROTECTED] IoBuffer} to the > <code>buf</code> > + * [EMAIL PROTECTED] IoBuffer} which buffers write requests for the > + * <code>session</code> {@ link IoSession} until buffer is full > + * or manually flushed. > + * > + * @param session the session where buffer will be written > + * @param data the data to buffer > + * @param buf the buffer where data will be temporarily written > + */ > + private void write(IoSession session, IoBuffer data, IoBuffer buf) { > + synchronized (buf) { > + try { > + int len = data.remaining(); > + if (len >= buf.capacity()) { > + /* > + * If the request length exceeds the size of the > output buffer, > + * flush the output buffer and then write the data > directly. > + */ > + NextFilter nextFilter = session.getFilterChain() > + .getNextFilter(this); > + internalFlush(nextFilter, session, buf); > + nextFilter.filterWrite(session, > + new DefaultWriteRequest(buf)); > + return; > + } > + if (len > (buf.limit() - buf.position())) { > + > internalFlush(session.getFilterChain().getNextFilter(this), > + session, buf); > + } > + buf.put(data); > + } catch (Throwable e) { > + session.getFilterChain().fireExceptionCaught(e); > + } > + } > + } > + > + /** > + * Internal method that actually flushes the buffered data. > + * > + * @param nextFilter the [EMAIL PROTECTED] NextFilter} of this filter > + * @param session the session where buffer will be written > + * @param data the data to write > + * @throws Exception if a write operation fails > + */ > + private void internalFlush(NextFilter nextFilter, IoSession session, > + IoBuffer data) throws Exception { > + if (data != null) { > + nextFilter.filterWrite(session, new > DefaultWriteRequest(data)); > + data.clear(); > + } > + } > + > + /** > + * Flushes the buffered data. > + * > + * @param session the session where buffer will be written > + */ > + public void flush(IoSession session) { > + try { > + IoBuffer data = null; > + synchronized (session) { > + data = buffersMap.get(session); > + } > + internalFlush(session.getFilterChain().getNextFilter(this), > + session, data); > + > + } catch (Throwable e) { > + session.getFilterChain().fireExceptionCaught(e); > + } > + } > + > + /** > + * Internal method that actually frees the [EMAIL PROTECTED] IoBuffer} > that > contains > + * the buffered data that has not been flushed. > + * > + * @param session the session we operate on > + */ > + private void clear(IoSession session) { > + synchronized (session) { > + IoBuffer buf = buffersMap.remove(session); > + if (buf != null) { > + buf.free(); > + } > + } > + } > + > + /** > + * [EMAIL PROTECTED] > + */ > + @Override > + public void exceptionCaught(NextFilter nextFilter, IoSession session, > + Throwable cause) throws Exception { > + clear(session); > + } > + > + /** > + * [EMAIL PROTECTED] > + */ > + @Override > + public void sessionClosed(NextFilter nextFilter, IoSession session) > + throws Exception { > + clear(session); > + } > +} > \ No newline at end of file > > Propchange: > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > > ------------------------------------------------------------------------------ > svn:eol-style = native > > Propchange: > mina/trunk/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java > > ------------------------------------------------------------------------------ > svn:mime-type = text/plain > > >