CompressionAndRMI

I am writing this page in case there are others who try to implement compression over RMI and find that it is more difficult than expected. There are several other pages that describe how to create and use an RMI socket factory. The tricky part is to develop the compressing input and output streams that can be used by a socket with RMI. The main difficulties are:

  1. The client hangs when trying to communicate with the server. This is what happens if you simply try to use a GZIPInputStream and a GZIPOutputStream. The flush operation on the GZIPOutputStream is not sufficient to have the desired effect on the client. RMI expects that when it flushes the output stream on the server, all of the flushed data will appear on the client. Instead, in this case the client is still left waiting for some of the data. The server cannot help the client here because it is waiting for the next request from the client. I believe the issue is that the GZIPOutputStream is in the middle of a compress operation and is not ready to flush everything down the wire.
  2. A common solution to the above is to modify the GZIPOutputStream so that its flush() operation invokes finish(); a sketch of this workaround appears after this list. There are web pages and books that suggest this solution, and perhaps it works in many contexts. The approach is indeed effective at flushing out all the data. Unfortunately, what I found is that when RMI writes to this same stream after the flush, an exception is thrown: one is not supposed to finish a GZIPOutputStream and then continue writing.
  3. RMI uses a BufferedOutputStream that limits the data on the wire to 8192 bytes at a time. It is argued here (http://forums.sun.com/thread.jspa?threadID=5167582) that this makes compression hardly worthwhile. But (I discovered this purely by accident) the compressing output stream below is also buffered and, as part of a socket, sits closer to the wire than RMI's buffered output stream. This means that in terms of buffer size my buffered output stream wins: it sets the real buffer size. And I know from measurements that I am getting almost a 10 to 1 compression ratio. I have not yet done enough experiments to determine the optimal buffer size; I am currently using 64K buffers.
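For concreteness, the workaround from point 2 looks roughly like this (a sketch only; as described above, it flushes everything but breaks on the next write):

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// The often-suggested workaround: force all buffered data out by finishing
// the GZIP stream on every flush. The data does reach the client, but any
// later write to a finished GZIPOutputStream throws an IOException, which
// is exactly the failure described in point 2.
public class FinishingGZIPOutputStream extends GZIPOutputStream {

    public FinishingGZIPOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    @Override
    public void flush() throws IOException {
        finish();     // writes all pending compressed data plus the GZIP trailer
        out.flush();  // 'out' is the underlying stream from FilterOutputStream
    }
}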

So what I needed was a way to tell the compressing stream that I had a unit that could be compressed in its entirety without interfering with future writes. My solution was to make a compressing output stream based on a buffered version of a ZipOutputStream. My idea was to buffer up data to be sent and only send data over the wire when either the buffer is full or flush is called explicitly:

public class CompressingOutputStream extends OutputStream {
    private byte[] data = new byte[BUFFER_SIZE];
    int offset = 0;  // the next location in the buffer to write to
                     // also doubles as the size of the unflushed data

         ...
    @Override
    public void write(int b) throws IOException {
        ensureNotFull();            // flush first if the buffer is already full
        data[offset++] = (byte) b;
        ensureNotFull();            // flush as soon as the buffer fills up
    }
         ...

    private void ensureNotFull() throws IOException {
        if (offset >= BUFFER_SIZE) {
            flush();
        }
    }
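
These snippets reference several fields whose declarations are elided above. Based on the rest of the code, they look roughly like the following reconstruction (the values of BUFFER_SIZE and SMALL_DATA shown here are assumptions; only the names actually appear in the snippets):

    // Reconstructed declarations inside CompressingOutputStream.
    private static final int BUFFER_SIZE = 64 * 1024; // 64K buffers, per the measurements above
    private static final int SMALL_DATA = 512;        // hypothetical store-vs-deflate threshold
    private static final Logger log =
            Logger.getLogger(CompressingOutputStream.class.getName());

    private final OutputStream os;                    // the raw stream headed for the socket
    private final ZipOutputStream compressing;        // compresses data on its way to os
    private int blockCounter = 0;                     // numbers the zip entries ("Segment0", ...)

    public CompressingOutputStream(OutputStream os) {
        this.os = os;
        this.compressing = new ZipOutputStream(os);
    }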

When data is flushed for either of these reasons I create, fill and close a ZipEntry:

    @Override
    public void flush() throws IOException {
        if (offset > 0) {
            if (log.isLoggable(Level.FINER)) {
                log.finer("OutputStream: Flushing output by starting new segment " + (blockCounter + 1));
            }
            ZipEntry entry = new ZipEntry("Segment" + blockCounter++);
            if (offset < SMALL_DATA) {
                // Store small buffers uncompressed; a STORED entry requires
                // the CRC-32 and size to be set before putNextEntry().
                entry.setMethod(ZipEntry.STORED);
                CRC32 crc = new CRC32();
                crc.update(data, 0, offset);
                entry.setCrc(crc.getValue());
            }
            else {
                entry.setMethod(ZipEntry.DEFLATED);
            }
            entry.setSize(offset);
            compressing.putNextEntry(entry);
            compressing.write(data, 0, offset);
            compressing.closeEntry();
            compressing.flush();
            if (log.isLoggable(Level.FINER)) {
                log.finer("OutputStream: segment " + blockCounter + " written (" + offset + " bytes)");
            }
        }
        offset = 0;
        os.flush();
    }

Once the strategy for the output stream is devised, developing the input stream is pretty easy. The key idea is to read zip entries and then go on to the next zip entry whenever the current ZipEntry runs out of data:

    @Override
    public int read() throws IOException {
        if (!initialize() || entry == null) {
            return  -1;
        }
        int ret = -1;
        if (compressing.available() != 0) {
            // available() returns 0 once the current entry has been exhausted
            ret = compressing.read();
        }
        if (ret < 0) {
            compressing.closeEntry();
            logZipEntry(entry);
            if ((entry = compressing.getNextEntry()) != null) {
                if (log.isLoggable(Level.FINER)) {
                    log.finer("InputStream: reading new segment " + entry.getName());
                }
                ret = compressing.read();
            }
        }
        return ret;
    }
    
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (!initialize() || entry == null) {
            return  -1;
        }
        int bytesRead = -1;
        if (compressing.available() != 0) {
            bytesRead = compressing.read(b, off, len);
        }
        if (bytesRead < 0) {
            compressing.closeEntry();
            logZipEntry(entry);
            if  ((entry = compressing.getNextEntry()) != null) {
                if (log.isLoggable(Level.FINER)) {
                    log.finer("InputStream: reading new segment " + entry.getName());
                }
                bytesRead = compressing.read(b, off, len);
            }
        }
        return bytesRead;
    }

I do need to initialize the input stream so that it is working on a zip entry, and I keep the input stream in this state until I reach the end of the stream:

    private boolean initialize() throws IOException {
        if (!initialized) {
            initialized = true;
            entry = compressing.getNextEntry();
            return entry != null;
        }
        return true;
    }
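
Outside of RMI, a quick way to sanity-check a pair of streams like these is a round trip through an in-memory buffer. The constructors used here are assumptions (each stream wrapping the underlying stream it writes to or reads from, as sketched above):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class RoundTripCheck {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        CompressingOutputStream out = new CompressingOutputStream(wire);

        byte[] message = "a unit of data that should survive the round trip".getBytes("UTF-8");
        out.write(message, 0, message.length);
        out.flush();    // closes the current zip entry and pushes it onto the "wire"

        CompressingInputStream in =
                new CompressingInputStream(new ByteArrayInputStream(wire.toByteArray()));
        byte[] received = new byte[message.length];
        int n = in.read(received, 0, received.length);
        System.out.println(n + " of " + message.length + " bytes read back");
    }
}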


There is one other difficulty that I needed to overcome. Notice that in the above output stream code there is a decision made as to whether to store or deflate the buffer:

            if (offset < SMALL_DATA) {
                entry.setMethod(ZipEntry.STORED);
                     ...
            }
            else {
                entry.setMethod(ZipEntry.DEFLATED);
            }

I think that this is a good idea in any case because I suspect that if the data is too small then compression is not really worthwhile. In fact, for some of the very small buffers, compression appeared to actually increase the size of the data sent over the wire. With the small-data check in place, the compression code works great and is in use now.
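
The size increase on tiny buffers is easy to reproduce with a bare Deflater; the compression header and checksum alone outweigh any savings:

import java.util.zip.Deflater;

public class SmallDataOverhead {
    public static void main(String[] args) {
        byte[] input = "ok".getBytes();   // a two-byte payload
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[64];
        int compressed = deflater.deflate(out);
        deflater.end();
        // Typically prints something like "2 bytes in, 10 bytes out".
        System.out.println(input.length + " bytes in, " + compressed + " bytes out");
    }
}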

However, my first prototype did not have this check. What would happen in that case is that the client would occasionally hang because these really small compressed zip entries did not appear to get flushed over the wire. The hang took a while to appear: much of the initialization of my client would complete before hanging. At this point I gave up, but I decided to add the small-data-is-stored logic in case I ever came back to the problem. To my surprise, this one change made everything work, and it is now the implementation that I am using.

The full implementation can be found here. In particular, CompressingInputStream.java and CompressingOutputStream.java are the key to the implementation. The implementation would certainly need to be adapted for another application, but I suspect that this ought to be relatively easy.
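
For completeness, here is a rough sketch of how streams like these attach to RMI through a custom socket; the RMISocketFactory plumbing itself is covered by the other pages mentioned at the top, and all names here are illustrative:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

// Illustrative sketch only: a socket whose streams are wrapped by the
// compressing streams above. An RMI client and server socket factory
// would create and accept sockets of this kind.
public class CompressingSocket extends Socket {
    private InputStream in;
    private OutputStream out;

    public CompressingSocket(String host, int port) throws IOException {
        super(host, port);
    }

    @Override
    public synchronized InputStream getInputStream() throws IOException {
        if (in == null) {
            in = new CompressingInputStream(super.getInputStream());
        }
        return in;
    }

    @Override
    public synchronized OutputStream getOutputStream() throws IOException {
        if (out == null) {
            out = new CompressingOutputStream(super.getOutputStream());
        }
        return out;
    }
}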