JUC 学习
本文最后更新于:1 年前
JUC 学习
1 什么是 JUC ?
- java.util 工具包,包,分类
- 业务:普通的线程代码 Thread
- Runnable 没有返回值,效率相比 Callable 相对较低
2 线程和进程
2.1 进程
进程:一个进程往往可以包含几个线程,至少包含一个!
java默认有几个线程?2个:main, GC
对于Java 而言: Thread,Runable,Callable
Java真的可以开启线程吗?开不了
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// 本地方法,底层C++,Java无法操作硬件
private native void start0();
2.2 并发和并行
并发(多个线程操作同一个资源)
- CPU 一个内核,模拟出来的多线程,快速交替
并行(多个人一起行走)
- CPU 多个内核,多个线程可以同时执行
public class Test1 {
public static void main(String[] args) {
// 获取CPU核数
// CPU 密集型, IO 密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU的资源
2.3 线程的状态
public enum State {
// 新生
NEW,
// 运行中
RUNNABLE,
// 阻塞
BLOCKED,
// 等待,死死等
WAITING,
// 超时等待
TIMED_WAITING,
// 终止
TERMINATED;
}
2.4 wait/sleep的区别
- 来自不同的类
- wait => Object
- sleep => Thread
- 关于锁的释放
- wait会释放锁
- sleep 抱着锁睡觉,不会释放
- 使用范围不同
- wait 必须在同步代码块
- sleep 可以在任何地方睡
3 Lock
Lock 接口
公平锁: 十分公平, 可以先来后到
非公平锁: 可以插队
synchronized
package com.pnca.kaungshen.demo01; import lombok.Synchronized; /** * 真正的多线程, 公司中的开发 * 线程就是一个单独的资源类, 没有任何的附属操作 * 1 属性, 方法 */ public class SaleTicketDemo01 { public static void main(String[] args) { // 并发: 多线程操作同一个资源类, 把资源类丢入线程 Ticket ticket = new Ticket(); // @FunctionalInterface 函数式接口. jdk1.8 lambda 表达式 (参数) -> { 代码 } new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "C").start(); } } // 资源类 OOP class Ticket { // 属性 方法 private int number = 50; // 卖票的方法 // synchronized 本质: 队列, 锁 public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "票, 剩余: " + number + "票!"); } } }
Lock
package com.pnca.kaungshen.demo01; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author pncalbl * @date 2021/5/19 20:39 * @e-mail pncalbl@qq.com * @description **/ public class SaleTicketDemo02 { public static void main(String[] args) { // 并发: 多线程操作同一个资源类, 把资源类丢入线程 Ticket2 ticket = new Ticket2(); // @FunctionalInterface 函数式接口. jdk1.8 lambda 表达式 (参数) -> { 代码 } new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 50; i++) { ticket.sale(); } }, "C").start(); } } // 资源类 OOP class Ticket2 { // 属性 方法 private int number = 50; Lock lock = new ReentrantLock(); // 卖票的方法 public void sale() { lock.lock(); try { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "票, 剩余: " + number + "票!"); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
synchronized 和 lock 的区别
- synchronized 内置的 Java 关键字, Lock 是一个 Java 类
- synchronized 无法判断获取锁的状态, Lock 可以判断是否获取到了锁
- synchronized 会自动释放锁, Lock 必须要手动释放锁! 如果不释放锁, 就会导致死锁
- synchronized 线程1(获得锁, 阻塞), 线程2(等待, 傻傻的等); Lock 锁就不一定会等待下去
- synchronized 可重入锁, 不可以中断, 非公平; Lock, 可重入锁, 可以判断锁, 自己设置公平与非公平
- synchronized 适合锁少量的代码, Lock 适合锁大量的同步代码
锁是什么, 如何判断锁的是谁?
4 生产者和消费者问题
单个生产者和消费者(synchronized)
package com.pnca.kaungshen.pc; /** * @author pncalbl * @date 2021/5/21 21:16 * @e-mail pncalbl@qq.com * @description **/ /* * 线程之间的通信问题: 生产者和消费者问题! 等待唤醒, 通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); } } // 等待 业务 通知 class Data { private int number = 0; // +1 public synchronized void increment() throws InterruptedException { if (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, 我 +1 完毕 this.notifyAll(); } // -1 public synchronized void decrement() throws InterruptedException { if (number == 0) { // 等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, 我 -1 完毕 this.notifyAll(); } }
虚假唤醒
多个生产者和消费者(synchronized)
package com.pnca.kaungshen.pc; /** * @author pncalbl * @date 2021/5/21 21:16 * @e-mail pncalbl@qq.com * @description **/ /* * 线程之间的通信问题: 生产者和消费者问题! 等待唤醒, 通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } // 等待 业务 通知 class Data { private int number = 0; // +1 public synchronized void increment() throws InterruptedException { while (number != 0) { // 等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, 我 +1 完毕 this.notifyAll(); } // -1 public synchronized void decrement() throws InterruptedException { while (number == 0) { // 等待 this.wait(); } number--; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, 我 -1 完毕 this.notifyAll(); } }
JUC版的生产者和消费者
通过 Lock 找到 Condition
代码
package com.pnca.kaungshen.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author pncalbl * @date 2021/5/21 21:16 * @e-mail pncalbl@qq.com * @description **/ /* * 线程之间的通信问题: 生产者和消费者问题! 等待唤醒, 通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 */ public class B { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } // 等待 业务 通知 class Data2 { private int number = 0; final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); // +1 public void increment() throws InterruptedException { lock.lock(); try { while (number != 0) { // 等待 notFull.await(); } number++; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, +1 完毕 notEmpty.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } // -1 public void decrement() throws InterruptedException { lock.lock(); try { while (number == 0) { // 等待 notEmpty.await(); } number--; System.out.println(Thread.currentThread().getName() + " ==> " + number); // 通知其他线程, -1 完毕 notFull.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
Condation 精确唤醒
package com.pnca.kaungshen.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author pncalbl * @date 2021/5/21 21:54 * @e-mail pncalbl@qq.com * @description **/ /* 依次调用 A -> B -> C */ public class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data.printC(); } }, "C").start(); } } class Data3 { private final Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); private int number = 1; public void printA() { lock.lock(); try { // 业务 while (number != 1) { condition1.await(); } System.out.println(Thread.currentThread().getName() + " ==> AAA"); number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { // 业务 while (number != 2) { condition2.await(); } System.out.println(Thread.currentThread().getName() + " ==> BBB"); number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { // 业务 while (number != 3) { condition3.await(); } System.out.println(Thread.currentThread().getName() + " ==> CCC"); number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
4 八锁现象
如何判断锁的是谁!锁到底锁的是谁?
锁会锁住:对象、Class。
深刻理解我们的锁。
八个问题
两个同步方法,先执行发短信还是打电话
public class dome01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendMs(); }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.call(); }).start(); } } class Phone { public synchronized void sendMs() { System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } }
输出结果为
发短信
打电话
为什么? 如果你认为是顺序在前? 这个答案是错误的!
我们再来看:我们让发短信 延迟4s
public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone(); new Thread(() -> { try { phone.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.call(); }).start(); } } class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } }
现在结果是什么呢?
结果:还是先发短信,然后再打电话!
why?
原因:并不是顺序执行,而是synchronized 锁住的对象是方法的调用!对于两个方法用的是同一个锁,谁先拿到谁先执行,另外一个等待
加一个普通方法
public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone = new Phone(); new Thread(() -> { try { phone.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone.hello(); }).start(); } } class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
输出结果为
hello
发短信
原因:hello是一个普通方法,不受synchronized锁的影响,不用等待锁的释放
如果我们使用的是两个对象,一个调用发短信,一个调用打电话,那么整个顺序是怎么样的呢?
public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone2.call(); }).start(); } } class Phone { public synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
输出结果
打电话
发短信
原因:两个对象两把锁,不会出现等待的情况,发短信睡了4s,所以先执行打电话
6.如果我们把synchronized的方法加上static变成静态方法!那么顺序又是怎么样的呢?
我们先来使用一个对象调用两个方法!
答案是:先发短信,后打电话
如果我们使用两个对象调用两个方法!
答案是:还是先发短信,后打电话
原因是什么呢? 为什么加了static就始终前面一个对象先执行呢!为什么后面会等待呢?
原因是:对于static静态方法来说,对于整个类Class来说只有一份,对于不同的对象使用的是同一份方法,相当于这个方法是属于这个类的,如果静态static方法使用synchronized锁定,那么这个synchronized锁会锁住整个对象!不管多少个对象,对于静态的锁都只有一把锁,谁先拿到这个锁就先执行,其他的进程都需要等待!
如果我们使用一个静态同步方法、一个同步方法、一个对象调用顺序是什么?
public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); // Phone phone2 = new Phone(); new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone1.call(); }).start(); } } class Phone { public static synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
输出结果
打电话
发短信
原因:因为一个锁的是Class类的模板,一个锁的是对象的调用者。所以不存在等待,直接运行。
如果我们使用一个静态同步方法、一个同步方法、两个对象调用顺序是什么?
public class dome01 { public static void main(String[] args) throws InterruptedException { Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { try { phone1.sendMs(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { phone2.call(); }).start(); } } class Phone { public static synchronized void sendMs() throws InterruptedException { TimeUnit.SECONDS.sleep(4); System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } public void hello() { System.out.println("hello"); } }
输出结果
打电话
发短信
原因:两把锁,锁的不是同一个东西
小结:
new 出来的 this 是具体的一个对象
static Class 是唯一的一个模板
6 集合不安全
6.1 List 不安全
// java.util.ConcurrentModificationException 并发修改异常!
// jdk-11, 无法重新此报错
public class ListTest {
public static void main(String[] args) {
List<Object> arrayList = new ArrayList<>();
for(int i=1;i<=10;i++){
new Thread(()->{
arrayList.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(arrayList);
},String.valueOf(i)).start();
}
}
}
会导致 java.util.ConcurrentModificationException 并发修改异常!
ArrayList 在并发情况下是不安全的
解决方案:
public class ListTest {
public static void main(String[] args) {
/**
* 解决方案
* 1. List<String> list = new Vector<>();
* 2. List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3. List<String> list = new CopyOnWriteArrayList<>();
*/
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <=10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
CopyOnWriteArrayList:写入时复制! COW 计算机程序设计领域的一种优化策略
- 核心思想是,如果有多个调用者(Callers)同时要求相同的资源(如内存或者是磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。
- 读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWriteArrayList。
- 多个线程调用的时候,list,读取的时候,固定的,写入(存在覆盖操作);在写入的时候避免覆盖,造成数据错乱的问题;
CopyOnWriteArrayList比Vector厉害在哪里?
Vector底层是使用synchronized关键字来实现的:效率特别低下。
CopyOnWriteArrayList使用的是Lock锁,效率会更加高效!
6.2 Set 不安全
Set和List同理可得: 多线程情况下,普通的Set集合是线程不安全的;
解决方案还是两种:
- 使用Collections工具类的synchronized包装的Set类
- 使用CopyOnWriteArraySet 写入复制的JUC解决方案
public class SetTest {
public static void main(String[] args) {
/**
* 1. Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2. Set<String> set = new CopyOnWriteArraySet<>();
*/
// Set<String> set = new HashSet<>();
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
HashSet底层是什么?
hashSet底层就是一个HashMap;
6.3 Map不安全
//map 是这样用的吗? 不是,工作中不使用这个
//默认等价什么? new HashMap<>(16,0.75);
Map<String, String> map = new HashMap<>();
//加载因子、初始化容量
默认加载因子是0.75,默认的初始容量是16
只有保证length的长度是2^n
的话,才可以实现利用取模运算,计算哈希。而位运算(&)效率要比代替取模运算(%)高很多,主要原因是位运算直接对内存数据进行操作,不需要转成十进制,因此处理速度非常快。
// 使用位运算计算哈希值
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
同样的HashMap基础类也存在并发修改异常!
public class MapTest {
public static void main(String[] args) {
//map 是这样用的吗? 不是,工作中不使用这个
//默认等价什么? new HashMap<>(16,0.75);
/**
* 解决方案
* 1. Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
* Map<String, String> map = new ConcurrentHashMap<>();
*/
Map<String, String> map = new ConcurrentHashMap<>();
//加载因子、初始化容量
for (int i = 1; i < 100; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
7 Callable
- 可以有返回值;
- 可以抛出异常;
- 方法不同,run()/call()
package com.pnca.kaungshen.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
for (int i = 0; i < 10; i++) {
MyThread myThread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(myThread);
// 放入 Thread 中,使用, 结果会被缓存
new Thread(futureTask, String.valueOf(i)).start();
// 这个 Get 方法可能会被阻塞,
// 如果在call方法中是一个耗时的方法,
// 所以一般情况我们会把这个放在最后,或者使用异步通信
Integer a = futureTask.get();
System.out.println("返回值: " + a);
}
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call()");
return 1024;
}
}
8 常用的辅助类
8.1 CountDownLatch
package com.pnca.kaungshen.utils;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " ==> Go Out");
countDownLatch.countDown(); // 每个线程都数量减 1
}, String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零, 然后向下执行
System.out.println("close door");
}
}
主要方法:
- countDown 减一操作;
- await 等待计数器归零
await 等待计数器归零,就唤醒,再继续向下运行
8.2 CyclickBarrier
package com.pnca.kaungshen.utils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 主线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("召唤神龙成功"));
for (int i = 1; i <= 7; i++) {
// 子线程
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集了第 " + finalI + " 颗龙珠!");
try {
cyclicBarrier.await(); // 加法计数 等待
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3 Semaphore
package com.pnca.kaungshen.utils;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量, 停车位, 限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i <= 6; i++) {
new Thread(() -> {
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "强到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release 释放
}
}).start();
}
}
}
原理:
semaphore.acquire()获得资源,如果资源已经使用完了,就等待资源释放后再进行使用!
semaphore.release()释放,会将当前的信号量释放+1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用! 并发限流,控制最大的线程数!
9 读写锁
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
int num = 6;
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.write(String.valueOf(finalI), String.valueOf(finalI));
},String.valueOf(i)).start();
}
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.read(String.valueOf(finalI));
},String.valueOf(i)).start();
}
}
}
/**
* 方法未加锁,导致写的时候被插队
*/
class MyCache {
private volatile Map<String, String> map = new HashMap<>();
public void write(String key, String value) {
System.out.println(Thread.currentThread().getName() + "线程开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "线程写入ok");
}
public void read(String key) {
System.out.println(Thread.currentThread().getName() + "线程开始读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "线程写读取ok");
}
}
6线程开始读取
3线程开始读取
2线程开始读取
6线程写读取ok
5线程开始读取
5线程开始写入
2线程开始写入
4线程开始写入
4线程开始读取
4线程写读取ok
1线程开始读取
5线程写读取ok
3线程写读取ok
3线程开始写入
3线程写入ok
1线程开始写入
2线程写读取ok
6线程开始写入
1线程写入ok
1线程写读取ok
2线程写入ok
4线程写入ok
5线程写入ok
6线程写入ok
所以如果我们不加锁的情况,多线程的读写会造成数据不可靠的问题。
我们也可以采用synchronized这种重量锁和轻量锁 lock去保证数据的可靠。
但是这次我们采用更细粒度的锁:ReadWriteLock 读写锁来保证
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache2 myCache = new MyCache2();
int num = 6;
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.write(String.valueOf(finalI), String.valueOf(finalI));
},String.valueOf(i)).start();
}
for (int i = 1; i <= num; i++) {
int finalI = i;
new Thread(() -> {
myCache.read(String.valueOf(finalI));
},String.valueOf(i)).start();
}
}
}
class MyCache2 {
private volatile Map<String, String> map = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void write(String key, String value) {
lock.writeLock().lock(); // 写锁
try {
System.out.println(Thread.currentThread().getName() + "线程开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "线程写入ok");
}finally {
lock.writeLock().unlock(); // 释放写锁
}
}
public void read(String key) {
lock.readLock().lock(); // 读锁
try {
System.out.println(Thread.currentThread().getName() + "线程开始读取");
map.get(key);
System.out.println(Thread.currentThread().getName() + "线程写读取ok");
}finally {
lock.readLock().unlock(); // 释放读锁
}
}
}
1线程开始写入
1线程写入ok
4线程开始写入
4线程写入ok
2线程开始写入
2线程写入ok
5线程开始写入
5线程写入ok
6线程开始写入
6线程写入ok
3线程开始写入
3线程写入ok
2线程开始读取
1线程开始读取
6线程开始读取
4线程开始读取
2线程写读取ok
3线程开始读取
6线程写读取ok
5线程开始读取
4线程写读取ok
3线程写读取ok
1线程写读取ok
5线程写读取ok
10 阻塞队列
10.1 BlockQueue
是Collection的一个子类
什么情况下我们会使用阻塞队列
多线程并发处理、线程池
BlockingQueue 有四组api
方式 | 抛出异常 | 不会抛出异常,有返回值 | 阻塞,等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(timenum.timeUnit) |
移出 | remove | poll | take | poll(timenum,timeUnit) |
判断队首元素 | element | peek | - | - |
/**
* 抛出异常
*/
public static void test1(){
//需要初始化队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//抛出异常:java.lang.IllegalStateException: Queue full
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果多移除一个
//这也会造成 java.util.NoSuchElementException 抛出异常
System.out.println(blockingQueue.remove());
}
=======================================================================================
/**
* 不抛出异常,有返回值
*/
public static void test2(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//添加 一个不能添加的元素 使用offer只会返回false 不会抛出异常
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//弹出 如果没有元素 只会返回null 不会抛出异常
System.out.println(blockingQueue.poll());
}
=======================================================================================
/**
* 等待 一直阻塞
*/
public static void test3() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞 不会返回
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//如果队列已经满了, 再进去一个元素 这种情况会一直等待这个队列 什么时候有了位置再进去,程序不会停止
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//如果我们再来一个 这种情况也会等待,程序会一直运行 阻塞
System.out.println(blockingQueue.take());
}
=======================================================================================
/**
* 等待 超时阻塞
* 这种情况也会等待队列有位置 或者有产品 但是会超时结束
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
System.out.println("开始等待");
blockingQueue.offer("d",2, TimeUnit.SECONDS); //超时时间2s 等待如果超过2s就结束等待
System.out.println("结束等待");
System.out.println("===========取值==================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println("开始等待");
blockingQueue.poll(2,TimeUnit.SECONDS); //超过两秒 我们就不要等待了
System.out.println("结束等待");
}
10.2 同步队列
同步队列 没有容量,也可以视为容量为1的队列;
进去一个元素,必须等待取出来之后,才能再往里面放入一个元素;
put方法 和 take方法;
Synchronized 和 其他的BlockingQueue 不一样 它不存储元素;
put了一个元素,就必须从里面先take出来,否则不能再put进去值!
并且SynchronousQueue 的take是使用了lock锁保证线程安全的。
public class SynchronousQueue {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new java.util.concurrent.SynchronousQueue<>();
// 往queue中添加元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "put 01");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + "put 02");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + "put 03");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 取出元素
new Thread(()-> {
try {
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + "take" + synchronousQueue.take());
}catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Thread-0put 01
Thread-0put 02
Thread-1take1
Thread-0put 03
Thread-1take2
Thread-1take3
11 线程池
线程池:三大方式、七大参数、四种拒绝策略
池化技术
程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术
线程池、JDBC的连接池、内存池、对象池 等等。。。。
资源的创建、销毁十分消耗资源
池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。
11.1 线程池的好处
降低资源的消耗;
提高响应的速度;
方便管理;
线程复用、可以控制最大并发数、管理线程;
11.2 线程池:三大方法
- ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 单个线程
- ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
- ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 可伸缩的
//工具类 Executors 三大方法;
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的
//线程池用完必须要关闭线程池
try {
for (int i = 1; i <=100 ; i++) {
//通过线程池创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+ " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
11.3 七大参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大的线程池大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
RejectedExecutionHandler handler //拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
阿里巴巴的Java操作手册中明确说明:对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池。
public class PollDemo {
public static void main(String[] args) {
// 获取cpu 的核数
int max = Runtime.getRuntime().availableProcessors();
ExecutorService service =new ThreadPoolExecutor(
2,
max,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 1; i <= 10; i++) {
service.execute(() -> {
System.out.println(Thread.currentThread().getName() + "ok");
});
}
}catch (Exception e) {
e.printStackTrace();
}
finally {
service.shutdown();
}
}
}
11.4 拒绝策略
new ThreadPoolExecutor.AbortPolicy(): //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常
超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
new ThreadPoolExecutor.CallerRunsPolicy(): //该拒绝策略为:哪来的去哪里 main线程进行处理
new ThreadPoolExecutor.DiscardPolicy(): //该拒绝策略为:队列满了,丢掉异常,不会抛出异常。
new ThreadPoolExecutor.DiscardOldestPolicy(): //该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常
11.5 如何设置线程池的大小
1、CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小
// 获取cpu 的核数
int max = Runtime.getRuntime().availableProcessors();
ExecutorService service =new ThreadPoolExecutor(
2,
max,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
2、I/O密集型:
在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,大约是最大I/O数的一倍到两倍之间。
12 四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口:只有一个方法的接口
12.1 Function 函数型接口
public class FunctionDemo {
public static void main(String[] args) {
Function<String, String> function = (str) -> {return str;};
System.out.println(function.apply("aaaaaaaaaa"));
}
}
12.2 Predicate 断定型接口
public class PredicateDemo {
public static void main(String[] args) {
Predicate<String> predicate = (str) -> {return str.isEmpty();};
// false
System.out.println(predicate.test("aaa"));
// true
System.out.println(predicate.test(""));
}
}
12.3 Suppier 供给型接口
/**
* 供给型接口,只返回,不输入
*/
public class Demo4 {
public static void main(String[] args) {
Supplier<String> supplier = ()->{return "1024";};
System.out.println(supplier.get());
}
}
12.4 Consummer 消费型接口
/**
* 消费型接口 没有返回值!只有输入!
*/
public class Demo3 {
public static void main(String[] args) {
Consumer<String> consumer = (str)->{
System.out.println(str);
};
consumer.accept("abc");
}
}
13 Stream 流式计算
/**
* Description:
* 题目要求: 用一行代码实现
* 1. Id 必须是偶数
* 2.年龄必须大于23
* 3. 用户名转为大写
* 4. 用户名倒序
* 5. 只能输出一个用户
*
* @author jiaoqianjin
* Date: 2020/8/12 14:55
**/
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(1, "a", 23);
User u2 = new User(2, "b", 23);
User u3 = new User(3, "c", 23);
User u4 = new User(6, "d", 24);
User u5 = new User(4, "e", 25);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
// lambda、链式编程、函数式接口、流式计算
list.stream()
.filter(user -> {return user.getId()%2 == 0;})
.filter(user -> {return user.getAge() > 23;})
.map(user -> {return user.getName().toUpperCase();})
.sorted((user1, user2) -> {return user2.compareTo(user1);})
.limit(1)
.forEach(System.out::println);
}
}
14 ForkJoin
ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!
大数据中:MapReduce 核心思想->把大任务拆分为小任务!
14.1 ForkJoin 特点: 工作窃取!
实现原理是:双端队列!从上面和下面都可以去拿到任务进行执行!\
14.2 如何使用ForkJoin?
通过ForkJoinPool来执行
计算任务 execute(ForkJoinTask<?> task)
计算类要去继承ForkJoinTask;
ForkJoin 的计算类
package com.pnca.kaungshen.forkjoin; import java.util.concurrent.RecursiveTask; /** * @author pncalbl * @date 2021/6/5 21:01 * @e-mail pncalbl@qq.com * @description **/ public class ForkJoinDemo extends RecursiveTask<Long> { private final Long start; private final Long end; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { // 临界值 long temp = 1000000L; if ((end - start) < temp) { long sum = 0L; for (long i = start; i < end; i++) { sum += i; } return sum; } else { // 使用ForkJoin 分而治之 计算 // 1. 计算平均值 long middle = (start + end) / 2; ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(start, middle); // 2. 拆分任务,把任务压入线程队列 forkJoinDemo1.fork(); ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle, end); forkJoinDemo2.fork(); long taskSum; taskSum = forkJoinDemo1.join() + forkJoinDemo2.join(); return taskSum; } } }
测试类
package com.pnca.kaungshen.forkjoin; import org.junit.Test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * @author pncalbl * @date 2021/6/5 21:09 * @e-mail pncalbl@qq.com * @description **/ public class ForkJoinTest { private static final long SUM = 20_0000_0000; /** * 使用普通方法 */ @Test public void test1() { long star = System.currentTimeMillis(); long sum = 0L; for (long i = 1; i < SUM; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println(sum); System.out.println("时间:" + (end - star) + " ms"); System.out.println("----------------------"); } /** * 使用ForkJoin 方法 */ @Test public void test2() throws ExecutionException, InterruptedException { long star = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long along = submit.get(); System.out.println(along); long end = System.currentTimeMillis(); System.out.println("时间:" + (end - star) + " ms"); System.out.println("-----------"); } /** * 使用 Stream 流计算 */ @Test public void test3() { long star = System.currentTimeMillis(); long sum = LongStream.range(0L, SUM).parallel().reduce(0, Long::sum); System.out.println(sum); long end = System.currentTimeMillis(); System.out.println("时间:" + (end - star) + " ms"); System.out.println("-----------"); } }
.parallel().reduce(0, Long::sum)使用一个并行流去计算整个计算,提高效率。
15 异步回调
Future 设计的初衷:对将来的某个事件结果进行建模!
其实就是前端 –> 发送 ajax 异步请求给后端
但是我们平时都使用CompletableFuture
15.1 没有返回值的runAsync异步回调
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起一个请求
System.out.println(System.currentTimeMillis());
System.out.println("---------------------");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//发起一个异步任务
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ".....");
});
System.out.println(System.currentTimeMillis());
System.out.println("---------------------");
//输出执行结果
System.out.println(future.get()); //获取执行结果
}
}
15.2 有返回值的异步回调supplyAsync
//有返回值的异步回调
CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
//success 回调
System.out.println("t=>" + t); //正常的返回结果
System.out.println("u=>" + u); //抛出异常的 错误信息
}).exceptionally((e) -> {
//error回调
System.out.println(e.getMessage());
return 404;
}).get());
whenComplete: 有两个参数,一个是t 一个是u
T:是代表的 正常返回的结果;
U:是代表的 抛出异常的错误信息;
如果发生了异常,get可以获取到exceptionally返回的值;
16 JMM
16.1 对Volatile 的理解
Volatile 是 Java 虚拟机提供 轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
如何实现可见性
volatile变量修饰的共享变量在进行写操作的时候回多出一行汇编:
0x01a3de1d:movb $0×0,0×1104800(%esi);0x01a3de24:lock addl $0×0,(%esp);
Lock前缀的指令在多核处理器下会引发两件事情。
- 将当前处理器缓存行的数据写回到系统内存。
- 这个写回内存的操作会使其他cpu里缓存了该内存地址的数据无效。
多处理器总线嗅探:
- 为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存后再进行操作,但操作不知道何时会写到内存。
- 如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。
- 但是在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己的缓存值是不是过期了,如果处理器发现自己缓存行对应的内存地址被修改了,就会将当前处理器的缓存行设置无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据库读到处理器缓存中。
16.2 什么是JMM?
JMM:JAVA内存模型,不存在的东西,是一个概念,也是一个约定!
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存;
- 线程加锁前,必须读取主存中的最新值到工作内存中;
- 加锁和解锁是同一把锁;
线程中分为 工作内存、主内存
8种操作:
Read(读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用;
load(载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中;
Use(使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令;
assign(赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中;
store(存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用;
write(写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中;
lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态;
unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定;
JMM对这8种操作给了相应的规定:
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write。
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存。
不允许一个线程将没有assign的数据从工作内存同步回主内存。
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作。
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁。lock和unlock必须成对出现。
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值。
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量。
对一个变量进行unlock操作之前,必须把此变量同步回主内存。
遇到问题:程序不知道主存中的值已经被修改过了!;
17 volatile
17.1 保证可见性
public class JMMDemo01 {
// 如果不加volatile 程序会死循环
// 加了volatile是可以保证可见性的
private volatile static Integer number = 0;
public static void main(String[] args) {
//main线程
//子线程1
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程2
new Thread(()->{
while (number==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
number=1;
System.out.println(number);
}
}
17.2 不保证原子性
原子性:不可分割;
线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。
/**
* 不保证原子性
* number <=2w
*
*/
public class VDemo02 {
private static volatile int number = 0;
public static void add(){
number++;
//++ 不是一个原子性操作,是两个~3个操作
//
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
如果不加lock和synchronized ,怎么样保证原子性?
使用原子类
public class VDemo02 {
private static volatile AtomicInteger number = new AtomicInteger();
public static void add(){
// number++;
number.incrementAndGet(); //底层是CAS保证的原子性
}
public static void main(String[] args) {
//理论上number === 20000
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+",num="+number);
}
}
这些类的底层都直接和操作系统挂钩!是在内存中修改值。
Unsafe类是一个很特殊的存在;
原子类为什么这么高级?
17.3 禁止指令重排
什么是指令重排?
我们写的程序,计算机并不是按照我们自己写的那样去执行的
源代码–>编译器优化重排–>指令并行也可能会重排–>内存系统也会重排–>执行
处理器在进行指令重排的时候,会考虑数据之间的依赖性!
int x=1; //1
int y=2; //2
x=x+5; //3
y=x*x; //4
//我们期望的执行顺序是 1_2_3_4 可能执行的顺序会变成2134 1324
//可不可能是 4123? 不可能的
1234567
可能造成的影响结果:前提:a b x y这四个值 默认都是0
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
正常的结果: x = 0; y =0;
线程A | 线程B |
---|---|
b=1 | a=2 |
x=a | y=b |
可能在线程A中会出现,先执行b=1,然后再执行x=a;
在B线程中可能会出现,先执行a=2,然后执行y=b;
那么就有可能结果如下:x=2; y=1.
volatile可以避免指令重排:
volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。
内存屏障:CPU指令。作用:
保证特定的操作的执行顺序;
可以保证某些变量的内存可见性(利用这些特性,就可以保证volatile实现的可见性)
17.4 总结
- volatile可以保证可见性;
- 不能保证原子性
- 由于内存屏障,可以保证避免指令重排的现象产生
面试官:那么你知道在哪里用这个内存屏障用得最多呢?单例模式
18 玩转单例模式
饿汉式、DCL懒汉式
18.1 饿汉式
/**
* 饿汉式单例
*/
public class Hungry {
/**
* 可能会浪费空间
*/
private byte[] data1=new byte[1024*1024];
private byte[] data2=new byte[1024*1024];
private byte[] data3=new byte[1024*1024];
private byte[] data4=new byte[1024*1024];
private Hungry(){
}
private final static Hungry hungry = new Hungry();
public static Hungry getInstance(){
return hungry;
}
}
18.2 DCL懒汉式
//懒汉式单例模式
public class LazyMan {
private static boolean key = false;
private LazyMan(){
synchronized (LazyMan.class){
if (key==false){
key=true;
}
else{
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println(Thread.currentThread().getName()+" ok");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式 简称DCL懒汉式
public static LazyMan getInstance(){
//需要加锁
if(lazyMan==null){
synchronized (LazyMan.class){
if(lazyMan==null){
lazyMan=new LazyMan();
/**
* 1、分配内存空间
* 2、执行构造方法,初始化对象
* 3、把这个对象指向这个空间
*
* 就有可能出现指令重排问题
* 比如执行的顺序是1 3 2 等
* 我们就可以添加volatile保证指令重排问题
*/
}
}
}
return lazyMan;
}
//单线程下 是ok的
//但是如果是并发的
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchFieldException {
//Java中有反射
// LazyMan instance = LazyMan.getInstance();
Field key = LazyMan.class.getDeclaredField("key");
key.setAccessible(true);
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true); //无视了私有的构造器
LazyMan lazyMan1 = declaredConstructor.newInstance();
key.set(lazyMan1,false);
LazyMan instance = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(lazyMan1);
System.out.println(instance == lazyMan1);
}
}
18.3 静态内部类
//静态内部类
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.holder;
}
public static class InnerClass{
private static final Holder holder = new Holder();
}
}
单例不安全, 因为反射
18.4 枚举
//enum 是什么? enum本身就是一个Class 类
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
//java.lang.NoSuchMethodException: com.ogj.single.EnumSingle.<init>()
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
使用枚举,我们就可以防止反射破坏了。
public final class EnumSingle extends Enum
{
public static EnumSingle[] values()
{
return (EnumSingle[])$VALUES.clone();
}
public static EnumSingle valueOf(String name)
{
return (EnumSingle)Enum.valueOf(com/ogj/single/EnumSingle, name);
}
private EnumSingle(String s, int i)
{
super(s, i);
}
public EnumSingle getInstance()
{
return INSTANCE;
}
public static final EnumSingle INSTANCE;
private static final EnumSingle $VALUES[];
static
{
INSTANCE = new EnumSingle("INSTANCE", 0);
$VALUES = (new EnumSingle[] {
INSTANCE
});
}
}
19 深入理解CAS
19.1 什么是CAS?
大厂必须深入研究底层!!!!修内功!操作系统、计算机网络原理、组成原理、数据结构
public class CasDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
atomicInteger.getAndIncrement(); //++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
Unsafe 类
19.2 总结
CAS:比较当前工作内存中的值 和 主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环,使用的是自旋锁。
缺点:
- 循环会耗时;
- 一次性只能保证一个共享变量的原子性;
- 它会存在ABA问题
CAS:ABA问题?(狸猫换太子)
线程1:期望值是1,要变成2;
线程2:两个操作:
- 1、期望值是1,变成3
- 2、期望是3,变成1
所以对于线程1来说,A的值还是1,所以就出现了问题,骗过了线程1;
public class casDemo {
//CAS : compareAndSet 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
//boolean compareAndSet(int expect, int update)
//期望值、更新值
//如果实际值 和 我的期望值相同,那么就更新
//如果实际值 和 我的期望值不同,那么就不更新
System.out.println(atomicInteger.compareAndSet(2021, 2020));
System.out.println(atomicInteger.get());
//因为期望值是2020 实际值却变成了2021 所以会修改失败
//CAS 是CPU的并发原语
// atomicInteger.getAndIncrement(); //++操作
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
20 原子引用
解决ABA问题,对应的思想:就是使用了乐观锁~
带版本号的 原子操作!
Integer 使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间。
带版本号的原子操作
package com.marchsoft.lockdemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASDemo {
/**AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
* 正常在业务操作,这里面比较的都是一个个对象
*/
static AtomicStampedReference<Integer> atomicStampedReference = new
AtomicStampedReference<>(1, 1);
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改操作时,版本号更新 + 1
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println("a2=>" + atomicStampedReference.getStamp());
// 重新把值改回去, 版本号更新 + 1
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("a3=>" + atomicStampedReference.getStamp());
}, "a").start();
// 乐观锁的原理相同!
new Thread(() -> {
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 3,
stamp, stamp + 1));
System.out.println("b2=>" + atomicStampedReference.getStamp());
}, "b").start();
}
}
21 各种锁的理解
21.1 公平锁,非公平锁
公平锁:非常公平,不能插队,必须先来后到
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
非公平锁:非常不公平,允许插队,可以改变顺序
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
21.2 可重入锁
Synchonized 锁
public class Demo01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sms(); },"A").start(); new Thread(()->{ phone.sms(); },"B").start(); } } class Phone{ public synchronized void sms(){ System.out.println(Thread.currentThread().getName()+"=> sms"); call();//这里也有一把锁 } public synchronized void call(){ System.out.println(Thread.currentThread().getName()+"=> call"); } }
Lock 锁
//lock public class Demo02 { public static void main(String[] args) { Phone2 phone = new Phone2(); new Thread(()->{ phone.sms(); },"A").start(); new Thread(()->{ phone.sms(); },"B").start(); } } class Phone2{ Lock lock=new ReentrantLock(); public void sms(){ lock.lock(); //细节:这个是两把锁,两个钥匙 //lock锁必须配对,否则就会死锁在里面 try { System.out.println(Thread.currentThread().getName()+"=> sms"); call();//这里也有一把锁 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void call(){ lock.lock(); try { System.out.println(Thread.currentThread().getName() + "=> call"); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } }
- lock锁必须配对,相当于lock和 unlock 必须数量相同;
- 在外面加的锁,也可以在里面解锁;在里面加的锁,在外面也可以解锁;
21.3 自旋锁
spinlock
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
自我设计自旋锁
public class SpinlockDemo { // 默认 // int 0 //thread null AtomicReference<Thread> atomicReference=new AtomicReference<>(); //加锁 public void myLock(){ Thread thread = Thread.currentThread(); System.out.println(thread.getName()+"===> mylock"); //自旋锁 while (!atomicReference.compareAndSet(null,thread)){ System.out.println(Thread.currentThread().getName()+" ==> 自旋中~"); } } //解锁 public void myUnlock(){ Thread thread=Thread.currentThread(); System.out.println(thread.getName()+"===> myUnlock"); atomicReference.compareAndSet(thread,null); } }
public class TestSpinLock { public static void main(String[] args) throws InterruptedException { ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); reentrantLock.unlock(); //使用CAS实现自旋锁 SpinlockDemo spinlockDemo=new SpinlockDemo(); new Thread(()->{ spinlockDemo.myLock(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { e.printStackTrace(); } finally { spinlockDemo.myUnlock(); } },"t1").start(); TimeUnit.SECONDS.sleep(1); new Thread(()->{ spinlockDemo.myLock(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { e.printStackTrace(); } finally { spinlockDemo.myUnlock(); } },"t2").start(); } }
运行结果:
t2进程必须等待t1进程Unlock后,才能Unlock,在这之前进行自旋等待。。
21.4 死锁
package com.ogj.lock;
import java.util.concurrent.TimeUnit;
public class DeadLock {
public static void main(String[] args) {
String lockA= "lockA";
String lockB= "lockB";
new Thread(new MyThread(lockA,lockB),"t1").start();
new Thread(new MyThread(lockB,lockA),"t2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+" lock"+lockA+"===>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+" lock"+lockB+"===>get"+lockA);
}
}
}
}
如何解开死锁
使用jps定位进程号,jdk的bin目录下: 有一个jps
命令:
jps -l
使用
jstack
进程进程号 找到死锁信息一般情况信息在最后:
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!