CompressionAndRMI

Compression is tricky - I thought we were done, but we are still having intermittent problems with the implementation below. It occasionally hangs: it looks like even though the writer has written and flushed a zip entry, the reader cannot read it. I will update or delete this page when I know more.

I am writing this page in case there are others who try to implement compression over RMI and find that it is more difficult than expected. There are several other pages that describe how to create and use an RMI socket factory (I sketch such a factory later on this page). The tricky part is developing the compressing input and output streams that the factory's sockets hand to RMI. The main difficulties are:

  1. The client hangs when trying to communicate with the server. This is what happens if you simply try to use a GZIPInputStream and a GZIPOutputStream. The flush operation on the GZIPOutputStream is not sufficient to have the desired effect on the client: RMI expects that when it flushes the output stream on the server, all of the flushed data will appear on the client. Instead, the client is left waiting for some of the data, and the server does not help it along because it is waiting for the next request from the client. I believe the issue is that the GZIPOutputStream is in the middle of a compress operation and is not ready to flush everything down the wire.
  2. A common solution to the above is to modify the GZIPOutputStream so that its flush() operation invokes finish(). There are web pages and books that suggest this solution, and perhaps it works in many contexts; it is indeed effective at flushing out all the data. Unfortunately, what I found is that when RMI writes to the same stream after the flush, an exception is thrown - one is not supposed to finish() a GZIPOutputStream and then continue writing. A sketch of this modification appears after this list.

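For concreteness, here is roughly what the modification described in item 2 looks like. The class name is my own invention; the pattern is the one those pages suggest:

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// The commonly suggested fix: finish the compressor on flush() so that
// everything buffered reaches the wire.  This is the version that fails
// as described in item 2 above.
public class FinishingGZIPOutputStream extends GZIPOutputStream {

    public FinishingGZIPOutputStream(OutputStream out) throws IOException {
        super(out);
    }

    @Override
    public void flush() throws IOException {
        finish();     // pushes all compressed data to the underlying stream,
        out.flush();  // but the stream cannot be written to afterwards -
                      // RMI's next write throws an IOException
    }
}
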
So what I needed was a way to tell the compressing stream that I had a unit of data that could be compressed in its entirety without interfering with future writes. My solution was to build a compressing output stream on top of a buffered ZipOutputStream: buffer up the data to be sent, and only send it over the wire when either the buffer is full or flush is called explicitly:

public class CompressingOutputStream extends OutputStream {
    private byte[] data = new byte[BUFFER_SIZE];
    private int offset = 0;  // the next location in the buffer to write to;
                             // also doubles as the size of the unflushed data

         ...

    @Override
    public void write(int b) throws IOException {
        ensureNotFull();           // flush first if the buffer is already full
        data[offset++] = (byte) b;
        ensureNotFull();           // flush eagerly once the buffer fills up
    }

         ...

    // Flushing whenever the buffer fills keeps offset < BUFFER_SIZE
    // between calls to write().
    private void ensureNotFull() throws IOException {
        if (offset >= BUFFER_SIZE) {
            flush();
        }
    }

When the data is flushed for either of these reasons, I create, fill, and close a ZipEntry:

    @Override
    public void flush() throws IOException {
        if (offset > 0) {
            ZipEntry entry = new ZipEntry("Segment" + blockCounter++);
            if (log.isLoggable(Level.FINER)) {
                log.finer("OutputStream: Flushing output by starting new segment " + entry.getName());
            }
            if (offset < SMALL_DATA) {
                // Tiny buffers are stored rather than deflated; a STORED
                // entry must have its CRC set before putNextEntry().
                entry.setMethod(ZipEntry.STORED);
                CRC32 crc = new CRC32();
                crc.update(data, 0, offset);
                entry.setCrc(crc.getValue());
            }
            else {
                entry.setMethod(ZipEntry.DEFLATED);
            }
            entry.setSize(offset);
            compressing.putNextEntry(entry);
            compressing.write(data, 0, offset);
            compressing.closeEntry();
            compressing.flush();
            if (log.isLoggable(Level.FINER)) {
                log.finer("OutputStream: segment " + entry.getName() + " written (" + offset + " bytes)");
            }
        }
        offset = 0;
        os.flush();  // push everything through the underlying socket stream
    }
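
For reference, here is roughly what the elided fields and constructor of the output stream might look like. This is a sketch, not the actual implementation; in particular, the buffer sizes are assumptions on my part:

import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.ZipOutputStream;

public class CompressingOutputStream extends OutputStream {
    private static Logger log = Logger.getLogger(CompressingOutputStream.class.getName());

    private static final int BUFFER_SIZE = 64 * 1024;  // assumed value
    private static final int SMALL_DATA  = 256;        // assumed value

    private OutputStream os;              // the raw (socket) output stream
    private ZipOutputStream compressing;  // compresses onto os
    private int blockCounter = 0;         // numbers the segments

    private byte[] data = new byte[BUFFER_SIZE];
    private int offset = 0;

    public CompressingOutputStream(OutputStream os) {
        this.os = os;
        this.compressing = new ZipOutputStream(os);
    }

    // write(), flush() and ensureNotFull() as shown above
}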

Once the strategy for the output stream is devised, developing the input stream is pretty easy. The key idea is to read zip entries and move on to the next zip entry whenever the current one runs out of data:

    @Override
    public int read() throws IOException {
        if (!initialize() || entry == null) {
            return -1;
        }
        int ret = -1;
        if (compressing.available() != 0) {
            ret = compressing.read();
        }
        if (ret < 0) {
            // The current entry is exhausted - move on to the next segment.
            compressing.closeEntry();
            logZipEntry(entry);
            if ((entry = compressing.getNextEntry()) != null) {
                if (log.isLoggable(Level.FINER)) {
                    log.finer("InputStream: reading new segment " + entry.getName());
                }
                ret = compressing.read();
            }
        }
        return ret;
    }
    
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (!initialize() || entry == null) {
            return -1;
        }
        int bytesRead = -1;
        if (compressing.available() != 0) {
            bytesRead = compressing.read(b, off, len);
        }
        if (bytesRead < 0) {
            // Same pattern as read() above, for the bulk reads used by RMI.
            compressing.closeEntry();
            logZipEntry(entry);
            if ((entry = compressing.getNextEntry()) != null) {
                if (log.isLoggable(Level.FINER)) {
                    log.finer("InputStream: reading new segment " + entry.getName());
                }
                bytesRead = compressing.read(b, off, len);
            }
        }
        return bytesRead;
    }

I do need to initialize the input stream so that it is working on a zip entry, and I keep the input stream in this state until I reach the end of the stream:

    private boolean initialize() throws IOException {
        if (!initialized) {
            initialized = true;
            entry = compressing.getNextEntry();
            return entry != null;
        }
        return true;
    }
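
With both streams in hand, hooking them up to RMI is a matter of a socket factory whose sockets wrap their streams, as described on the socket factory pages mentioned above. The skeleton below is a hedged sketch of that wiring, not the actual implementation; it assumes the compressing streams have constructors that wrap a raw stream, and a real version would cache the wrapped streams instead of re-wrapping on every call:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;

public class CompressingSocketFactory
        implements RMIClientSocketFactory, RMIServerSocketFactory, Serializable {

    public Socket createSocket(String host, int port) throws IOException {
        return new CompressingSocket(host, port);
    }

    public ServerSocket createServerSocket(int port) throws IOException {
        // Hand accepted connections back as compressing sockets.
        return new ServerSocket(port) {
            @Override
            public Socket accept() throws IOException {
                Socket socket = new CompressingSocket();
                implAccept(socket);
                return socket;
            }
        };
    }
}

class CompressingSocket extends Socket {

    CompressingSocket() {
    }

    CompressingSocket(String host, int port) throws IOException {
        super(host, port);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return new CompressingInputStream(super.getInputStream());
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return new CompressingOutputStream(super.getOutputStream());
    }
}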


There is one other difficulty that I needed to overcome. Notice that in the output stream code above, a decision is made as to whether to store or deflate the buffer:

            if (offset < SMALL_DATA) {
                entry.setMethod(ZipEntry.STORED);
                     ...
            }
            else {
                entry.setMethod(ZipEntry.DEFLATED);
            }

I think this is a good idea in any case, because if the data is too small then compression is not really worthwhile. In fact, for some of the very small buffers, compression appeared to actually increase the size of the data sent over the wire. With the small-data check in place, the compression code works great and is in use now.
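
The overhead is easy to see directly: a deflate stream carries a fixed header and checksum that dominate for tiny inputs. A small stand-alone demonstration (purely illustrative, not part of the implementation):

import java.util.zip.Deflater;

public class SmallDataDemo {
    public static void main(String[] args) {
        byte[] small = "tiny".getBytes();  // 4 bytes of input
        Deflater deflater = new Deflater();
        deflater.setInput(small);
        deflater.finish();
        byte[] out = new byte[64];
        int compressedSize = deflater.deflate(out);  // typically a dozen or so bytes
        deflater.end();
        System.out.println(small.length + " bytes deflated to " + compressedSize + " bytes");
    }
}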

However, my first prototype did not have this check. In that case the client would occasionally hang, because these really small compressed zip entries did not appear to get flushed over the wire. The hang took a while to appear - much of the initialization of my client would complete before hanging. At this point I gave up, but decided to add the small-data-is-stored logic in case I ever came back to the problem. To my surprise, this one change made everything work, and it is now the implementation that I am using.

The full implementation can be found here. In particular, CompressingInputStream.java and CompressingOutputStream.java are the key to the implementation. This code would certainly need to be adapted for another application, but I suspect that ought to be relatively easy.