Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.
publicstaticvoidmain(String[] args)throws InterruptedException { Thread t = new Thread(() -> { long start = System.currentTimeMillis(); long end = 0; int count = 0;
/**
 * Creates a {@code ThreadPoolExecutor} from the five basic settings.
 *
 * <p>Delegates to the seven-argument constructor, supplying
 * {@code Executors.defaultThreadFactory()} as the thread factory and
 * {@code defaultHandler} as the handler.
 *
 * @param corePoolSize    core pool size, forwarded unchanged
 * @param maximumPoolSize maximum pool size, forwarded unchanged
 * @param keepAliveTime   keep-alive time, forwarded unchanged
 * @param unit            time unit of {@code keepAliveTime}
 * @param workQueue       queue of {@code Runnable} tasks, forwarded unchanged
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
// Worker-creation path (body fragment; the enclosing method header is not shown here).
// Builds a Worker for firstTask, registers it in `workers` while holding mainLock,
// starts its thread, and calls addWorkerFailed(w) if the thread was never started.
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    // The pool does its work through the Worker class; Worker extends
    // AQS (AbstractQueuedSynchronizer).
    w = new Worker(firstTask);
    // The thread is taken from the worker; the worker's thread is created by the
    // thread factory that the pool was initialized with.
    final Thread t = w.thread;
    if (t != null) {
        // The insertion into workers (a HashSet) is guarded by this ReentrantLock.
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) { // precheck that t is startable
                    throw new IllegalThreadStateException();
                }
                workers.add(w); // add to the hash set
                int s = workers.size();
                if (s > largestPoolSize) {
                    largestPoolSize = s; // track the largest size workers has reached
                }
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start(); // added successfully, so start running
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted) {
        addWorkerFailed(w);
    }
}
return workerStarted;
public V get()throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 状态检查 s = awaitDone(false, 0L); return report(s); }
publicvoidrun(){ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run 方法是线程池中的线程来执行的,而非主线程
执行callable.call方法来运行任务
call通过时用set方法来保存结果
call出现异常时用setException方法来保存异常信息
set/setException
1 2 3 4 5 6 7 8 9 10 11 12 13 14
protectedvoidset(V v){ if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protectedvoidsetException(Throwable t){ if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }