Java多线程学习

线程的创建

先不扯淡什么鬼的理论,先把多线程给跑起来再说了。
通常认为实现多线程有两种方式(其实用三种实现方式,暂不讨论)
第一种是直接继承Thread
直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package pw.gouzai.main.Interview.multithreading;

public class MultithreadingDemo_Thread extends Thread{

@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 5; i++) {
System.err.println(Thread.currentThread().getName() + "-->"+i);
}
}

public static void main(String[] args) {
MultithreadingDemo_Thread demo = new MultithreadingDemo_Thread();
MultithreadingDemo_Thread demo1 = new MultithreadingDemo_Thread();
demo.start();
demo1.start();
}

}

先把demo跑起来再说。
结果如下:

1
2
3
4
5
6
7
8
9
10
Thread-0-->0
Thread-1-->0
Thread-0-->1
Thread-0-->2
Thread-1-->1
Thread-0-->3
Thread-1-->2
Thread-1-->3
Thread-1-->4
Thread-0-->4

另一种是现实Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MultithreadingDemo_Runnable implements Runnable{

public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 5; i++) {
System.err.println(Thread.currentThread().getName() + "-->"+i);
}
}

public static void main(String[] args) {
MultithreadingDemo_Runnable runnable = new MultithreadingDemo_Runnable();
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);

t1.start();
t2.start();
}

}

1
2
3
4
5
6
7
8
9
10
Thread-1-->0
Thread-1-->1
Thread-0-->0
Thread-1-->2
Thread-0-->1
Thread-0-->2
Thread-0-->3
Thread-0-->4
Thread-1-->3
Thread-1-->4

通过上面的代码可以发现,他们不同的线程是在同时运行,在抢夺资源进行输出。谁先抢到cpu,就谁能够打印i。
从以上两份不同的代码中可以发现,其实多线程以下的特点:
1.多线程运行的代码是同样的代码
2.多线程中的一方抢到cpu,另一方就必须等待。但是这种等待并不是下一次就是你了。
3.执行的都是Run的方法
4.不是直接运行.run()方法,这是为什么呢?

以上两种实现多线程的优缺点:
Java是单继承的,所以基本都是实现实现Rnnable接口为主

线程状态

说完多线程的常用实现方法,那就先简简单单谈一下,线程有哪几种状态。
直接搬运dalao的https://www.cnblogs.com/yjd_hycf_space/p/7526608.html
1.新建状态(new) : 新创建了一个线程对象
2.就绪状态(Runnable) : 线程对象创建后,其他线程调用了该线程的start()方法。该状态的线程于可运行线程池中,变得可运行,等待获取CPU的使用权。
3.运行状态(Runing) : 就绪状态的线程获取CPU,执行程序代码
4.阻塞状态(Blocked) : 阻塞状态是线程因为某种原因放弃CPU的使用权,暂停停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞状态分为三种:
(一)、等待阻塞:运行的线程执行wait方法,JVM会把该呈现放入等待池中。(wait会释放持有的锁)
(二)、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。
(三)、其他阻塞:运行的线程执行sleep()或者join()方法,或者放出了I/O请求时,JVM会把该线程设置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。(注意,sleep是不会释放持有的锁)
5.死亡状态(Dead): 线程执行完了或者因异常退出了run()方法,该线程结束生命周期。

图:略。有空再画了

特殊的多线程 || 带有返回值的多线程

带有返回结果的多线程Callable
跟Runnable实现方式差不多

test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> ft = new FutureTask<>(new MyCallable());
Thread thread = new Thread(ft);
thread.start();
Integer integer = ft.get();//获取返回结果
System.err.println(integer);
System.err.println(123456);
}
}

class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
for (int i = 0; i < 10; i++) {
System.err.println(Thread.currentThread().getName()+":"+(i+1));
}
return 123;
}
}

线程池的使用

