Flink 窗口API 窗口分配器
Flink 窗口API 窗口分配器
1、窗口API1,1、按键分区(Keyed)和非按键分区(Non-Keyed)在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。
按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...) .window(...)
非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式。
stream.windowAll(...)
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
1.2、代码中窗口API的调用有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
tream.keyBy(key selector).window(window assigner).aggregate(window function)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做keyBy,后面调用.window()时直接换成.windowAll()就可以了。
2、窗口分配器定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。之前的介绍中我们知道,窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
2.1、时间窗口时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。在较早的版本中,可以直接调用.timeWindow()来定义时间窗口;这种方式非常简洁,但使用事件时间语义时需要另外声明,程序员往往因为忘记这点而导致运行结果错误。所以在1.12版本之后,这种方式已经被弃用了,标准的声明方式就是直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。
环境准备
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//
设置全局并行度 env.setParallelism(1
);
//
设置水位线生成间隔 env.getConfig().setAutoWatermarkInterval(100
); SingleOutputStreamOperatorEvent eventStream = env.addSource(
new
ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy
//
无序水位线:延迟2s .EventforBoundedOutOfOrderness(Duration.ofSeconds(2
)) .withTimestampAssigner(
new SerializableTimestampAssignerEvent
() {
//
指定水位线的字段:这里从 timestamp 读取时间信息
@Override
public
long extractTimestamp(Event element,
long
recordTimestamp) {
return
element.timestamp; } }));
滚动处理时间窗口
/**
* 滚动时间窗口 * 窗口大小 10 min ,偏移 0
*/
eventStream.map(data - Tuple2.of(data.user, 1L
)) .keyBy(data -
data.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10
))) .reduce(
new ReduceFunctionTuple2String, Long
() { @Override
public Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2)
throws
Exception {
return Tuple2.of(value1.f0, value1.f1 +
value2.f1); } }) ;
/**
* 滚动时间窗口 * 窗口大小 10 min ,偏移 2 min
*/
WindowedStreamEvent, String, TimeWindow window2 =
eventStream.keyBy(data -
data.user) .window(TumblingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)));
偏移量说明:我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,而这个时间是以UTC时间,也就是0时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是UTC+8,跟UTC有8小时的时差。我们定义1天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天0点开启窗口,这时是北京时间早上8点。那怎样得到北京时间每天0点开启的滚动窗口呢?只要设置-8小时的偏移量就可以了:
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
滑动处理时间窗口
/**
* 滑动事件时间窗口 * 窗口大小 30 min,步长 5 min 无偏移量 * window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
*/
WindowedStreamEvent, String, TimeWindow window1 = eventStream.keyBy(data -
data.user) .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10
)));
/**
* 滑动事件时间窗口 * 窗口大小 30 min,步长 5 min 偏移量 -8h * SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(8)
*/
WindowedStreamEvent, String, TimeWindow window11 = eventStream.keyBy(data -
data.user) .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(-8)));
其它一些窗口分配器
//
事件时间会话窗口:窗口大小 5s WindowedStreamEvent, String, TimeWindow window3 =
eventStream.keyBy(data -
data.user) .window(EventTimeSessionWindows.withGap(Time.seconds(5
)));
//
滑动计数窗口 窗口大小 10 :滑动步长 2 WindowedStreamEvent, String, GlobalWindow eventStringGlobalWindowWindowedStream =
eventStream.keyBy(data -
data.user) .countWindow(10, 2);
Flink 窗口API 窗口分配器 相关文章
- windows API窗口程序
windows API窗口程序 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 过程 在visual studio2019 下创建基于windows平台的c++桌面...
- 第一个windows API窗口程序
第一个windows API窗口程序 步骤 编写WinMain函数 设计窗口类 注册窗口类 创建窗口 显示并更新窗口 编写消息循环 编写窗口过程函数 总结 整体源码 效果图 第一次写Windows窗口程序,第一次写博客,遇到很多问题。 编写WinMain函数 Wi...
- Windows API 窗口的创建
Windows API 窗口的创建 提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 Windows API 窗口的创建 前言 创建一个Win32应用程序的步骤: 一个基于windows API 的基于窗体、消息循环、事件驱动的 Windows C语言风格的W...
- OS X中如何管理窗口大小
OS X 10.7系统中,窗口的管理方法已经有所改变,相对于之前的系统来说,OS X 10.7系统管理窗口更加的方便,任意拖动窗口的边框,都可以对窗口的大小进行调整。现在就和小编一起来看看OS X中有哪些管理窗口大小的技巧吧。 OS X...
- Windows API程序设计(窗口创建)
Windows API程序设计(窗口创建) 目的 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 创建一个Win32应用程序的步骤: 定义...
- Windows API程序设计(窗口创建)
Windows API程序设计(窗口创建) Windows API程序设计(窗口创建) 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 设计一个窗...
- Windows API程序设计(窗口创建)
Windows API程序设计(窗口创建) Windows API程序设计(窗口创建) 了解 windows 操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握 WinMain 函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 1. 创建弹...
- 写一个基于Windows API简单窗口的程序
写一个基于Windows API简单窗口的程序 Window API程序设计(窗口创建) 1、了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 2、掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函...
- linux窗口切换快捷键_分配快捷键以在Linux上激活打开的应用程序
linux窗口切换快捷键_分配快捷键以在Linux上激活打开的应用程序窗口 linux窗口切换快捷键 We’ve already shown you how to customize shortcut keys in any Linux application, but for today’s lesson we’ll take it a step beyondand assign a shortc
- Windows API程序设计入门(一个简单的窗口)
Windows API程序设计入门(一个简单的窗口) 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 1.设计窗口类 WNDCLASS win;//窗...
- Windows API程序设计入门----创建一个简单的窗口
Windows API程序设计入门----创建一个简单的窗口 目录 一、目的 二、工具 三、步骤及代码实现 1、步骤及代码 2、运行结果 1.了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 2.掌握WinMain函数的基本框...
- Mac OS X 10.2系统中如何移动非活动窗口
在MAC的使用过程中,我们有时需要将一些非活动窗口进行移动,或者是将一些文件从一个磁盘移动到另一个磁盘,却不想让它拷贝过去,那么,该怎么解决这些问题呢?今天小编就给大家介绍一下这些问题的解决方法吧。 快速...
- 第07课:使用 HTML 5 API 创建子窗口
第07课:使用 HTML 5 API 创建子窗口 在 Electron 中还存在一种创建窗口的方式,就是使用 HTML 5 的 API 创建窗口。在 HTML 5 中提供了 window.open 方法用于打开一个子窗口,该方法返回一个 BrowserWindowProxy 对象,并且打开了一个功能受限...
- Mac OS X 10.2窗口选项快捷键汇总
在MAC使用过程中,我们可以通过一些快捷键来提高我们的速度,加快我们的工作效率。今天小编给大家带来的是关于系统画面中的窗口选单的一些快捷键。 cmd-选取项目 关闭窗口。 cmd-shift-选取项目 将弹出式窗口归位。 cmd-opt-选...
- MAC OS X Lion如何打开非活动程序的所有窗口
MAC OS X Lion系统中,用户们开启了三指下滑手势后,发现通过这种操作只能打开当前应用程序的窗口,而对其他非活动程序却无效。但是有时我们想要在不切换应用程序的情况下,打开其他应用程序的窗口,这该怎么办呢? 解决...
- 第15章 TCP数据流与窗口管理;延时确认(Delayed Ack);Nagle算法;窗口通告;滑动窗口;窗口的滑动;零窗口和TCP持续计时器
"交互式"TCP连接是指该连接需要在客户端和服务器之间传输用户输入信息,如按键操作、短消息、操作杆、鼠标的动作等。 每个输入的字符会生成4个TCP报文段:客户输入字符的发送,服务端回复ack;服务端回显数据发送,客户...
- qt 开了子窗口,关闭主窗口后,子窗口仍存在的解决方法
qt 开了子窗口,关闭主窗口后,子窗口仍存在的解决方法 转载:https://blog.csdn.net/qq_36170958/article/details/108686841 项目场景: 在使用VS+Qt做图像处理软件项目开发时,有一个父(主)窗口和若干子窗口,点击主窗口的按钮,子窗口...
- qt 窗口不可调整大小_将不可调整大小的窗口变成可调整大小的窗口
qt 窗口不可调整大小_将不可调整大小的窗口变成可调整大小的窗口 qt 窗口不可调整大小 Are you frustrated with Windows app windows that can not be resized at all? Now you can apply some “attitude adjustment” and resize those windows with Resiz
- JFrame创建窗口与关闭窗口
1,利用内部类构造方法创建窗口 import java.awt.*; import javax.swing.*; import java.awt.event.*; //先调用要用到的包 public class JFrame_test extends JFrame implements ActionListener{ //继承JFrame类实现ActionListener接口,JFrame类创建窗口,Ac
- MFC 主窗口调用子窗口
MFC 主窗口调用子窗口 功能:MFC主窗口创建子窗口,同时隐藏本窗口;当子窗口关闭时,继续显示主窗口;子窗口关闭时,复写关闭窗口函数。 1 在主窗口,创建两个按钮。 2 创建两个子窗口界面。资源视图-Dialog(右键)-添加资源...