java多线程
1、线程与进程
进程:运行中的程序称为进程 (每一个软件启动都对应一个进程),每个进程都会有一个独立的地址空间,进程的切换会有较大的开销。
线程:进程中的一条执行流,一个进程中至少有一个线程。线程共享进程的空间,线程可以独占进程的空间,这时候需要加锁,其他线程不能访问,直到线程主动释放锁。
上下文:启动或停止线程时,需要读取或记录进度信息,这个进度信息即为上下文。
上下文切换:读取或者记录进度信息的过程即为上下文切换。
串行:单个CPU处理多个逻辑,需等待当前逻辑处理完毕才可进行下一个逻辑。
并行:多个CPU同时处理多段逻辑。
并发:单个CPU处理多个逻辑,但不会处理完当前逻辑,而是处理当前逻辑一段时间后,切换至另一个逻辑,如此往复直到所有逻辑处理完成。
多线程的应用程序,程序可以并行执行多个任务(操作),相对单线程应用程序,效率要高很多。
注意:很多多线程是模拟出来的,真正的多线程是指有多个CPU,如果是模拟出来的多线程,即一个cup的情况下,在同一个时间点,CPU只能执行一个代码,因为切换得很快,所以就有同时执行的错觉。
2、创建线程
方式一: 继承Thread类创建一个线程 (只需要重写run方法即可)
注意:不建议使用,避免oop单继承局限性
public class Thread1 extends Thread {
public Thread1(String name) {
this.setName(name);
}
@Override
public void run() { //当前线程要执行的操作;
for(int i=1;i<10;i++) {
System.out.println(Thread.currentThread().getName() +" -> " + i);
}
}
}
启动:
Thread1 t1 = new Thread1();
t1.start();
**方式二:**实现Runnable接口创建线程
注意:推荐使用,避免了oop单继承局限性,灵活方便,方便同一个对象被多个线程使用
public class Thread2 implements Runnable {
@Override
public void run() {
for(int i=0;i<10;i++) {
System.out.println(Thread.currentThread().getName() +" -> " +i);
}
}
}
启动:
Thread2 t2 = new Thread2();
new Thread(t2).start();
方式三:实现Callable接口创建线程
public class Down implements Callable<Boolean> {
private String url;
private String name;
public Down(String url,String name){
this.url=url;
this.name=name;
}
@Override
public Boolean call() throws Exception {
download(url,name);
System.out.println("下载了文件"+name);
return true;
}
public void download(String url,String name){
try {
FileUtils.copyURLToFile(new URL(url),new File(name));
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动:
Down t1 = new Down("https://baidu.com/static/userImg/160542hs47ii7uvma6xhxm.png","仙贝.jpg");
Down t2 = new Down("https://baidu.com/static/userImg/39904_11232801629343358130.jpg","快递.jpg");
Down t3 = new Down("https://baidu.com/static/userImg/42e731c8184e6521.jpg","分身.jpg");
//创建执行服务
ExecutorService service = Executors.newFixedThreadPool(3);
//提交执行
Future<Boolean> r1 = service.submit(t1);
Future<Boolean> r2 = service.submit(t2);
Future<Boolean> r3 = service.submit(t3);
//获取结果
Boolean rs1 = r1.get();
Boolean rs2 = r2.get();
Boolean rs3 = r3.get();
//打印结果
System.out.println(rs1);
System.out.println(rs2);
System.out.println(rs3);
//关闭服务
service.shutdownNow();
方式四:线程池
背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。
好处:
- 提高响应速度(减少了创建新线程的时间)
- 降低资源消耗(重复利用线程池中线程,不需要每次都创建)
- 便于线程管理
- corePoolSize: 核心池的大小
- maximumPoolSize: 最大线程数
- keepAliveTime: 线程没有任务时最多保持多长时间后会终止
使用线程池:
1.JDK 5.0起提供了线程池相关API: ExecutorService 和Executors
2.ExecutorService: 真正的线程池接口。常见子类ThreadPoolExecutor
- void execute(Runnable command) :执行任务/命令,没有返回值,一般用来执行Runnable
- Future submit(Callable task):执行任务,有返回值,一般用来执行Callable
- void shutdown() :关闭连接池
3.Executors: 工具类、线程池的工厂类,用于创建并返回不同类型的线程池
// 通过java.util.concurrent.Executors类创建线程池
//1、Executors创建存放单个线程的线程池 ;
ExecutorService executorService = Executors.newSingleThreadExecutor();
//在线程池中放线程:通过execute方法放线程(没有返回值)| 通过submit方法向池中添加线程,有返回值;
executorService.submit(new ThreadA());
if(executorService.isShutdown()==false) {
executorService.shutdown();
}
//2、Executors创建存放固定数量的线程池 ;
ExecutorService executorService =Executors.newFixedThreadPool(5);
for(int i=0;i<5;i++) {
executorService.submit(new ThreadA());
}
//3、Executors创建存放不确定数量的线程池 ;(任意)
ExecutorService executorService = Executors.newCachedThreadPool();
//4、Executors创建作业调度线程池(在指定时间执行或者重复执行)
ScheduledExecutorService schedudService =Executors.newScheduledThreadPool(1);
schedudService.schedule(new ThreadA(), 5, TimeUnit.SECONDS);
//间隔一段时间执行任务,固定于某个时间点执行,不管任务是否执行完毕
ScheduledExecutorService schedudService =Executors.newScheduledThreadPool(1);
schedudService.scheduleAtFixedRate(new ThreadA(), 10, 3, TimeUnit.SECONDS);
//间隔一段时间执行任务,当任务执行完毕后,间隔某段时间后继续执行
ScheduledExecutorService schedudService =Executors.newScheduledThreadPool(1);
schedudService.scheduleWithFixedDealy(new ThreadA(), 10, 3, TimeUnit.SECONDS);
3、线程状态
线程在running状态下可能会出现blocked情况:
- 调用join()和sleep()方法,sleep()时间结束或被打断,join()中断,IO完成都会回到Runnable状态,等待JVM的调度。
- 调用wait(),使该线程处于等待池(wait blocked pool),直到notify()、notifyAll(),线程被唤醒被放到锁定池(lock blocked pool ),释放同步锁使线程回到可运行状态(Runnable)。
- 对Running状态的线程加同步锁(Synchronized)使其进入(lock blocked pool ),同步锁被释放进入可运行状态(Runnable)。
- 在runnable状态的线程是处于被调度的线程,此时的调度顺序是不一定的。Thread类中的yield方法可以让一个running状态的线程转入runnable。
4、停止线程
1.使用退出标志,使线程正常退出,也就是当 run() 方法完成后线程中止。
public class ServerThread extends Thread {
//volatile修饰符用来保证其它线程读取的总是该变量的最新的值
public volatile boolean exit = false;
@Override
public void run() {
ServerSocket serverSocket = new ServerSocket(8080);
while(!exit){
serverSocket.accept(); //阻塞等待客户端消息
...
}
}
public static void main(String[] args) {
ServerThread t = new ServerThread();
t.start();
...
t.exit = true; //修改标志位,退出线程
}
}
2.使用 stop() 方法强行终止线程,但是不推荐使用这个方法,该方法已被弃用。
3.使用 interrupt 方法中断线程,不推荐。
5、线程休眠(sleep)
- sleep(毫秒)指定当前线程阻塞的毫秒数
- sleep存在异常InterruptedException
- sleep时间达到后线程进入就绪状态
- sleep可以模拟网络延时,倒计时等
- 每一个对象都有一个锁,sleep不会释放锁
6、线程礼让(yield)
- 礼让线程,让当前正在执行的线程暂停,但不阻塞
- 将线程从运行状态转为就绪状态
- 让CPU重新调度,礼让不一定成功,看CPU情况
public class TestYield {
public static void main(String[] args) {
TestYieldDemo t = new TestYieldDemo();
new Thread(t,"a").start();
new Thread(t,"b").start();
}
}
class TestYieldDemo implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"线程开始");
Thread.yield();//礼让
System.out.println(Thread.currentThread().getName()+"线程结束");
}
}
礼让成功:
a线程开始
b线程开始
a线程结束
b线程结束
礼让失败:
b线程开始
b线程结束
a线程开始
a线程结束
7、线程强制执行(join)
- join合并线程,待此线程执行完成后,再执行其他线程,其他线程阻塞
- 可以想象成插队
public class TestJoin {
public static void main(String[] args) throws Exception{
MyJoin mj = new MyJoin();
Thread t = new Thread(mj);
t.start();
for (int i = 0; i < 1000; i++) {
if(i == 200){
t.join();
}
System.out.println("main线程"+i);
}
}
}
class MyJoin implements Runnable{
@Override
public void run() {
for (int i = 0; i < 500; i++) {
System.out.println("vip线程来了"+i);
}
}
}
8、监测线程状态(Thread.State)
- NEW尚未启动
- RUNNABLE正在执行
- BLOCKED被阻塞等待监视器锁定
- WAITING等待另一个线程执行特定动作
- TIMED_WAITING等待另一个线程执行动作达到指定等待时间
- TERMINATED已退出
注意:线程死亡后不可再次启动
public class TestState {
public static void main(String[] args) throws Exception{
MyState m = new MyState();
Thread t = new Thread(m);
//观察状态
Thread.State state = t.getState();
System.out.println(state);//NEW
//观察启动后
t.start();
state = t.getState();
System.out.println(state);//RUN
while (state != Thread.State.TERMINATED){//只要线程不终止就一直运行
Thread.sleep(500);
state = t.getState();
System.out.println(state);//TIMED_WAITING与TERMINATED(最后一次输出)
}
}
}
class MyState implements Runnable{
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("///////");
}
}
//输出结果
NEW
RUNNABLE
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
TIMED_WAITING
///////
TERMINATED
9、线程优先级(priority)
- Thread.MIN_PRIORITY = 1
- Thread.MAX_PRIORITY = 10
- Thread.NORM_PRIORITY= 5
- getPriority()
- setPriority(int x)
注意:优先级的设定建议在start()之前;优先级低只意味着被调度的概率低,并不是优先级低就不会被调用了,优先级高同理,都是看CPU调度的。
10、守护线程
-
线程分为用户线程和守护线程
-
虚拟机必须确保用户线程执行完毕
-
虚拟机不用等待守护线程执行完毕
public class TestDaemon { public static void main(String[] args) { God god = new God(); You you = new You(); Thread thread = new Thread(god); //设置god为守护线程 thread.setDaemon(true);//默认为false thread.start(); new Thread(you).start(); } } class God implements Runnable{ @Override public void run() { while (true){ System.out.println("上帝保佑着你"); } } } class You implements Runnable{ @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("开心地活着"); } System.out.println("goodbey world"); } }
11、线程同步/锁
由于同一进程的多个线程共享同一块存储空间,在带来方便的同时也带来了访问冲突问题,为了保证数据在方法中被访问时的正确性,在访问时加入锁机制synchronized,当一个线程获得对象的排他锁,独占资源,其他线程必须等待,使用后释放锁即可,但存在以下问题:
- 一个线程持有锁会导致其他所有需要此锁的线程挂起
- 在多线程竞争下,加锁,释放锁会导致比较多的上下文切换和调度延时,引起性能问题
- 如果一个优先级高的线程等待一个优先级低的线程释放锁,会导致优先级倒置,引起性能问题
monitor
他们是应用于同步问题的人工线程调度工具。Java中的每个对象都有一个监视器,来监测并发代码的重入。在非多线程编码时该监视器不发挥作用,反之如果在synchronized 范围内,监视器发挥作用。
wait/notify必须存在于synchronized块中。并且,这三个关键字针对的是同一个监视器(某对象的监视器)。这意味着wait之后,其他线程可以进入同步块执行。
当某代码并不持有监视器的使用权时去wait或notify,会抛出java.lang.IllegalMonitorStateException。也包括在synchronized块中去调用另一个对象的wait/notify,因为不同对象的监视器不同,同样会抛出此异常。
synchronized方法
public synchronized void method(int args){
todo...
};
synchronized方法控制对对象的访问,每个对象对应一把锁,每个synchronized方法都必须获得调用该方法的对象的锁才能执行,否则线程会阻塞,方法一旦执行,就独占该锁,直到该方法结束才释放锁,后面被阻塞的线程才能获得这个锁继续执行。
public class BuyTicket {
public static void main(String[] args) {
Buy station = new Buy();
new Thread(station,"我").start();
new Thread(station,"你").start();
new Thread(station,"黄牛").start();
}
}
class Buy implements Runnable{
private int num = 10000;
private Boolean flag = true;
@Override
public void run() {
while (flag){
buy();
}
}
private synchronized void buy(){
if(num <= 0){
flag = false;
return;
}
System.out.println(Thread.currentThread().getName()+"买到了第"+num+"张票");
num--;
}
}
synchronized块
synchronized (Obj){
todo...
}
Obj称之为同步监视器,可以是任何对象,但是推荐使用共享资源作为同步监视器
public class UnsafeList {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
new Thread(()->{
synchronized (list){
list.add(Thread.currentThread().getName());
}
}).start();
}
int j = 0;
for (int i = 0; i < 21000; i++) {
j = 500 + j++ - 100;
System.out.println(j);
}
System.out.println(list.size());
}
}
线程同步 :一次只允许某一个线程对某一资源进行访问称为线程同步
volatile:保证变量的内存可见性;禁止指令重排序 ;volatile修饰的变量一旦被某个线程更改,必须立即刷新到主内存 ;所有volatile修饰的变量在使用之前必须重新读取主内存的值
锁的原理 :
Java中每个对象都有一个内置锁。
当程序运行到非静态的synchronized同步方法上时,自动获得与正在执行代码类的当前实例(this实例)有关的锁。
获得一个对象的锁也称为获取锁、锁定对象、在对象上锁定或在对象上同步。
当程序运行到synchronized同步方法或代码块时该对象锁才起作用。
一个对象只有一个锁。所以,如果一个线程获得该锁,就没有其他线程可以获得锁,直到第一个线程释放(或返回)锁。这也意味着任何其他线程都不能进入该对象上的synchronized方法或代码块,直到该锁被释放。 释放锁是指持锁线程退出了synchronized同步方法或代码块。
关于锁和同步,有一下几个要点:
1)、只能同步方法,而不能同步变量和类;
2)、每个对象只有一个锁;当提到同步时,应该清楚在什么上同步?也就是说,在哪个对象上同步?
3)、不必同步类中所有的方法,类可以同时拥有同步和非同步方法。
4)、如果两个线程要执行一个类中的synchronized方法,并且两个线程使用相同的实例来调用方法,那么一次只能有一个线程能够执行方法,另一个需要等待,直到锁被释放。也就是说:如果一个线程在对象上获得一个锁,就没有任何其他线程可以进入(该对象的)类中的任何一个同步方法。
5)、如果线程拥有同步和非同步方法,则非同步方法可以被多个线程自由访问而不受锁的限制。
6)、线程睡眠时,它所持的任何锁都不会释放。
7)、线程可以获得多个锁。比如,在一个对象的同步方法里面调用另外一个对象的同步方法,则获取了两个对象的同步锁。
8)、同步损害并发性,应该尽可能缩小同步范围。同步不但可以同步整个方法,还可以同步方法中一部分代码块。
9)、在使用同步代码块时候,应该指定在哪个对象上同步,也就是说要获取哪个对象的锁。
如果线程不能获得锁会怎么样 :
如果线程试图进入同步方法,而其锁已经被占用,则线程在该对象上被阻塞。实质上,线程进入该对象的锁定池中,必须在哪里等待,直到其锁被释放,该线程再次变为可运行或运行为止。
当考虑阻塞时,一定要注意哪个对象正被用于锁定:
1、调用同一个对象中非静态同步方法的线程将彼此阻塞。如果是不同对象,则每个线程有自己的对象的锁,线程间彼此互不干预。
2、调用同一个类中的静态同步方法的线程将彼此阻塞,它们都是锁定在相同的Class对象上。
3、静态同步方法和非静态同步方法将永远不会彼此阻塞,因为静态方法锁定在Class对象上,非静态方法锁定在该类的对象上。
4、对于同步代码块,要看清楚什么对象已经用于锁定(synchronized后面括号的内容)。在同一个对象上进行同步的线程将彼此阻塞,在不同对象上锁定的线程将永远不会彼此阻塞。
何时需要同步
在多个线程同时访问互斥(可交换)数据时,应该同步以保护数据,确保两个线程不会同时修改更改它。
对于非静态字段中可更改的数据,通常使用非静态方法访问。
对于静态字段中可更改的数据,通常使用静态方法访问。 如果需要在非静态方法中使用静态字段,或者在静态字段中调用非静态方法,问题将变得非常复杂。
线程安全类
当一个类已经很好的同步以保护它的数据时,这个类就称为“线程安全的”。
即使是线程安全类,也应该特别小心,因为操作的线程之间仍然不一定安全。
小结
1、线程同步的目的是为了保护多个线程访问一个资源时对资源的破坏。
2、线程同步方法是通过锁来实现,每个对象都有且仅有一个锁,这个锁与一个特定的对象关联,线程一旦获取了对象锁,其他访问该对象的线程就无法再访问该对象的其他同步方法。
3、对于静态同步方法,锁是针对这个类的,锁对象是该类的Class对象。静态和非静态方法的锁互不干预。一个线程获得锁,当在一个同步方法中访问另外对象上的同步方法时,会获取这两个对象锁。
4、对于同步,要时刻清醒在哪个对象上同步,这是关键。
5、编写线程安全的类,需要时刻注意对多个线程竞争访问资源的逻辑和安全做出正确的判断,对“原子”操作做出分析,并保证原子操作期间别的线程无法访问竞争资源。
6、当多个线程等待一个对象锁时,没有获取到锁的线程将发生阻塞。
7、死锁是线程间相互等待锁造成的,在实际中发生的概率非常的小。
12、死锁
多个线程各自占有一些共享资源,并且互相等待其他线程占有的资源才能运行,而导致两个或者多个线程都在等待对方释放资源,都停止执行的情况,某一个同步块同时拥有两个或以上对象的锁时,就可能会发生死锁的问题。
产生死锁的四个必要条件:
- 互斥条件:一个资源每次只能被一个线程使用
- 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:线程已获得的资源,在未使用完之前,不能强行剥夺
- 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系
上面的四个条件,我们只要想办法破其中的任意一个或多个就可以避免死锁发生
public class DeadLock {
public static void main(String[] args) {
Makeup m1 = new Makeup(0,"灰姑凉");
Makeup m2 = new Makeup(1,"白雪公主");
new Thread(m1).start();
new Thread(m2).start();
}
}
class Mirror{
}
class Lipstick{
}
class Makeup implements Runnable{
//需要的资源只有一份,需要用static修饰
static Mirror mirror = new Mirror();
static Lipstick lipstick = new Lipstick();
private int choose;//选择
private String name;//名字
public Makeup(int choose,String name){
this.choose=choose;
this.name=name;
}
@Override
public void run() {
//化妆
try {
makeup();
} catch (Exception e) {
e.printStackTrace();
}
}
private void makeup() throws Exception{
if(choose == 0){
//获得口红的锁
synchronized (lipstick){
System.out.println(name+"获得口红的锁");
Thread.sleep(1000);
//一秒钟后获取镜子的锁
synchronized (mirror){
System.out.println(name+"获得镜子的锁");
}
}
}else{
//获得镜子的锁
synchronized (mirror){
System.out.println(name+"获得镜子的锁");
Thread.sleep(2000);
//两秒钟后获取口红的锁
synchronized (lipstick){
System.out.println(name+"获得口红的锁");
}
}
}
}
}
解决:避免一个对象获得两把锁
public class DeadLock {
public static void main(String[] args) {
Makeup m1 = new Makeup(0,"灰姑凉");
Makeup m2 = new Makeup(1,"白雪公主");
new Thread(m1).start();
new Thread(m2).start();
}
}
class Mirror{
}
class Lipstick{
}
class Makeup implements Runnable{
//需要的资源只有一份,需要用static修饰
static Mirror mirror = new Mirror();
static Lipstick lipstick = new Lipstick();
private int choose;//选择
private String name;//名字
public Makeup(int choose,String name){
this.choose=choose;
this.name=name;
}
@Override
public void run() {
//化妆
try {
makeup();
} catch (Exception e) {
e.printStackTrace();
}
}
private void makeup() throws Exception{
if(choose == 0){
//获得口红的锁
synchronized (lipstick){
System.out.println(name+"获得口红的锁");
Thread.sleep(1000);
}
//一秒钟后获取镜子的锁
synchronized (mirror){
System.out.println(name+"获得镜子的锁");
}
}else{
//获得镜子的锁
synchronized (mirror){
System.out.println(name+"获得镜子的锁");
Thread.sleep(2000);
}
//两秒钟后获取口红的锁
synchronized (lipstick){
System.out.println(name+"获得口红的锁");
}
}
}
}
13、Lock锁
synchronized为隐式的锁,Lock为显式的锁。
- 从JDK 5.0开始,Java提供了更强大的线程同步机制:通过显式定义同步锁对象来实现同步。同步锁使用Lock对象充当
- java.util.concurrent.locks.Lock接口是控制多个线程对共享资源进行访问的工具。锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程开始访问共享资源之前应先获得Lock对象
- ReentrantLock(可重入锁)类实现了Lock,它拥有与synchronized相同的并发性和内存语义,在实现线程安全的控制中,比较常用的是ReentrantLock,可以显式加锁、释放锁
synchronized与lock的对比:
- Lock是显式锁(手动开启和关闭锁,别忘记关闭锁) ,synchronized是隐式锁, 出了作用域自动释放
- Lock只有代码块锁,synchronized有代码块锁和方法锁
- 使用Lock锁,JVM将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类)
- 优先使用顺序:
Lock > 同步代码块(已经进入了方法体,分配了相应资源) > 同步方法(在方法体之外)
public class TestLock {
public static void main(String[] args) {
Ticket t = new Ticket();
new Thread(t,"我").start();
new Thread(t,"你").start();
new Thread(t,"黄牛").start();
}
}
class Ticket implements Runnable{
//创建私有的final的锁对象
private final ReentrantLock lock = new ReentrantLock();
private int num = 1000;
@Override
public void run() {
buy();
}
public void buy(){
while (true) {
//加锁
lock.lock();
try {
if (num <= 0) {
break;
}
System.out.println(Thread.currentThread().getName() + "购买了第" + num--);
} finally {
//解锁
lock.unlock();
}
}
}
}
注意:加锁需放在try语句上一行,解锁需放在finally中
14、线程通信
- void wait():当前线程等待,会释放锁,等待其他线程调用此对象的notify()方法或notifyAll()方法
- void notify():唤醒在此对象锁上等待的单个线程
- void notifyAll():唤醒在此对象锁上等待的所有线程
- volatile:保证变量的内存可见性;禁止指令重排序 ;volatile修饰的变量一旦被某个线程更改,必须立即刷新到主内存 ;所有volatile修饰的变量在使用之前必须重新读取主内存的值
- **execute()**方法只能放Runnable类型的线程
- **submit()**可以放Runnable和Callable类型的线程
15、多线程控制类
Java1.5提供了一个非常高效实用的多线程包:java.util.concurrent, 提供了大量高级工具,可以帮助开发者编写高效、易维护、结构清晰的Java多线程程序。
1.ThreadLocal类
用处:保存线程的独立变量。对一个线程类(继承自Thread),当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。常用于用户登录控制,如记录session信息。
实现:每个Thread都持有一个TreadLocalMap类型的变量(该类是一个轻量级的Map,功能与map一样,区别是桶里放的是entry而不是entry的链表。功能还是一个map。)以本身为key,以目标为value。
主要方法是get()和set(T a),set之后在map里维护一个threadLocal -> a,get时将a返回。ThreadLocal是一个特殊的容器。
2.原子类
类别 | 描述 | 实例 |
---|---|---|
Atomic* | 基本类型原子类 | AtomicInteger AtomicLong AtomicBoolean |
Atomic*Array | 数组类型原子类 | AtomicIntegerArray AtomicLongArray AtomicReferenceArray |
Atomic*Reference | 应用类型原子类 | AtomicReference AtomicStampedReference AtomicMarkAbleReference |
Atomic*FieldUpdater | 升级类型原子类 | AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater |
*Adder | 累加器 | LongAdder DoubleAdder |
*Accumulator | 累加器 | LongAdder DoubleAdder |
原子类相比于普通的锁,粒度更细、效率更高(除了高度竞争的情况下)
3.Lock类
在java.util.concurrent包内。共有三个实现:
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
主要目的是和synchronized一样, 两者都是为了解决同步问题,处理资源争端而产生的技术。功能类似但有一些区别。
区别如下:
- lock更灵活,可以自由定义多把锁的枷锁解锁顺序(synchronized要按照先加的后解顺序)
- 提供多种加锁方案,lock 阻塞式, trylock 无阻塞式, lockInterruptily 可打断式, 还有trylock的带超时时间版本
- 性能更高
- 本质上和监视器锁(即synchronized是一样的)
- 必须控制好加锁和解锁,否则会导致灾难
1、ReentrantLock
可重入的意义在于持有锁的线程可以继续持有,并且要释放对等的次数后才真正释放该锁。
1.new一个实例
static ReentrantLock r=new ReentrantLock();
2.加锁
r.lock()或r.lockInterruptibly();
此处也是个不同,后者可被打断。当a线程lock后,b线程阻塞,此时如果是lockInterruptibly,那么在调用b.interrupt()之后,b线程退出阻塞,并放弃对资源的争抢,进入catch块。(如果使用后者,必须throw interruptable exception 或catch)
3.释放锁
r.unlock()
必须放在finally里面。以防止异常跳出了正常流程,导致灾难。经过测试,哪怕是发生了OutOfMemoryError,finally块中的语句执行也能够得到保证。
2、ReentrantReadWriteLock
可重入读写锁(读写锁的一个实现)
ReentrantReadWriteLock lock = new ReentrantReadWriteLock()
ReadLock r = lock.readLock();
WriteLock w = lock.writeLock();
两者都有lock,unlock方法。写写,写读互斥;读读不互斥。可以实现并发读的高效线程安全代码。
3、容器类
1.BlockingQueue
阻塞队列。该类是java.util.concurrent包下的重要类,通过对Queue的学习可以得知,这个queue是单向队列,可以在队列头添加元素和在队尾删除或取出元素。类似于一个管道,特别适用于先进先出策略的一些应用场景。
BlockingQueue在队列的基础上添加了多线程协作的功能:
方法 | 描述 |
---|---|
offer(E e) | 如果队列没满,立即返回true; 如果队列满了,立即返回false–>不阻塞 |
put(E e) | 如果队列满了,一直阻塞,直到队列不满了或者线程被中断–>阻塞 |
offer(E e, long timeout, TimeUnit unit) | 在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况: 被唤醒 时间超时 线程中断 |
poll() | 如果没有元素,直接返回null;如果有元素,出队–>不阻塞 |
take() | 如果队列空了,一直阻塞,直到队列不为空或者线程被中断–>阻塞 |
poll(long timeout, TimeUnit unit) | 如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况: 被唤醒 时间超时 线程中断 |
常见的阻塞队列:
ArrayListBlockingQueue
LinkedListBlockingQueue
DelayQueue
SynchronousQueue
ArrayBlockingQueue:
-
一个对象数组 + 一把锁 + 两个条件
-
入队与出队都用同一把锁
-
在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
-
采用了数组,必须指定大小,即容量有限
LinkedBlockingQueue:
-
一个单向链表 + 两把锁 + 两个条件
-
两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
-
在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
-
采用了链表,最大容量为整数最大值,可看做容量无限
2.ConcurrentHashMap
在并发使用到HashMap的时候,往往不建议直接用HashMap,因为HashMap在并发写数据的时候容易因为rehash的过程产生环形链表的情况。所以在并发使用Map结构时,一般建议使用ConcurrentHashMap。
JDK8中ConcurrentHashMap参考了JDK8 HashMap的实现,采用了数组+链表+红黑树的实现方式来设计,内部大量采用CAS操作。并发控制使⽤synchronized 和 CAS 来操作。(JDK1.6 以后 对 synchronized 锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但
是已经简化了属性,只是为了兼容旧版本;
JDK1.8的Nod节点中value和next都用volatile修饰,保证并发的可见性。
可以理解为,synchronized 只锁定当前链表或红黑⼆叉树的首节点,这样只要 hash 不冲突,就不会产生并发,效率又提升 N 倍。
4、管理类
ThreadPoolExecutor
通过线程池创建线程的方式为:
//可变大小线程池
ExecutorService e = Executors.newCachedThreadPool();
//单线程池
ExecutorService e = Executors.newSingleThreadExecutor();
//固定大小线程池
ExecutorService e = Executors.newFixedThreadPool(3);
//运行线程
e.execute(new MyRunnableImpl());
查看源码可以发现,线程池创建线程的方式都是通过ThreadPoolExecutor实现的,本质上,都是ThreadPoolExecutor类的各种实现版本。
以最后一个构造方法(参数最多),对参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中的参数如下:
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
16、生产者消费者模型
1.用synchronized、wait和notify的方式
1、创建一个仓库
public class ShareData {
public static AtomicInteger atomicInteger = new AtomicInteger();
public volatile boolean flag = true;
public static final int MAX_COUNT = 10;
public static final List<Integer> pool = new ArrayList<>();
public void produce(){
while (flag){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (pool){
while (pool.size() == MAX_COUNT){
System.out.println("商品满啦。。。");
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pool.add(atomicInteger.incrementAndGet());
System.out.println(Thread.currentThread().getName() + "正在生产商品,目前剩余:" + pool.size());
pool.notifyAll();
}
}
}
public void consumue(){
while (flag){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (pool){
while (pool.size() == 0){
System.out.println("没货啦。。。。");
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pool.remove(0);
System.out.println(Thread.currentThread().getName() + "消费了商品,目前剩余:" + pool.size());
pool.notifyAll();
}
}
}
public void stop(){
flag = false;
}
}
2、创建线程(也可直接使用lamda表达式)
public class C implements Runnable{
private ShareData shareData;
public C(ShareData shareData){
this.shareData = shareData;
}
@Override
public void run() {
shareData.consumue();
}
}
public class P implements Runnable{
private ShareData shareData;
public P(ShareData shareData){
this.shareData = shareData;
}
@Override
public void run() {
shareData.produce();
}
}
3、启动线程
public class Client {
/**
* 设置线程名
*/
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
//手动创建线程池,阿里巴巴java规范不允许使用Executers的方式创建线程池
private static ExecutorService e = new ThreadPoolExecutor(5,
20,200L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),namedThreadFactory);
public static void main(String[] args) {
ShareData shareData = new ShareData();
e.submit(new Thread(new P(shareData)));
e.submit(new Thread(new C(shareData)));
e.submit(new Thread(new P(shareData)));
e.submit(new Thread(new C(shareData)));
e.submit(new Thread(new P(shareData)));
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("即将停止线程");
shareData.stop();
//关闭线程池
e.shutdown();
}
}
2.使用Lock,Condition的await和signal方法
JUC包下的锁Lock替代synchronize关键字。await方法代替wait,signal代替notifyall。
1、创建仓库
public class ShareData {
private int count = 0;
private int max = 10;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public volatile boolean flag = true;
public void produce(){
while (flag){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (count == max){
System.out.println("货满啦。。。");
condition.await();
}
count++;
System.out.println(Thread.currentThread().getName() + "生产了商品,目前剩余:" + count);
condition.signalAll();
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void consumue(){
while (flag){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
while (count == 0){
System.out.println("没货啦。。。");
condition.await();
}
count--;
System.out.println(Thread.currentThread().getName() + "消费了商品,目前剩余:" + count);
condition.signalAll();
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void stop(){
flag = false;
}
}
2、创建线程(也可使用lamda表达式)
public class C implements Runnable{
private ShareData shapeData;
public C(ShareData shapeData){
this.shapeData = shapeData;
}
@Override
public void run() {
shapeData.consumue();
}
}
public class P implements Runnable{
private ShareData shapeData;
public P(ShareData shapeData){
this.shapeData = shapeData;
}
@Override
public void run() {
shapeData.produce();
}
}
3、启动线程
public class Client {
/**
* 设置线程名
*/
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
//手动创建线程池,拉里巴巴java规范不允许使用Executers的方式创建线程池
private static ExecutorService e = new ThreadPoolExecutor(5,
20,200L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),namedThreadFactory);
public static void main(String[] args) {
ShareData shareData = new ShareData();
e.submit(new C(shareData));
e.submit(new P(shareData));
e.submit(new C(shareData));
e.submit(new P(shareData));
e.submit(new P(shareData));
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("即将停止线程");
shareData.stop();
//关闭线程池
e.shutdown();
}
}
3、使用阻塞队列
当阻塞队列为空时,从阻塞队列中取数据的操作会被阻塞。
当阻塞队列为满时,往阻塞队列中添加数据的操作会被阻塞。
1、创建仓库
public class ShareData {
public static final int MAX = 10;
/**
* 阻塞队列
*/
private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX);
private volatile boolean flag = true;
private AtomicInteger atomicInteger = new AtomicInteger();
public void produce() throws InterruptedException{
while (flag){
boolean retvalue = queue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
if(retvalue){
System.out.println(Thread.currentThread().getName() + "生产资源:" + atomicInteger.get() + "成功,目前剩余:" + queue.size());
}else{
System.out.println(Thread.currentThread().getName() + "生产资源:" + atomicInteger.get() + "失败,目前剩余:" + queue.size());
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println("生产停止。。。");
}
public void consume() throws InterruptedException{
Integer result = null;
while (true){
result = queue.poll(2, TimeUnit.SECONDS);
if(result == null){
System.out.println(Thread.currentThread().getName() + "未获取到资源,即将退出");
return;
}
System.out.println(Thread.currentThread().getName() + "获取资源成功,目前剩余:" + queue.size());
Thread.sleep(1500);
}
}
public void stop(){
flag = false;
}
}
2、创建线程
public class C implements Runnable{
private ShareData shareData;
public C(ShareData shareData){
this.shareData = shareData;
}
@Override
public void run() {
try {
shareData.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class P implements Runnable{
private ShareData shareData;
public P(ShareData shareData){
this.shareData = shareData;
}
@Override
public void run() {
try {
shareData.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3、启动线程
public class Client {
/**
* 设置线程名
*/
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
//手动创建线程池,拉里巴巴java规范不允许使用Executers的方式创建线程池
private static ExecutorService e = new ThreadPoolExecutor(5,
20,200L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),namedThreadFactory);
public static void main(String[] args) {
ShareData shareData = new ShareData();
e.submit(new C(shareData));
e.submit(new P(shareData));
e.submit(new P(shareData));
e.submit(new C(shareData));
e.submit(new P(shareData));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("即将停止线程");
shareData.stop();
//关闭线程池
e.shutdown();
}
}
4、结果
thread-call-runner-1生产资源:1成功,目前剩余:0
thread-call-runner-2生产资源:2成功,目前剩余:1
thread-call-runner-0获取资源成功,目前剩余:0
thread-call-runner-3获取资源成功,目前剩余:0
thread-call-runner-4生产资源:3成功,目前剩余:1
thread-call-runner-1生产资源:6成功,目前剩余:2
可以看到结果看起来并不像是线程安全的样子,这是因为入队、出队语句与打印结果语句是分开的,不是一个原子操作,实际上入队出队是线程安全的,可以在入队、出队语句和打印结果语句中加入lock锁或者synchronize。
public class ShareData {
public static final int MAX = 10;
/**
* 阻塞队列
*/
private static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX);
private volatile boolean flag = true;
private AtomicInteger atomicInteger = new AtomicInteger();
public void produce() throws InterruptedException{
while (flag){
//进行同步
synchronized (queue){
boolean retvalue = queue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
if(retvalue){
System.out.println(Thread.currentThread().getName() + "生产资源:" + atomicInteger.get() + "成功,目前剩余:" + queue.size());
}else{
System.out.println(Thread.currentThread().getName() + "生产资源:" + atomicInteger.get() + "失败,目前剩余:" + queue.size());
}
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println("生产停止。。。");
}
public void consume() throws InterruptedException{
Integer result = null;
while (true){
//进行同步
synchronized (queue){
result = queue.poll(2, TimeUnit.SECONDS);
if(result == null){
System.out.println(Thread.currentThread().getName() + "未获取到资源,即将退出");
return;
}
System.out.println(Thread.currentThread().getName() + "获取资源成功,目前剩余:" + queue.size());
}
Thread.sleep(1500);
}
}
public void stop(){
flag = false;
}
}
结果:
thread-call-runner-0未获取到资源,即将退出
thread-call-runner-4生产资源:1成功,目前剩余:1
thread-call-runner-3获取资源成功,目前剩余:0
thread-call-runner-2生产资源:2成功,目前剩余:1
thread-call-runner-1生产资源:3成功,目前剩余:2
thread-call-runner-2生产资源:4成功,目前剩余:3
thread-call-runner-1生产资源:5成功,目前剩余:4
thread-call-runner-4生产资源:6成功,目前剩余:5
thread-call-runner-3获取资源成功,目前剩余:4
thread-call-runner-1生产资源:7成功,目前剩余:5
thread-call-runner-4生产资源:8成功,目前剩余:6
thread-call-runner-2生产资源:9成功,目前剩余:7
thread-call-runner-3获取资源成功,目前剩余:6
thread-call-runner-1生产资源:10成功,目前剩余:7
thread-call-runner-2生产资源:11成功,目前剩余:8
thread-call-runner-4生产资源:12成功,目前剩余:9
thread-call-runner-1生产资源:13成功,目前剩余:10
thread-call-runner-2生产资源:14失败,目前剩余:10
即将停止线程
thread-call-runner-1生产资源:15失败,目前剩余:10
thread-call-runner-3获取资源成功,目前剩余:9
thread-call-runner-4生产资源:16成功,目前剩余:10
生产停止。。。
生产停止。。。
thread-call-runner-2生产资源:17失败,目前剩余:10
thread-call-runner-3获取资源成功,目前剩余:9