Thursday, January 1, 2015

Piped Streams in Java : Reduced In-Memory Cost and Gained Performance

Recently, my Senior Architect suggested this technique of piping Input and Output Streams to me. It is a really cool feature that you can leverage in many scenarios of file (stream) reading and writing.

Here is the scenario where this became applicable. We had to transport substantially large files (in the MB range) across CXF Web Services to the frontend. However, the client had a security requirement that gave us NO freedom to physically write the data as a file to a File Server location and use a file-downloading technique from the client end. I guess the data we touch is client-confidential, or there is some other strange reason.


Now, how do we survive this? Without creating an actual file, we need to pass the content on as if it were an Input Stream. Now think of the second challenge: the number of application users can go up to 100,000 per day, so we need to be smart about how we handle this. The best solution is an asynchronous technique of transporting data using streams.

So, I was introduced to Java Piped Streams (PipedInputStream and PipedOutputStream), which have actually been part of java.io since JDK 1.0. Our solution became pretty simple: we fed the data into an Output Stream and, using the piping mechanism, connected it to an Input Stream, which we then transported via the CXF response.
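A minimal sketch of that idea, assuming a hypothetical helper `PipedContentSource.asInputStream(...)` (not the code we used with CXF) and Java 9+ for `List.of` and `readAllBytes()`: a producer thread writes into the PipedOutputStream, while the caller receives the connected PipedInputStream and can hand it to whatever consumes it, such as a web service response. Nothing is written to disk.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PipedContentSource {

    /**
     * Hypothetical helper: returns an InputStream whose bytes are produced on a
     * background thread, so the content never exists as a file.
     */
    public static InputStream asInputStream(List<String> lines) throws IOException {
        PipedInputStream in = new PipedInputStream(1024);   // 1 KB in-memory buffer
        PipedOutputStream out = new PipedOutputStream(in);

        Thread producer = new Thread(() -> {
            // Closing the output end (try-with-resources) tells the reader "end of stream".
            try (OutputStream os = out) {
                for (String line : lines) {
                    os.write(line.getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                // Typically the consumer closed its end early; a real service would log this.
                throw new UncheckedIOException(e);
            }
        }, "pipe-producer");
        producer.start();
        return in;   // hand this to the consumer, e.g. a web service response
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = List.of("first\n", "second\n", "third\n");
        try (InputStream in = asInputStream(lines)) {
            // readAllBytes() blocks until the producer closes its end of the pipe.
            System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

In the CXF case, the returned stream would be what the service hands back; that wiring is not shown here.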

However, I am going to cut to the chase and focus on the streaming part rather than the CXF web service, because that is just one use case.

Here I will walk through a few code snippets that show a nice way of making the best of Piped Streams.
There are two ways to pipe these streams.


Piped Streams : 2 Ways of Tunneling a Data Stream via In-Memory Pipes

       /**
         * Piping the output stream to the input stream.
         */
        final PipedInputStream inputStream = new PipedInputStream(1024);
        final PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStream);

        /**
         * Piping the input stream to the output stream.
         */
        final PipedOutputStream outputStream = new PipedOutputStream();
        final PipedInputStream pipedInputStream = new PipedInputStream(outputStream, 1024);
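Besides these two constructor-based options, both classes also have a `connect()` method, so the two ends can be created unconnected and joined afterwards. The sketch below (hypothetical class `PipeConnectDemo`, Java 9+ for `readAllBytes()`) pushes a short message through each of the three setups, writing on a separate thread and reading on the current one.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class PipeConnectDemo {

    /** Sends a message through a connected pipe: writer on its own thread, reader on this one. */
    static String roundTrip(PipedOutputStream out, PipedInputStream in, String message)
            throws IOException, InterruptedException {
        Thread writer = new Thread(() -> {
            try (PipedOutputStream o = out) {   // closing signals end-of-stream
                o.write(message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();
        String received = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        writer.join();
        in.close();
        return received;
    }

    public static void main(String[] args) throws Exception {
        // Way 1: create the input end, then build the output end on top of it.
        PipedInputStream in1 = new PipedInputStream(1024);
        PipedOutputStream out1 = new PipedOutputStream(in1);

        // Way 2: create the output end, then build the input end on top of it.
        PipedOutputStream out2 = new PipedOutputStream();
        PipedInputStream in2 = new PipedInputStream(out2, 1024);

        // Way 3: create both ends unconnected, then call connect() exactly once
        // (calling it again from the other end throws an IOException).
        PipedInputStream in3 = new PipedInputStream(1024);
        PipedOutputStream out3 = new PipedOutputStream();
        out3.connect(in3);

        System.out.println(roundTrip(out1, in1, "one"));
        System.out.println(roundTrip(out2, in2, "two"));
        System.out.println(roundTrip(out3, in3, "three"));
    }
}
```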


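IMPORTANT: For Piped Streams to work, the writing and the reading must happen asynchronously, meaning in two different threads. The PipedInputStream Javadoc warns that using both ends from a single thread may deadlock that thread: once the pipe's buffer is full, a write blocks until another thread reads from it. We will get to that part soon.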
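The two-thread rule is also where the in-memory saving comes from: the pipe holds at most its buffer size (1024 bytes in these examples), and the writer simply blocks until the reader drains it. The following sketch, with a hypothetical class `PipeBufferBound`, streams 1,000,000 bytes through a 1024-byte pipe and records the largest backlog that `available()` ever reports, which cannot exceed the buffer size.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

public class PipeBufferBound {

    static final int PIPE_SIZE = 1024;

    /** Streams totalBytes through a 1 KB pipe; returns {bytesRead, largestBacklogSeen}. */
    static long[] transfer(int totalBytes) throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream(PIPE_SIZE);
        PipedOutputStream out = new PipedOutputStream(in);

        // Writer thread: write() blocks whenever the pipe's buffer is full
        // and resumes once the reader has drained some bytes.
        Thread writer = new Thread(() -> {
            byte[] chunk = new byte[4096];
            try (PipedOutputStream o = out) {
                for (int sent = 0; sent < totalBytes; sent += chunk.length) {
                    o.write(chunk, 0, Math.min(chunk.length, totalBytes - sent));
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();

        // Reader (this thread): drain the pipe, recording the largest backlog observed.
        long bytesRead = 0;
        long largestBacklog = 0;
        byte[] buf = new byte[512];
        while (true) {
            largestBacklog = Math.max(largestBacklog, in.available());
            int n = in.read(buf);
            if (n == -1) {
                break;   // writer closed its end and the buffer is empty
            }
            bytesRead += n;
        }
        writer.join();
        in.close();
        return new long[] {bytesRead, largestBacklog};
    }

    public static void main(String[] args) throws Exception {
        long[] result = transfer(1_000_000);
        System.out.println("Bytes read: " + result[0]
                + ", largest backlog: " + result[1] + " (pipe size " + PIPE_SIZE + ")");
    }
}
```

However large the payload, the memory held by the pipe itself stays bounded by the buffer size passed to the PipedInputStream constructor.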


So, here is my simple code example of piping the Output Stream to the Input Stream. I always like to follow the practice of coding to an interface, so here is my generic writer. This way, you can write many different writers and configure them as required.

 
import java.io.OutputStream;
import java.util.Map;

/**
 * <p>
 * This interface defines a writer that writes to an output stream passed in by the invoker.
 * </p>
 */
public interface IPipedWriter {

    /**
     * Initializes the writer with the output stream it will write to.
     *
     * @param outputStream : the stream (e.g. a PipedOutputStream) that receives the data.
     */
    void initializeWriter(OutputStream outputStream);

    /**
     * <p>
     * This method can be called multiple times between initializeWriter() and finalizeWriter().
     * </p>
     * It lets us write to the same stream many times without repeatedly opening and closing it, which is costly.
     *
     * @param dataChunks : chunks of data that can be written in set pieces, in an iterative manner.
     */
    void writeDataChunks(Map<String, Object> dataChunks);

    /**
     * Finalizing method, e.g. closes the output stream so the reader sees end-of-stream.
     */
    void finalizeWriter();
}

Please read the API comments on each method; they will help you understand this better.
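Since the writer is coded to an interface, switching to a different output format only means adding another implementation. The sketch below is a hypothetical `KeyValueLineWriter` (not from the original project) that writes every entry of a chunk as a `key=value` line. It repeats the interface so it compiles on its own, uses Java 9's `Map.of`, and writes into a `ByteArrayOutputStream` purely so the demo can run on one thread; with a real pipe, the writer would run on its own thread as explained earlier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class KeyValueWriterExample {

    /** Same contract as IPipedWriter above, repeated so this example compiles alone. */
    interface IPipedWriter {
        void initializeWriter(OutputStream outputStream);
        void writeDataChunks(Map<String, Object> dataChunks);
        void finalizeWriter();
    }

    /** Hypothetical alternative writer: emits each entry of a chunk as a "key=value" line. */
    static class KeyValueLineWriter implements IPipedWriter {
        private Writer writer;

        @Override
        public void initializeWriter(OutputStream outputStream) {
            writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
        }

        @Override
        public void writeDataChunks(Map<String, Object> dataChunks) {
            // TreeMap gives a stable, sorted key order.
            Map<String, Object> sorted = new TreeMap<>(dataChunks);
            try {
                for (Map.Entry<String, Object> entry : sorted.entrySet()) {
                    writer.write(entry.getKey() + "=" + entry.getValue() + "\n");
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public void finalizeWriter() {
            try {
                writer.close();   // flushes, then closes the underlying stream
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        IPipedWriter w = new KeyValueLineWriter();
        w.initializeWriter(sink);
        w.writeDataChunks(Map.of("id", 1, "name", "alpha"));
        w.finalizeWriter();
        System.out.print(new String(sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```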

The following is a simple implementation of the writer.
 
    /**
     * Sample writer implementation.
     */
    static final IPipedWriter writer = new IPipedWriter() {

        private OutputStream outputStream;

        @Override
        public void initializeWriter(OutputStream outputStreamParam) {
            outputStream = outputStreamParam;
        }

        @Override
        public void writeDataChunks(Map<String, Object> dataChunks) {
            // Data is stored under the key "data".
            String dataItem = (String) dataChunks.get("data");
            try {
                outputStream.write(dataItem.getBytes());
            } catch (IOException e) {
                // Need to handle the exception in an application-specific, meaningful manner.
                e.printStackTrace();
            }
        }

        @Override
        public void finalizeWriter() {
            try {
                // Closing the piped output stream signals end-of-stream to the reader.
                outputStream.close();
            } catch (IOException e) {
                // Need to handle the exception in an application-specific, meaningful manner.
                e.printStackTrace();
            }
        }
    };
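One caveat with `e.printStackTrace()` inside `writeDataChunks()`: if the consumer stops reading and closes its end (for example, a client that disconnects), every later write fails, so the writer loop would print one stack trace per remaining item. The sketch below (hypothetical class `EarlyReaderCloseDemo`) reproduces that situation and instead records the first `IOException` and stops writing. The writer may take a moment to notice the close, because a write blocked on a full pipe only re-checks the pipe's state periodically.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

public class EarlyReaderCloseDemo {

    /** Runs a writer that outlives its reader; returns the IOException the writer hit, or null. */
    static IOException runWithEarlyClose() throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream(1024);
        PipedOutputStream out = new PipedOutputStream(in);
        AtomicReference<IOException> failure = new AtomicReference<>();

        Thread writer = new Thread(() -> {
            try (PipedOutputStream o = out) {
                for (int i = 0; i < 100_000; i++) {
                    o.write(("Test Data : " + i + "\n").getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                // Stop at the first failure instead of logging once per remaining chunk.
                failure.set(e);
            }
        });
        writer.start();

        in.read(new byte[16]);   // consume a little...
        in.close();              // ...then abandon the stream, like a disconnected client
        writer.join();
        return failure.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Writer stopped with: " + runWithEarlyClose());
    }
}
```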

Now let's see how we can connect this writer to an input stream.

 
    public static void main(String[] args) throws IOException {
        System.out.println("Piping Streams Example");

        /**
         * Piping the output stream to the input stream.
         */
        final PipedInputStream inputStream = new PipedInputStream(1024);
        final PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStream);

        // Creating a list of data. This would be data retrieval logic in an actual use case.
        final List<String> data = new ArrayList<>(100000);

        for (int i = 0; i < 100000; i++) {
            data.add("Test Data : " + i + "\n");
        }

        System.out.println("Starting the Writer Thread.");

        /**
         * Creating a new thread for the writer to operate.
         * NOTE: This is to avoid blocking between the reader and the writer.
         */
        new Thread(
                new Runnable() {
                    @Override
                    public void run() {

                        writer.initializeWriter(pipedOutputStream);

                        for (String item : data) {
                            Map<String, Object> dataChunk = new HashMap<>();
                            dataChunk.put("data", item);
                            writer.writeDataChunks(dataChunk);
                        }

                        // Closes the pipe, so readLine() below eventually returns null.
                        writer.finalizeWriter();
                    }
                }
        ).start();

        // Reader will be occupying the MAIN thread; try-with-resources closes the reader.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
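If you prefer not to manage raw threads, the same writer/reader split can run on an `ExecutorService`. The sketch below (hypothetical class `PipedExecutorExample`) submits the writer as a `Callable`, so an `IOException` on the writer side surfaces through `Future.get()` instead of being printed, and it counts lines rather than printing 100,000 of them. Closing the output end matters even more here: a pool thread stays alive after its task finishes, so a reader waiting on an unclosed pipe may block indefinitely instead of failing as it can when a dedicated writer thread dies.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipedExecutorExample {

    /** Writes `count` lines on a pool thread and returns how many lines the reader saw. */
    static int pipeLines(int count) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            PipedInputStream inputStream = new PipedInputStream(1024);
            PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStream);

            // Writer task: returning a value makes this a Callable, so IOException may propagate.
            Future<?> writerResult = pool.submit(() -> {
                try (OutputStream out = pipedOutputStream) {   // close = end-of-stream
                    for (int i = 0; i < count; i++) {
                        out.write(("Test Data : " + i + "\n").getBytes(StandardCharsets.UTF_8));
                    }
                }
                return null;
            });

            // Reader on the calling thread.
            int lines = 0;
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                while (reader.readLine() != null) {
                    lines++;
                }
            }
            writerResult.get();   // rethrows any writer failure as an ExecutionException
            return lines;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Lines read: " + pipeLines(100_000));
    }
}
```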

Expected Output:
Piping Streams Example
Starting the Writer Thread.
Test Data : 0
Test Data : 1
...
Test Data : 99999

I hope I have presented a compelling case for you to try Piped Streams. It is quite a nice capability that Core Java IO provides without depending on any third-party libraries.

Cheers
Chinthaka
