使用虚拟线程管理吞吐量 - 一口 Java

虚拟线程是 Project Loom 中期待已久的头条功能,已成为 JDK 21 中的最终功能。虚拟线程是 Java 中并发的新篇章的开端,它为开发人员提供了轻量级线程,允许轻松且资源便宜地分解任务以并发执行。然而,这些更改带来了有关如何最好地管理此吞吐量的疑问。让我们来看看开发人员在使用虚拟线程时如何管理吞吐量。

使用 ExecutorService 创建虚拟线程

在大多数情况下,开发人员无需自己创建虚拟线程。例如,对于 Web 应用程序,Tomcat 或 Jetty 等底层框架将为每个传入请求自动生成一个虚拟线程。

如果应用程序的一部分可以通过分解为各个任务并并发执行而受益,请考虑使用 Executors.newVirtualThreadPerTaskExecutor()) 创建新的虚拟线程。

在此假设示例中,正在按顺序生成三个随机数

public class SerialExample {
	static RandomGenerator generator = RandomGenerator.getDefault();

	public Integer sumThreeRandomValues() {
		return generator.nextInt() + generator.nextInt() + generator.nextInt();
	}
}

可以重新编写此代码,以便并行生成随机数

public class ParallelExample {
	private ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();

	public Integer sumThreeRandomValues() 
			throws InterruptedException, ExecutionException {

		Callable<Integer> generateRandomInt = new Callable<Integer>() {
			static RandomGenerator generator = RandomGenerator.getDefault();

			@Override
			public Integer call() throws Exception {
				return generator.nextInt();
			}
		};

		Future<Integer> randomValue1 = service.submit(generateRandomInt);
		Future<Integer> randomValue2 = service.submit(generateRandomInt);
		Future<Integer> randomValue3 = service.submit(generateRandomInt);
		return randomValue1.get() + randomValue2.get() + randomValue3.get();
	}
}

AutoCloseable ExecutorService

ExecutorService 接口扩展了 AutoCloseable,因此 ExecutorService 实例从 Executors.newVirtualThreadPerTaskExecutor(); 返回,可以放在 try-with-resources 块中,以便在需要时自动关闭,如下例所示

public class ParallelExampleAutoCloseable {
	
	public Integer sumThreeRandomValues() 
			throws InterruptedException, ExecutionException {
		
		Callable<Integer> generateRandomInt = new Callable<Integer>() {
			static RandomGenerator generator = RandomGenerator.getDefault();
			
			@Override
			public Integer call() throws Exception {
				return generator.nextInt();
			}
		};
		
		try (ExecutorService service = Executors.newVirtualThreadPerTaskExecutor();) {
			Future<Integer> value1 = service.submit(generateRandomInt);
			Future<Integer> value2 = service.submit(generateRandomInt);
			Future<Integer> value3 = service.submit(generateRandomInt);
			return value1.get() + value2.get() + value3.get();
		}
	}
}

但是,在大多数情况下,可能更愿意让 ExecutorService 存在于应用程序的生命周期中,或至少封装对象,就像第一个示例中那样。

结构化并发即将到来

使用 Executors.newVirtualThreadPerTaskExecutor()) 创建和执行任务作为虚拟线程在很大程度上是一种临时解决方案。理想情况下,在将工作分解为要并发执行的各个任务时,应使用结构化并发。结构化并发在 JDK 21 中处于预览模式;您可以在 此处 阅读有关预览功能的信息。有关结构化并发的更多信息,请查看 José Paumard 的此视频

限制虚拟线程速率

当我介绍虚拟线程时,我经常听到的一个问题是如何防止外部服务在使用虚拟线程时因太多调用而不知所措。这个问题的答案是使用 java.util.concurrent.SemaphoreSemaphore 的工作原理类似于池,但不是在引入虚拟线程之前池化连接或线程等稀缺资源。 Semaphore 而是池化许可证,这只是一个计数器。

下面的代码示例演示了如何实现Semaphore。许可证数量应设置为外部服务的可接受速率限制,并且对服务的调用应放置在获取许可证的acquire()和返回许可证的release()之间。务必将release()放在finally块中,以防止许可证泄漏。

public class SemaphoreExample {
	private static final Semaphore POOL = new Semaphore(10); //Set permit limit

	public void callOldService(...) {
		try{
			POOL.acquire();//Takes a permit and blocks calls if no permit is available
		} catch(InterruptedException e){
			//handle exception if acquiring a permit fails		
		}
	
		try {
			//call service
		} finally {
			POOL.release();//Releases a permit
		}
	}
}

⚠️ 警告:不要池化虚拟线程,因为它们不是稀缺资源。

池中池

如果您已经使用连接池来管理与服务的连接,请避免使用Semaphore。如果您发现迁移到 JDK 21 并使用虚拟线程时,某个服务被流量淹没,请改而配置相关的连接池,使其具有外部服务可以管理的较小连接池。

配置平台线程

虚拟线程在平台线程之上运行,我在此视频中介绍了这个主题。JVM 启动时创建的平台线程数量基于 JVM 可用的内核数量,平台线程池的默认最大大小为 256。在绝大多数情况下,这些默认值应该可以正常工作。但是,如果您遇到这些默认值无法满足您要求的极端情况,则可以使用 VM 参数对其进行修改

  • jdk.virtualThreadScheduler.parallelism:配置每个内核创建的平台线程数量,JVM 可以访问这些内核。

  • jdk.virtualThreadScheduler.maxPoolSize :配置将创建的最大平台线程数量。

调试虚拟线程

当使用虚拟线程的应用程序出现问题时,请记住,JDK Flight Recorder (JFR) 和jcmd等可观察性工具已经设置为处理虚拟线程。

JFR 和虚拟线程

JFR 已更新,为虚拟线程添加了四个新事件

jdk.VirtualThreadStart
jdk.VirtualThreadEnd
jdk.VirtualThreadPinned
jdk.VirtualThreadSubmitFailed

默认情况下,jdk.VirtualThreadStartjdk.VirtualThreadEnd被禁用,但您可以通过 JFR 配置文件或 JDK Mission Control 启用它们。您可以在此处阅读如何启用 JFR 事件。

使用虚拟线程进行线程转储

可以使用以下命令执行线程转储

jcmd <PID> Thread.dump_to_file -format=[text|json] <file>

线程转储将包括在网络 I/O 操作中被阻塞的虚拟线程和由ExecutorService接口创建的虚拟线程,这是首选使用ExecutorService来创建虚拟线程的原因之一。

不包括传统线程转储中出现的对象地址、锁、JNI 统计信息、堆统计信息和其他信息。

其他读物

虚拟线程 - JEP 444

虚拟线程指南

结构化并发 - JEP 453

Java 21 新功能:虚拟线程 #RoadTo21

编码愉快!