Java异步优化模板

以下代码逐步升级优化,提高效率,并且保证不重复生产以及消费,可以自行替换//生产任务//消费任务其中的模块为自己的具体实现

一、单个主线程中执行

通过main来调用主线程进行数据消费

1
2
3
4
5
6
7
public static void main(String[] args) {
//消费任务
for (int j = 1; j <= 9000; j++) {
System.out.println(Thread.currentThread().getName() + " consumed: " + j);
}

}

二、多个线程执行

由于单个主线程很大程度的限制了CPU的效率,采用多个线程(这里采用自定义线程池)可以很好的提高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("消费线程"), new ThreadPoolExecutor.CallerRunsPolicy());

AtomicInteger counter = new AtomicInteger(1);

for (int i = 0; i < NUM_CONSUMERS; i++) {
consumerExecutor.execute(() -> {
while (true) {
int currentValue = counter.getAndIncrement();
if (currentValue > 9000) {
break; // 当达到最大值时退出循环
}
//消费任务
System.out.println(Thread.currentThread().getName() + " consumed: " + currentValue);
}
});
}
// 等待所有任务完成
consumerExecutor.shutdown();

}

三、单一生产、消费者

引入生产者、消费者模式可以解藕出来独立工作,通过共享的缓冲区(通常是队列)来进行通信适用于解决生产者和消费者之间的速度不匹配问题,以及确保线程安全的同时提高系统的效率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static void main(String[] args) {
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("线程"), new ThreadPoolExecutor.CallerRunsPolicy());

BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

// 生产者
customExecutor.execute(() -> {
for (int i = 1; i <= 9000; i++) {
try {
sharedQueue.put(i);
//生产任务
System.out.println(Thread.currentThread().getName() + " product: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

// 消费者
customExecutor.execute(() -> {
while (true) {
try {
Integer value = sharedQueue.take();
if (value == -1) {
break; // 收到终止信号,退出循环
}
//消费任务
System.out.println(Thread.currentThread().getName() + " Consumed: " + value);
// 模拟消费的间隔
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});


// 关闭线程池
customExecutor.shutdown();
}

四、多生产、消费者+CompletableFuture

由于单一生产者消费者虽然将生产和消费解藕出来提高了吞吐,但是还是CPU的效率依旧没有最大化利用,多个生产者和消费者可以很好的利用CPU提高效率,用CompletableFuture来异步并发执行,等待运行完毕并释放线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import cn.hutool.core.text.CharSequenceUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* @author WX
*/
@Slf4j
@Component
public class ThreadUtil {

private static final int NUM_PRODUCERS = 2;
private static final int NUM_CONSUMERS = 100;
/**
* 定义阻塞队列
*/
private static final BlockingQueue<Integer> SHARED_QUEUE = new ArrayBlockingQueue<>(100);

public static void main(String[] args) {
//这里最好根据系统负载量评估一个阈值,避免OOM问题
ThreadPoolExecutor producerExecutor = new ThreadPoolExecutor(1, NUM_PRODUCERS, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomizableThreadFactory("生产线程"), new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(50, NUM_CONSUMERS, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("消费线程"), new ThreadPoolExecutor.CallerRunsPolicy());

long start = System.currentTimeMillis();
//生产者线程
CompletableFuture<Void> producersFuture = CompletableFuture.runAsync(() -> {
try {
//生产任务
for (int j = 1; j <= 9000; j++) {
SHARED_QUEUE.put(j);
System.out.println(Thread.currentThread().getName() + " product: " + j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
} finally {
try {
for (int i = 0; i < NUM_CONSUMERS; i++) {
SHARED_QUEUE.put(-1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, producerExecutor);

//消费者线程
List<CompletableFuture<Void>> consumersFutures = new ArrayList<>();
for (int i = 0; i < NUM_CONSUMERS; i++) {
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
try {
while (true) {
int value = SHARED_QUEUE.take();
if (value == -1) {
break;
}
//消费任务
System.out.println(Thread.currentThread().getName() + " consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
log.error("消费者线程被中断:" + e.getMessage());
}
}, consumerExecutor);

consumersFutures.add(consumerFuture);
}


CompletableFuture<Void> allFutures = CompletableFuture.allOf(producersFuture);
allFutures.join();
System.out.println("----生产者全部生产完毕----");

// 使用 thenRun 来在所有任务完成后执行后续的操作
CompletableFuture<Void> allConsumersFutures = CompletableFuture.allOf(consumersFutures.toArray(new CompletableFuture[0]));
allConsumersFutures.thenRun(() -> {
System.out.println(CharSequenceUtil.format("----消费者全部消费完毕,耗时{}ms----", System.currentTimeMillis() - start));

// 关闭线程池
producerExecutor.shutdown();
consumerExecutor.shutdown();
}).join();
}

}

五、采用分布式队列

由于Java的阻塞队列:ArrayBlockingQueue 主要用于解决单个应用程序内的线程通信问题,要想再次提高效率以及吞吐可以使用像Kafka、RabbitMQ这种队列替代,更适合构建大规模、高可用性、持久化的分布式消息系统,特别是在需要处理大量数据和实现实时处理的场景下