![Java核心技术·卷Ⅱ:高级特性(原书第10版)](https://wfqqreader-1252317822.image.myqcloud.com/cover/937/34339937/b_34339937.jpg)
1.14 并行流
流使得并行处理块操作变得很容易。这个过程几乎是自动的,但是需要遵守一些规则。首先,必须有一个并行流。可以用Collection.parallelStream()方法从任何集合中获取一个并行流:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/034-i.jpg?sign=1739298703-AC9nma9kcktgITKsCPIzWtU7ZUBbESCC-0-9fe8a1f80f1ad18052dd153935a903eb)
而且,parallel方法可以将任意的顺序流转换为并行流。
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/034-2-i.jpg?sign=1739298703-X9C5LMWfqA5A6jsh1v1xUjluzyCO08nN-0-bcf94d859c739649152ec45017effa18)
只要在终结方法执行时,流处于并行模式,那么所有的中间流操作都将被并行化。
当流操作并行运行时,其目标是要让其返回结果与顺序执行时返回的结果相同。重要的是,这些操作可以以任意顺序执行。
下面的示例是一项你无法完成的任务。假设你想要对字符串流中的所有短单词计数:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/035-i.jpg?sign=1739298703-2DP7uj4ClVuQsvbEtw7KJ6lqHMkOevNC-0-fd32c65680566b65d1e40e618907d82d)
这是一种非常非常糟糕的代码。传递给forEach的函数会在多个并发线程中运行,每个都会更新共享的数组。正如我们在卷Ⅰ第14章中所解释的,这是一种经典的竞争情况。如果多次运行这个程序,你很可能就会发现每次运行都会产生不同的计数值,而且每个都是错的。
你的职责是要确保传递给并行流操作的任何函数都可以安全地并行执行,达到这个目的的最佳方式是远离易变状态。在本例中,如果用长度将字符串群组,然后分别对它们进行计数,那么就可以安全地并行化这项计算。
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/035-2-i.jpg?sign=1739298703-feiidUL4hmaZRL2zj0nPxcNsWcH0dlW8-0-46d467f117d7a4848518eb1e63fd740c)
警告:传递给并行流操作的函数不应该被堵塞。并行流使用fork-join池来操作流的各个部分。如果多个流操作被阻塞,那么池可能就无法做任何事情了。
默认情况下,从有序集合(数组和列表)、范围、生成器和迭代产生的流,或者通过调用Stream.sorted产生的流,都是有序的。它们的结果是按照原来元素的顺序累积的,因此是完全可预知的。如果运行相同的操作两次,将会得到完全相同的结果。
排序并不排斥高效的并行处理。例如,当计算stream.map(fun)时,流可以被划分为n的部分,它们会被并行地处理。然后,结果将会按照顺序重新组装起来。
当放弃排序需求时,有些操作可以被更有效地并行化。通过在流上调用unordered方法,就可以明确表示我们对排序不感兴趣。Stream.distinct就是从这种方式中获益的一种操作。在有序的流中,distinct会保留所有相同元素中的第一个,这对并行化是一种阻碍,因为处理每个部分的线程在其之前的所有部分都被处理完之前,并不知道应该丢弃哪些元素。如果可以接受保留唯一元素中任意一个的做法,那么所有部分就可以并行地处理(使用共享的集来跟踪重复元素)。
还可以通过放弃排序要求来提高limit方法的速度。如果只想从流中取出任意n个元素,而并不在意到底要获取哪些,那么可以调用:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/035-3-i.jpg?sign=1739298703-K3FlTmjQrhu1av3CsPeb60GX9XP2IlEy-0-05341e3c6a2142997cc5378d180f2101)
正如1.9节所讨论的,合并映射表的代价很高昂。正是因为这个原因,Collectors.groupByConcurrent方法使用了共享的并发映射表。为了从并行化中获益,映射表中值的顺序不会与流中的顺序相同。
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/036-2-i.jpg?sign=1739298703-92ixe8i5QK35IpaA7sl8yrunvusEXqlS-0-9897c812a532765f03ead2330efdce43)
当然,如果使用独立于排序的下游收集器,那么就不必在意了,例如:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/036-3-i.jpg?sign=1739298703-6zPvzx1YtKc5fOo8ZCqaDSpEYiaFBibY-0-829dbcb7aeddf199b8df419ab8f34809)
警告:不要修改在执行某项流操作后会将元素返回到流中的集合(即使这种修改是线程安全的)。记住,流并不会收集它们的数据,数据总是在单独的集合中。如果修改了这样的集合,那么流操作的结果就是未定义的。JDK文档对这项需求并未做出任何约束,并且对顺序流和并行流都采用了这种处理方式。
更准确地讲,因为中间的流操作都是惰性的,所以直到执行终结操作时才对集合进行修改仍旧是可行的。例如,下面的操作尽管并不推荐,但是仍旧可以工作:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/036-4-i.jpg?sign=1739298703-Gr0DRU1pxfwHMtlGVj6P7pF0GwpjfEB8-0-20eb145fad9f1f60bf34fe2653056f50)
但是,下面的代码是错误的:
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/036-5-i.jpg?sign=1739298703-8jmMODjTDECA6d0ZERnME9khrqFsC1Ah-0-3e99f3df470dd8b2f6ddb8625fa1cbb6)
为了让并行流正常工作,需要满足大量的条件:
·数据应该在内存中。必须等到数据到达是非常低效的。
·流应该可以被高效地分成若干个子部分。由数组或平衡二叉树支撑的流都可以工作得很好,但是Stream.iterate返回的结果不行。
·流操作的工作量应该具有较大的规模。如果总工作负载并不是很大,那么搭建并行计算时所付出的代价就没有什么意义。
·流操作不应该被阻塞。
换句话说,不要将所有的流都转换为并行流。只有在对已经位于内存中的数据执行大量计算操作时,才应该使用并行流。
程序清单1-8中的示例程序展示了如何操作并行流。
程序清单1-8 parallel/ParallelStreams.java
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/036-6-i.jpg?sign=1739298703-70brsTDpX3ZYHt2O8GxAWhtNBPRirfzA-0-4cecdd0eca9335984748ccef775b01b3)
![](https://epubservercos.yuewen.com/F21227/18365861501241106/epubprivate/OEBPS/Images/037-i.jpg?sign=1739298703-utk6zu5oVzBTnnI6hq5F8iHcZLzU74Zk-0-652e71855471aa9e2583aadf80e2f0bf)
java.util.stream.BaseStream<T,S extends BaseStream<T,S>>8
·S parallel()
产生一个与当前流中元素相同的并行流。
·S unordered()
产生一个与当前流中元素相同的无序流。
java.util.Collection<E>1.2
·Stream<E>parallelStream()8
用当前集合中的元素产生一个并行流。
在本章中,你学习到了如何运用Java 8的流库。下一章将讨论另一个重要的主题:处理输入和输出。