test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class test {
public static void main(String[] args) {
/**
* 线程池管理(管理多个异步任务的执行,无需程序员显示的管理现成的生命周期。这里的异步是指多个任务的执行互不干扰。不需要进行同步操作)
* Executors为线程工厂,主要有三种子类
* CachedThreadPool:一个任务创建一个线程
* FixedThreadPool:所有任务只能使用固定大小的线程
* SingleThreadExecutor:相当于大小为1的FixedThreadPool
*/
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
}
executorService.shutdown();//停止接收新任务,原来的任务继续执行。//shutdownNow() 停止接收新任务,原来的任务停止执行。

}
}

class MyRunnable implements Runnable {
@Override
public void run() {
System.err.println(Thread.currentThread().getName());
}
}

线程的常用方法

线程的常用方法:

  1. Thread.currentThread() 获取当前运行的线程
  2. setPriority() 设置线程的优先级(线程的优先级分别为1-10,默认的为5,权重越大,越容易抢到CPU(相对而言,即使你把他设置为10也不能一定保证它能够抢到CPU的使用权))
  3. sleep 线程休眠,参数线程休眠时间(毫秒)。使线程转到阻塞状态,当休眠结束后,转成就绪状态。
  4. Object类中的wait()方法,导致线程等待,知道其他线程调用此对象的notify()方法或者是notifyAll()唤醒方法。(这两个唤醒也是Object的方法)
    5.yield() 暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。
    6.join() 等待其他线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个线程运行结束,当前线程再由阻塞转成就绪状态。
  5. setDaemon(true) 设置线程为守护线程。当所有非守护线程结束时,程序也就终止,同事会杀死所有守护线程。

线程的中断

普通线程的中断

通过调用一个线程的interrupt() 方法来中断线程,如果该线程处于阻塞、限期等待或者和无限期等待状态,那么就会抛出InterrupedException异常,从而提前结束该线程,不执行之后的语句。但是不能中断I/O阻塞和synchronized 锁阻塞。
例如:
test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class test {
public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
myThread.interrupt();
System.err.println("Main run");
}
}

class MyThread extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.err.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

1
2
3
4
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at pw.gouzai.MyThread.run(test.java:18)

若果一个线程的run() 方法执行一个而无线循环,并且没有执行sleep() 等会抛出InterrupedException 的操作,那么调用线程的interrupt() 方法就无法使线程提前结束。
但是调用interrupt() 方法会设置线程的中断标记,此时调用interrupted() 方法会返回true。因此可以在循环体中使用interrupted() 方法判断线程是否处于中断状态,从而提前结束线程。
test.java

1
2
3
4
5
6
7
8
9
class MyThread extends Thread {
@Override
public void run() {
while (!interrupted()){
System.err.println(new Random().nextInt(100));
}
System.err.println(Thread.currentThread().getName());
}
}

线程池的中断

