JavaJava异步优化模板
DreamCollector
以下代码逐步升级优化,提高效率,并且保证不重复生产以及消费,可以自行替换//生产任务
和//消费任务
其中的模块为自己的具体实现
一、单个主线程中执行
通过main来调用主线程进行数据消费
1 2 3 4 5 6 7
| public static void main(String[] args) { for (int j = 1; j <= 9000; j++) { System.out.println(Thread.currentThread().getName() + " consumed: " + j); }
}
|
二、多个线程执行
由于单个主线程很大程度的限制了CPU的效率,采用多个线程(这里采用自定义线程池)可以很好的提高
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void main(String[] args) { ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("消费线程"), new ThreadPoolExecutor.CallerRunsPolicy());
AtomicInteger counter = new AtomicInteger(1);
for (int i = 0; i < NUM_CONSUMERS; i++) { consumerExecutor.execute(() -> { while (true) { int currentValue = counter.getAndIncrement(); if (currentValue > 9000) { break; } System.out.println(Thread.currentThread().getName() + " consumed: " + currentValue); } }); } consumerExecutor.shutdown();
}
|
三、单一生产、消费者
引入生产者、消费者模式可以解藕出来独立工作,通过共享的缓冲区(通常是队列)来进行通信适用于解决生产者和消费者之间的速度不匹配问题,以及确保线程安全的同时提高系统的效率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public static void main(String[] args) { ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("线程"), new ThreadPoolExecutor.CallerRunsPolicy());
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
customExecutor.execute(() -> { for (int i = 1; i <= 9000; i++) { try { sharedQueue.put(i); System.out.println(Thread.currentThread().getName() + " product: " + i); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } });
customExecutor.execute(() -> { while (true) { try { Integer value = sharedQueue.take(); if (value == -1) { break; } System.out.println(Thread.currentThread().getName() + " Consumed: " + value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } });
customExecutor.shutdown(); }
|
四、多生产、消费者+CompletableFuture
由于单一生产者消费者虽然将生产和消费解藕出来提高了吞吐,但是还是CPU的效率依旧没有最大化利用,多个生产者和消费者可以很好的利用CPU提高效率,用CompletableFuture
来异步并发执行,等待运行完毕并释放线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import cn.hutool.core.text.CharSequenceUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;
@Slf4j @Component public class ThreadUtil {
private static final int NUM_PRODUCERS = 2; private static final int NUM_CONSUMERS = 100;
private static final BlockingQueue<Integer> SHARED_QUEUE = new ArrayBlockingQueue<>(100);
public static void main(String[] args) { ThreadPoolExecutor producerExecutor = new ThreadPoolExecutor(1, NUM_PRODUCERS, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomizableThreadFactory("生产线程"), new ThreadPoolExecutor.CallerRunsPolicy()); ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(50, NUM_CONSUMERS, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(13), new CustomizableThreadFactory("消费线程"), new ThreadPoolExecutor.CallerRunsPolicy());
long start = System.currentTimeMillis(); CompletableFuture<Void> producersFuture = CompletableFuture.runAsync(() -> { try { for (int j = 1; j <= 9000; j++) { SHARED_QUEUE.put(j); System.out.println(Thread.currentThread().getName() + " product: " + j); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } finally { try { for (int i = 0; i < NUM_CONSUMERS; i++) { SHARED_QUEUE.put(-1); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }, producerExecutor);
List<CompletableFuture<Void>> consumersFutures = new ArrayList<>(); for (int i = 0; i < NUM_CONSUMERS; i++) { CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> { try { while (true) { int value = SHARED_QUEUE.take(); if (value == -1) { break; } System.out.println(Thread.currentThread().getName() + " consumed: " + value); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); log.error("消费者线程被中断:" + e.getMessage()); } }, consumerExecutor);
consumersFutures.add(consumerFuture); }
CompletableFuture<Void> allFutures = CompletableFuture.allOf(producersFuture); allFutures.join(); System.out.println("----生产者全部生产完毕----");
CompletableFuture<Void> allConsumersFutures = CompletableFuture.allOf(consumersFutures.toArray(new CompletableFuture[0])); allConsumersFutures.thenRun(() -> { System.out.println(CharSequenceUtil.format("----消费者全部消费完毕,耗时{}ms----", System.currentTimeMillis() - start));
producerExecutor.shutdown(); consumerExecutor.shutdown(); }).join(); }
}
|
五、采用分布式队列
由于Java的阻塞队列:ArrayBlockingQueue
主要用于解决单个应用程序内的线程通信问题,要想再次提高效率以及吞吐可以使用像Kafka、RabbitMQ这种队列替代,更适合构建大规模、高可用性、持久化的分布式消息系统,特别是在需要处理大量数据和实现实时处理的场景下