java.util.concurrent学习(四) FutureTask
java.util.concurrent学习(四) FutureTask
java.util.concurrent学习(四) FutureTaskFutureTask是一个实现了RunnableFuture的任务类,而RunnableFuture又同时继承了Runable,Future接口,那么这就说明,FutureTask同时具备了Runable,Future的属性。
FutureTask内部维护了Callable<V> callable 需要执行的任务,Object outcome 返回的结果或者抛出的异常,volatile Thread runner 一个内存可见的线程用来执行callable ,volatile WaitNode waiters 一个内存可见的正在等待执行的线程的Treiber stack (表示不会翻译),volatile int state 内存可见的任务执行状态。FutureTask有七种状态,通过UNSAFE的cas操作来改变状态,这七种状态在类内部用了七个常量来定义,分别是:
private static final int NEW = 0; 新建 private static final int COMPLETING = 1; 正在完成 private static final int NORMAL = 2; 已完成 private static final int EXCEPTIONAL = 3; 异常 private static final int CANCELLED = 4; 已取消 private static final int INTERRUPTING = 5; 正在打断 private static final int INTERRUPTED = 6; 已打断
NEW
从构造方法中可以看到,FutureTask通过Executors.callable(runnable, result)的方法实现了可以构造Callable,Runable两种任务的构造方法。构造方法调用以后,将FutureTask状态置为NEW,表示一个任务已建立,等待执行。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
COMPLETING
通过执行run方法来进入该状态。
public void run() { //不处于不处于new状态 或者 这个任务无法改为当前线程 则不执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //任务异常 setException(ex); } if (ran) //结束任务 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; 如果处于正要打断或已打断的状态 则让出cpu if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
NORMAL
protected void set(V v) {
//将任务从NEW更新为COMPLETING状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//改为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//完成
finishCompletion();
}
}
EXCEPTIONAL
protected void setException(Throwable t) {
//任务从NEW更新为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
//改为EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//完成
finishCompletion();
}
}
CANCELLED / INTERRUPTING / INTERRUPTED
public boolean cancel(boolean mayInterruptIfRunning) {
//如果状态处于NEW 则根据mayInterruptIfRunning来决定原子更新为INTERRUPTING或者CANCELLED
//如果状态不为NEW,或者无法从NEW转为INTERRUPTING或CANCELLED,则返回false。
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// in case call to interrupt throws exception
//如果mayInterruptIfRunning为true,则正在执行的线程也会打断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
// final state
//最后更新为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
从这几个状态更新的方法我们可以看出FutureTask可能有的生命周期:
runAndReset();该方法会执行任务但不会更新任务状态并返回结果,除非发生异常,如果任务执行并且处于NEW的状态则返回ture。
awaitDone(boolean timed, long nanos);限时等待任务执行完成。
get();通过report(s)获取返回值,从上面的run方法可以看出,该方法获取到的可能是异常。
get(long timeout, TimeUnit unit); 限时获取返回值。通过awaitDone(boolean timed, long nanos)来限时。