Java的Stream编程
自从Java 8将函数式编程引入语言后,该语言通过Lambda表达式和Stream库两者的结合,展现了全新的编码方式。实现了简洁,高效,以及类似声明式的编码风格。
举个例子,如我们有个集合里面存放的是颜色的标签,我们想在集合中找出以“b”字符开头的标签,并进行排序,然后转换成大写字符标签,最后输出。如果用Java 8以前的编码方式,我们会进行多次轮询,最后输出结果:
public static void main(String[] args) {
List<String> colors = Arrays.asList("blue", "green", "brown", "grey", "red", "white", "black", "beige", "purple");
List<String> filteredAndSortedColors = new ArrayList<>();
for (String color : colors) {
if (color.startsWith("b")) {
filteredAndSortedColors.add(color);
}
}
Collections.sort(filteredAndSortedColors);
for (String color : filteredAndSortedColors) {
System.out.println(color.toUpperCase());
}
}
如果用Java 8的Lambda、Stream来实现,则代码会更加简洁、易懂,执行效率也会提高:
public static void main(String args[]) {
Stream.of("blue", "green", "brown", "grey", "red", "white", "black", "beige", "purple")
.filter(s -> s.startsWith("b"))
.sorted()
.map(s -> s.toUpperCase())
.forEach(System.out::println);
}
Go的Stream编程
go语言中函数是第一公民,理论上我们可以实现更简洁的Stream框架。Stream的核心有以下几个方面:
- 基于流的操作
- 支持并行计算
- 声明式编程风格
基于流的操作
是指数据不断流入,对每个数据依次执行相应的操作并得到最后结果,然后再处理下一个数据,直到所有数据处理完成,最终程序结束。举例说明比较直观,我们有数据集Z,对应操作A,B,C。以前我们对数据的处理方式是对整个数据集Z进行遍历执行操作A得到数据集Z1,然后再对数据集Z1进行遍历执行操作B得到Z2,遍历Z2执行操作C得到最后结果Z3,这里我们有多少操作就需要遍历多少次。基于流的操作方式针对数据集只需遍历一次,对数据集Z进行遍历,对遍历到的第一个数据执行A、B、C三个操作,然后依次对后面的数据执行同样的操作,直至遍历完成。
在实现中这里有两个难点:
- 每个stage怎么连接起来,操作怎么连接起来?
- 操作怎样惰性求值?
在go stream的实现中,建立了一个pipeline结构体代表stage,这个结构体是一个双向链表同时指向前一个和后一个stage。在每个pipeline初始化的时候通过链表指针指向了前后的stage,这样就把所有操作的stage连接起来。
type pipeline[T any] struct {
previousStage *pipeline[T]
nextStage *pipeline[T]
sourceStage *pipeline[T]
depth int
streamOpFlag stateType
streamSink sink[T]
sourceData []T
}
func (p *pipeline[T]) init(previousStage *pipeline[T], opFlag stateType, sink sink[T]) {
if opFlag == head {
p.previousStage = nil
p.sourceStage = p
p.depth = 0
} else {
p.previousStage = previousStage
p.previousStage.nextStage = p
p.sourceStage = previousStage.sourceStage
p.depth = p.previousStage.depth + 1
p.streamSink = sink
}
p.streamOpFlag = opFlag
}
那我们操作怎么惰性求值的呢?注意在pipeline结构体里面有个sink[T]接口,实现这个接口的结构体含有具体的操作。我们以map操作为例,map操作里的pipeline实现了mapSink[T any]这个结构体,结构体里面mapper函数就是具体对数据集里数据的操作。
type sink[T any] interface {
begin(int)
accept(T)
end()
isCancellationWasRequested() bool
cancellationRequested() bool
canParallel() bool
setDownStreamSink(sink[T])
}
type mapSink[T any] struct {
mapper func(T) T
downstream sink[T]
}
在最后一个stage里,我们会调用evaluate函数,这个函数会对所有的sink结构体进行叠加,这里只进行wrap操作,等所有sink都叠加完成后,调用copyInto函数遍历每个数据进行sink操作的真正执行。这样我们就完成了惰性求值。
func (p *pipeline[T]) evaluate(s sink[T]) {
p.copyInto(p.wrapSink(s), p.sourceStage.sourceData)
}
func (p *pipeline[T]) wrapSink(sink sink[T]) sink[T] {
for ; p.depth > 0; p = p.previousStage {
sink = p.opWrapSink(sink)
}
return sink
}
func (p *pipeline[T]) copyInto(wrapSink sink[T], slice []T) {
....
wrapSink.begin(len(slice))
if !wrapSink.isCancellationWasRequested() {
for _, v := range slice {
wrapSink.accept(v)
}
} else {
for _, v := range slice {
if wrapSink.cancellationRequested() {
break
}
wrapSink.accept(v)
}
}
wrapSink.end()
}
支持并行计算
有了上面基于流的操作,以及go协程的强大,并行计算就采用协程来实现。在go stream中新增方法Parallel()来表示要对数据集进行并行计算。在并行计算中监测当前平台具备多少核心的CPU,通过把数据集按核心数等分,调用go协程并行执行流的操作来加快整个计算速度。
func (s *parallelSink[T]) end() {
if s.canparallel {
var wg sync.WaitGroup
cores := runtime.NumCPU()
for _, slice := range splitSlice(s.list, cores) {
wg.Add(1)
go func(slice []T) {
defer wg.Done()
for _, v := range slice {
s.downstream.accept(v)
}
}(slice)
}
wg.Wait()
}
s.downstream.end()
}
声明式编程风格
stream带来编程风格的转变,便是这种声明式的编程风格,你告诉程序需要对数据集进行什么样的操作,比如filter,sort,然后你再给它需要的规则filter里哪些特征的数据为true,sort里数据大小比较规则等,不用关注内部算法细节,框架自己完成整个计算过程。这里我们通过stream接口定义了整个操作集。
type stream[T any] interface {
Parallel() stream[T]
Map(func(T) T) stream[T]
Reduce(func(T, T) T) T
ReduceWithInitValue(T, func(T, T) T) T
ForEach(func(T))
Sorted() stream[T]
SortedWith(func(T, T) bool) stream[T]
Filter(func(T) bool) stream[T]
Limit(int) stream[T]
FindFirst() T
ToList() []T
Distinct() stream[T]
DistinctWith(func(T, T) bool) stream[T]
}
上面我针对go stream这个框架,介绍了基于流的操作,怎样支持并行计算,以及声明式编程风格。这个框架我在实现时并行计算部分实现采用了比较基础的方式,还可以再进一步实现更多的功能,改进算法得到更高的效率。大家有兴趣可以一起研究。
该项目开源地址:gostream
基于Go-1.18实现