Why does this non blocking IO call fail?

Background

  • I'd like to send a large (30MB, but could be much larger in the future) amount of data using Java's non-blocking SocketChannel
    • Why non-blocking? So that the computation of the next bytes to send isn't blocked waiting for the network
  • When I use the SocketChannel in blocking mode, the transfer completes without a problem
  • When I set the SocketChannel to non-blocking, it completes significantly faster, but the server doesn't receive all of the data
    • The server does receive some of the data, though

Question

  • Why is my large (30MB) file transfer failing when using a non-blocking Java NIO SocketChannel, and how do I fix it?

Files

  • I gutted the program and wrote the example so it so that it will all run in one go via javac *.java && java Main

    • It creates a Future for the server, a Future for the client, has the client send 30MB of random bytes to the server, and then blocks on the main thread until both Futures complete (though the server never does)
  • Note: This is primarily about TheClient.java

    • If the line between the comments <CommentOutToMakeWork> and </CommentOutToMakeWork> is commented out, the SocketChannel will be blocking and the transfer will complete

Main.java:

import java.io.IOException;
import java.lang.InterruptedException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
  public static void main(String[] args) throws ExecutionException, IOException, InterruptedException {
    final SocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
    final int size              = 30 * 1000 * 1000;

    ExecutorService executor = Executors.newFixedThreadPool(2);
    TheServer theServer      = new TheServer(address, size);
    TheClient theClient      = new TheClient(address, size);

    Future<String> serverFuture = executor.submit(theServer);
    Thread.sleep(2000);
    Future<String> clientFuture = executor.submit(theClient);

    System.out.println("MAIN: Received from client: " + clientFuture.get());
    System.out.println("MAIN: Received from server: " + serverFuture.get());
    executor.shutdown();
  }
}

TheClient.java:

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;

class TheClient implements Callable<String> {
  private TheClient() {}
  public TheClient(SocketAddress address, int size) {
    this.size = size;
    this.from = new byte[size];
    this.serverAddress = address;
    new Random().nextBytes(from);
  }

  private int           size;
  private byte[]        from;
  private SocketAddress serverAddress;

  public String call() throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    System.out.println("CLIENT: Attempting to connect to server...");
    socketChannel.connect(serverAddress);
    // <CommentOutToMakeWork>
    socketChannel.configureBlocking(false);
    // </CommentOutToMakeWork>
    System.out.println("CLIENT: Connection established. Sending " + size + " bytes.");

    // For this example, this is one large write, but even my actual
    // program, which uses a loop and puts smaller chunks onto the channel,
    // is too fast for the SocketChannel.

    socketChannel.write(ByteBuffer.wrap(from));

    System.out.println("CLIENT: Write completed.");
    return "CLIENT: Success!";
  }
}

TheServer.java:

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.concurrent.Callable;

class TheServer implements Callable<String> {
  private TheServer() {}
  public TheServer(SocketAddress address, int size) {
    this.size = size;
    this.to = new byte[size];
    this.serverAddress = address;
  }
  private int           size;
  private byte[]        to;
  private SocketAddress serverAddress;

  public String call() throws IOException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open().bind(serverAddress);
    System.out.println("SERVER: Awaiting connection...");
    InputStream clientSocketInputStream = serverChannel.accept().socket().getInputStream();
    System.out.println("SERVER: Connection established. Attempting to read " + size + " bytes.");
    for (int i = 0; i < size; ++i) {
      to[i] = (byte) clientSocketInputStream.read();
    }
    System.out.println("SERVER: Read completed.");
    return "SERVER: Success!";
  }
}
Jon Skeet
people
quotationmark

I believe the answer lies in the WritableByteChannel.write documentation:

Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.

So it looks like you need to use the return value of write to find out how much has been written, and handle the case when it's not all been written. What isn't clear from the description is how you handle that case - you may find you need to do some scheduling to continue writing when the socket output buffer has drained, for example.

people

See more on this question at Stackoverflow