调用Executor 的shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow()方法,则相当于调用每个现成的interrupt()方法中断。
test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class test {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute( () -> {
try {
Thread.sleep(2000);
System.err.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
}
}

1
2
3
4
5
6
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at pw.gouzai.test.lambda$main$0(test.java:12)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

如果只想中断Executor中的一个线程,可以通过submit() 方法来提交一个线程,它会返回一个Future<?>对象,通过调用该对象的cancel(true)方法皆可以中断线程。
test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class test {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<?> future = executorService.submit(() -> {
try {
Thread.sleep(2000);
System.err.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
future.cancel(true);
}
}

Java 互斥同步

synchronized 关键字

synchronized 的作用域有两种:

某个对象实例内: synchronized method 防范可以防止多个线程同时访问这个对象的synchronized 方法(如果一个对象有多个synchronized方法,只要一个线程访问其中的一个synchronized方法,其他线程不能同时访问这个对象中任何一个synchronized方法)。这时,不同的对象实例的synchronized方法是不相干扰的。也就是说,其他线程照样可以访问相同类的另一个对象实例中的synchronized方法。

某个类的范围,synchronized static method 防止多个线程同时访问这个类中的synchronized static 方法。他是可以对类的所有对象实例起作用。

synchronized 除了可以修饰方法,还可以作用于方法中的某个区块,表示这个区块的资源实行互斥访问。synchronized(this){System.out.println(666);}.作用于同一个对象,如果调用两个对象的同步代码块,则不会进行同步操作.

另外,synchronized 关键字是不能继承的,也就是说,基类的方法synchronized method 在继承中并不是 synchronized 方法,而是变成同步方法。若需要synchronized 修饰,在继承类的时候显式的指定它的某个方法为synchronized方法.

ps:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//同步一个代码块
public void function(){
synchronized(this){

}
}

//同步一个方法
public synchronized void function(){
}

//同步一个类
public void function(){
synchronized(test.class){

}
}

//同步一个静态方法
public synchronized static void function(){
}

同步一个代码块跟同一个方法(不是静态方法),作用于同一个对象上。
同步整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。
同步静态方法,作用于整个类。

ReentrantLock

ReentrantLock 是Java.util.concurrent包下的锁。
test.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class test {
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.function());
executorService.execute(() -> lockExample.function());
executorService.execute(() -> lockExample.function());
}
}

class LockExample {
private Lock lock = new ReentrantLock();

public void function(){
lock.lock();
try{
for (int i = 0; i < 10; i++) {
System.err.println(i+"\t");
}
} finally {
lock.unlock();
}
}
}

多线程协作

join

在线程总调用另外一个线程的join方法,会将当前线程挂起,而不是忙等待,知道目标线程结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class test {
private class A extends Thread{
@Override
public void run() {
System.err.println("A");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private class B extends Thread{
private A a;

B(A a){
this.a = a;
}

@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("B");
}
}

public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}

public static void main(String[] args) {
test t = new test();
t.test();
}
}

wait() notify() notifyAll()

这三个方法都是属于Object的方法,不属于Thread的方法。
调用wait()使线程等待某个条件满足,线程在等待时会被挂起。当其他线程的运行使这个条件满足时,其他线程会调用notify()或者notifyAll()来唤醒挂起的线程。
只能用在同步方法或者是同步控制块中使用,否则会在运行时抛出IllegalMonitorStateException。
使用wait()挂起期间,线程会释放锁。这是因为,若果没有释放锁,那么其他线程就无法进入对象的同步方法或者同步控制块中。那么就无法执行notify() 或者notifyAll()来唤醒挂起的线程,造成死锁。
WaitNotifyExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WaitNotifyExample {
public synchronized void before(){
System.err.println("before");
notifyAll();
}

public synchronized void after(){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("after");
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
try {
//确保xeample.after()先执行,否则没线程唤醒他了,会一在等待唤醒,处于一直阻塞状态中。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(() ->example.before());
}
}

wait() 和Thread.sleep()的区别
> wait() 是Object的方法,而sleep()是Thrad的静态方法;
> wait() 会释放锁,sleep() 不会释放锁。(除非产生异常或者是时间到了);

### await() signal() signalAll()
java.util.concurrent 类库中提供了Condition类来实现线程之间的协调,可以在Condition上调用await()方法使线程等待,其他线程调用signal()或者signalAll()方法唤醒等待的线程。
相对于wait()这种等待方式,await()可以指定等待的条件,更加灵活。

AwaitSignalExample.java
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void before(){
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}

public void after(){
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(() -> example.before());
}
}

CountDownLatch

用来控制一个线程等待多个线程。

Method

countDown()通知当前线程已经处理完毕,每次调用countDown()方法会让计数器的值-1,减到0的时候。那些因为调用await()方法而等待的线程就会被唤醒
await()等待线程执行完毕;
CountDownlatchExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {
/**
* 适用于场景:列车(事件)检查,多个工作人员(线程)进行列车检查(执行方法)。检查OK了才可以启动(线程执行完毕才可以执行其他的处理)
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.err.println("run...");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.err.println("end");
executorService.shutdown();
}
}

1
2
3
4
5
6
7
8
9
10
11
run...
run...
run...
run...
run...
run...
run...
run...
run...
run...
end

CyclicBarrier

用来控制多线程互相等待,只有当多个线程都达到时(执行到await()),这些线程才会继续执行(await()方法之后)。

CyclicBarrierExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample {
/**
* 适用场景:一群人出去玩。等所有人到齐了再次出发。
* @param args
*/
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() ->{
System.err.println("before...");//去到集合点
try {
cyclicBarrier.await();//等人
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.err.println("after...");//到达目的地
});
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
before...
before...
before...
before...
before...
before...
before...
before...
before...
before...
after...
after...
after...
after...
after...
after...
after...
after...
after...
after...

Semaphore

控制互斥资源的访问线程数。(就是控制线程每次只能同时访问的数量)
SemaphoreExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
/**
* 适用场景:一条三车道的道路公路,汽车占一个道路,货车占用两个道路。那么这条道路最大允许通过的车辆就是三辆小汽车,最小的允许通过的就是一辆汽车一辆货车
* @param args
*/
public static void main(String[] args) {
final int clientCount = 3;//路宽
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(() ->{
int flag = new Random().nextInt(2)+1;
String type = flag == 1 ? "汽车" : "货车";
String curName = Thread.currentThread().getName();
System.err.println(curName +"当前需要通过的是:"+ type);
try {
semaphore.acquire(flag);//一次占用路宽度
System.err.println(curName + type +"正在通过。。。"+"剩余的宽度为:"+(semaphore.availablePermits()));
Thread.sleep(200);
semaphore.release(flag);//释放占用的宽度
System.err.println(curName + type +"车辆通过完毕。。。"+"剩余的宽度为:"+semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}

认真查看输出有惊喜。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
pool-1-thread-2当前需要通过的是:货车
pool-1-thread-2货车正在通过。。。剩余的宽度为:1
pool-1-thread-4当前需要通过的是:汽车
pool-1-thread-4汽车正在通过。。。剩余的宽度为:0
pool-1-thread-6当前需要通过的是:汽车
pool-1-thread-8当前需要通过的是:货车
pool-1-thread-10当前需要通过的是:货车
pool-1-thread-1当前需要通过的是:货车
pool-1-thread-3当前需要通过的是:货车
pool-1-thread-5当前需要通过的是:汽车
pool-1-thread-7当前需要通过的是:货车
pool-1-thread-9当前需要通过的是:汽车
pool-1-thread-6汽车正在通过。。。剩余的宽度为:1
pool-1-thread-2货车车辆通过完毕。。。剩余的宽度为:1
pool-1-thread-8货车正在通过。。。剩余的宽度为:0
pool-1-thread-4汽车车辆通过完毕。。。剩余的宽度为:0
pool-1-thread-6汽车车辆通过完毕。。。剩余的宽度为:1
pool-1-thread-10货车正在通过。。。剩余的宽度为:1
pool-1-thread-8货车车辆通过完毕。。。剩余的宽度为:1
pool-1-thread-10货车车辆通过完毕。。。剩余的宽度为:3
pool-1-thread-1货车正在通过。。。剩余的宽度为:1
pool-1-thread-3货车正在通过。。。剩余的宽度为:1
pool-1-thread-5汽车正在通过。。。剩余的宽度为:0
pool-1-thread-1货车车辆通过完毕。。。剩余的宽度为:0
pool-1-thread-7货车正在通过。。。剩余的宽度为:0
pool-1-thread-3货车车辆通过完毕。。。剩余的宽度为:0
pool-1-thread-9汽车正在通过。。。剩余的宽度为:0
pool-1-thread-5汽车车辆通过完毕。。。剩余的宽度为:0
pool-1-thread-9汽车车辆通过完毕。。。剩余的宽度为:3
pool-1-thread-7货车车辆通过完毕。。。剩余的宽度为:3

FutureTask

在介绍Callable时我们知道它可以有返回值,返回值可以通过Future进行封装。
FuntureTask实现了RunnableFuture接口,该接口继承自Runnable和Future接口,这使得FutureTask既可以当一个任务执行,也可以有返回值。

1
2
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable,Funture<V>

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用FutureTask来封装这个任务,主线程在完成自己的任务之后再去获取结果。

FutureTaskExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result +=i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();

Thread other = new Thread(() ->{
System.err.println("other task is running...");
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("other task end");
});
other.start();
System.err.println("start get");
System.err.println(futureTask.get());
System.err.println("main end");
}
}

Queue

BlockingQueue

FIFO队列:linkedBlockingQueue、ArrayBlockingQueue
优先级队列:PriorityBlockingQueue

提供了阻塞的take()和put()方法:如果队列为空,take()将阻塞,知道队列中有内容;如果队列已满put()将阻塞,知道队列有空闲位置。
使用Blockingqueue实现生产者消费者问题。
BlockingQueueExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread{
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("product...");
}
}
private static class Consumer extends Thread{
@Override
public void run() {
try {
String take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("consume...");
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 7; i++) {
Producer producer = new Producer();
producer.start();
}
Thread.sleep(1000);
for (int i = 0; i <10; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
product...
product...
product...
product...
product...
consume...
consume...
consume...
consume...
consume...
product...
product...
consume...
consume...

ForkJoin

主要用于并行计算中,和MaoReduce原理类似,都是把大的任务拆分成多个小人物并行计算。
ForkJoinExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;//每次能够执行的任务
private int first;
private int last;

public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}

@Override
protected Integer compute() {
int result =0;
if(last - first <= threshold){
//任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else{
//拆分小任务
int middle = first + (last-first)/2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);//1-4
ForkJoinExample rightTask = new ForkJoinExample(middle+1, last);//5-10
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1,10);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> result = forkJoinPool.submit(example);
System.err.println(result.get());
}
}

result:

1
55

线程安全

线程安全有以下几种实现方式:

不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地被构建出来,永远也不会见到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

不可变类型:

  • final关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number部分子类,如Long和Double等数值包装类型,BigInteger和BigDecimal等大数据类型。但同为Number的原子类AtomicInteger和AtomicLong则是可变的。

对于集合类型,可以使用Collections.unmodifiablexxx() 方法来获取一个不可变的集合。

ImmutableExample.java

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableExample {
public static void main(String[] args) {
Map<String,Integer> map = new HashMap<>();
map.put("a", 1);
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("b", 2);
}
}

1
2
3
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at pw.gouzai.ImmuutableExample.main(ImmuutableExample.java:12)

Collections.unmodifiableXXX()先对原始的集合进行拷贝,需要对几个进行修改的方法都直接抛出异常。

UnmodifiableMap.java

1
2
3
4
5
6
7
8
9
10
11
12
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
public V remove(Object key) {
throw new UnsupportedOperationException();
}
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException();
}
public void clear() {
throw new UnsupportedOperationException();
}

AtomicExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
private AtomicInteger cnt = new AtomicInteger();

public void add(){
cnt.incrementAndGet();
}

public int get(){
return cnt.get();
}

public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000000;
AtomicExample example = new AtomicExample();
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() ->{
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.err.println(example.get());
}
}

互斥同步

synchronized和ReentrantLock

非阻塞同步

无同步方案

要保证线程安全,并不是一定就要进行同步,如果一个方法本来就不涉及共享数据,那他自然就无须任何同步措施去保证正确性。

栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟栈中,属于线程私有的。
StackClosedExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StackClosedExample {
public void add(){
int cnt =0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.err.println(cnt);
}

public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add());
executorService.execute(() -> example.add());
executorService.shutdown();
}
}

1
2
100
100

线程本地存储

将共享数据限制在同一个线程之内

ThreadLocalExample.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
Thread thread1 = new Thread(() ->{
threadLocal.set(1);
threadLocal.set(3);
try{
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
System.err.println(threadLocal.get());
});

Thread thread2 = new Thread(()->{
threadLocal.set(2);
threadLocal.remove();
});

thread1.start();
thread2.start();
}
}

1
3

可重入代码

锁优化

这里的锁优化主要是指JVM对synchronized的优化

自旋锁

自旋锁是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果这段时间内能获得锁,那么就可以避免进入阻塞状态。

锁消除

锁消除是指对于被检测出来不可能存在竞争的共享数据的锁进行消除。

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的枷锁操作就会导致性能损耗。

偏向锁

轻量级锁

感谢两位dalao的博客。本篇文章基于两位大佬的文章进行整理(照搬再改)。
https://www.cnblogs.com/yjd_hycf_space/p/7526608.html
https://github.com/CyC2018/CS-Notes/blob/master/notes/Java%20%E5%B9%B6%E5%8F%91.md

谢谢,爱你么么哒