I am trying to signal between two threads using the below FutureResult class which extends FutureTask class. When run the script, it prints the following result.
SENDING: 0
SENT: 0
POLL: FutureResult@513431
SIGNALLED: FutureResult@513431
Then the program hang up forever. I expect FutureResult instance should return the value from it's blocking get method. Then print the result in the console. But FutureResult.get is blocking forever.
import java.util.concurrent.*;
/**
* Created by someone on 20/08/2015.
*/
final public class FutureResult<T> extends FutureTask<T> {
private static final Object SS = "SS";
public FutureResult() {
super(() -> null);
}
public void signal(final T value) {
set(value);
}
public void signalError(final Throwable throwable) {
setException(throwable);
}
public static void main(String... args) throws Exception {
final ArrayBlockingQueue<FutureResult> queue = new ArrayBlockingQueue<>(1000000);
new Thread(() -> {
while (true) {
try {
final FutureResult poll = queue.take();
System.out.println("POLL: " + poll);
if (poll != null) {
poll.signal(SS);
System.out.println("SIGNALLED: " + poll);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 1; i++) {
final FutureResult<Object> result = new FutureResult<>();
System.out.println("SENDING: " + i);
queue.offer(new FutureResult());
try {
System.out.println("SENT: " + i);
result.get();
System.out.println("GOT : " + i);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}).start();
}
}
This is the problem:
queue.offer(new FutureResult());
You're setting the value on one FutureResult
, but that's not the one you're waiting for. Just change that line to:
queue.offer(result);
and it works fine.
See more on this question at Stackoverflow