/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ publicbooleanadd(E e) { ensureCapacityInternal(size + 1); // Increments modCount!! elementData[size++] = e; returntrue; }
privatevoidgrow(int minCapacity) { // overflow-conscious code intoldCapacity= elementData.length; intnewCapacity= oldCapacity + (oldCapacity >> 1); if (newCapacity - minCapacity < 0) newCapacity = minCapacity; if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); // minCapacity is usually close to size, so this is a win: elementData = Arrays.copyOf(elementData, newCapacity); }
publicbooleanremove(Object o) { if (o == null) { for (intindex=0; index < size; index++) if (elementData[index] == null) { fastRemove(index); returntrue; } } else { for (intindex=0; index < size; index++) if (o.equals(elementData[index])) { fastRemove(index); returntrue; } } returnfalse; }
remove方法实际上就是遍历数组所有的元素,然后找到数组下标,再根据数组下标进行删除
1 2 3 4 5 6 7 8 9 10 11 12
/* * Private remove method that skips bounds checking and does not * return the value removed. */ privatevoidfastRemove(int index) { modCount++; intnumMoved= size - index - 1; if (numMoved > 0) System.arraycopy(elementData, index+1, elementData, index, numMoved); elementData[--size] = null; // clear to let GC do its work }
@SuppressWarnings("unchecked") public E next() { checkForComodification(); inti= cursor; if (i >= size) thrownewNoSuchElementException(); Object[] elementData = ArrayList.this.elementData; if (i >= elementData.length) thrownewConcurrentModificationException(); cursor = i + 1; return (E) elementData[lastRet = i]; }
// modCount是ArrayList中的属性值,是集合添加元素、删除元素的次数,expectedModCount是迭代器中的属性值,是预期的修改次数。实际修改值与期望值不同 finalvoidcheckForComodification() { if (modCount != expectedModCount) thrownewConcurrentModificationException(); }
获取锁:如果每次申请锁的线程都是不相同的,则锁会升级为轻量级锁,指向栈中锁记录的指针。轻量级锁适用于线程交替执行同步块的场景。 释放锁:通过CAS操作,尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word,如果成功则完成解锁操作。如果失败则表明有其他线程获取该锁,此时锁膨胀为重量级锁。释放锁的同时,唤醒被挂起的线程。
重量级
当多个线程同时竞争锁,则轻量级锁会膨胀为重量级锁。指向互斥量的指针。未获取到锁的线程会阻塞。
自旋锁
循环检测锁标志位
可中断锁
Lock
1 2 3 4 5 6 7 8
// 支持中断的 API voidlockInterruptibly() throws InterruptedException; // 支持超时的 API booleantryLock(long time, TimeUnit unit) throws InterruptedException; // 支持非阻塞获取锁的 API booleantryLock();
[session_server] listen_address = "0.0.0.0:8093"# listen on all available interfaces on port 8093 advertise_address = "runner-host-name.tld:8093" session_timeout = 1800
Thread state for a thread which has not yet started.
还没有开始的状态,就是new
RUNNABLE
Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual achine but it may be waiting for other resources from the operating system such as processor.
正在jvm虚拟机中运行的状态,但是可能还需要等待系统资源,类似于cpu这种资源。
BLOCKED
Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling
Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time.
TERMINATED
Thread state for a terminated thread.The thread has completed execution.
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null, false); } elseif (!addWorker(command, false)) reject(command); }
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { intwc= workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); workers.add(w); ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
rs为什么可以表示runstate?
1
privatestaticintrunStateOf(int c) { return c & ~CAPACITY; }
publicLinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); finalReentrantLockputLock=this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { intn=0; for (E e : c) { if (e == null) thrownewNullPointerException(); if (n == capacity) thrownewIllegalStateException("Queue full"); enqueue(newNode<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
内部调用的this和enqueue方法
1 2 3 4 5 6
publicLinkedBlockingQueue(int capacity) { if (capacity <= 0) thrownewIllegalArgumentException(); this.capacity = capacity; // last和head节点都是null last = head = newNode<E>(null); }
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;
publicvoidput(E e)throws InterruptedException { if (e == null) thrownewNullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. intc= -1; Node<E> node = newNode<E>(e); // 拿到写锁 finalReentrantLockputLock=this.putLock; finalAtomicIntegercount=this.count; // 对写操作加锁 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ // 如果当前容量已经满了,则阻塞并挂起当前线程 while (count.get() == capacity) { notFull.await(); } // 入队操作 enqueue(node); // 元素+1 c = count.getAndIncrement(); if (c + 1 < capacity) // 如果容量还没满,在放锁的条件对象notFull唤醒正在等待的线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ publicDiscardOldestPolicy() { }
/** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }