Flink 窗口API 窗口分配器

作者:神秘网友 发布时间:2022-06-23 07:03:09

Flink 窗口API 窗口分配器

1、窗口API1,1、按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。


stream.keyBy(...) .window(...)

非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

1.2、代码中窗口API的调用

有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

tream.keyBy(key selector).window(window assigner).aggregate(window function)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2、窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。之前的介绍中我们知道,窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.1、时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。在较早的版本中,可以直接调用.timeWindow()来定义时间窗口;这种方式非常简洁,但使用事件时间语义时需要另外声明,程序员往往因为忘记这点而导致运行结果错误。所以在1.12版本之后,这种方式已经被弃用了,标准的声明方式就是直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。

环境准备

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//
设置全局并行度 env.setParallelism(1
);
//
设置水位线生成间隔 env.getConfig().setAutoWatermarkInterval(100
); SingleOutputStreamOperatorEvent eventStream = env.addSource(
new
ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy
//
无序水位线:延迟2s .EventforBoundedOutOfOrderness(Duration.ofSeconds(2
)) .withTimestampAssigner(
new SerializableTimestampAssignerEvent
() {
//
指定水位线的字段:这里从 timestamp 读取时间信息
@Override
public
long extractTimestamp(Event element,
long
recordTimestamp) {
return
element.timestamp; } }));

滚动处理时间窗口

        
/**
* 滚动时间窗口 * 窗口大小 10 min ,偏移 0
*/
eventStream.map(data - Tuple2.of(data.user, 1L
)) .keyBy(data -
data.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10
))) .reduce(
new ReduceFunctionTuple2String, Long
() { @Override
public Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2)
throws
Exception {
return Tuple2.of(value1.f0, value1.f1 +
value2.f1); } }) ;
/**
* 滚动时间窗口 * 窗口大小 10 min ,偏移 2 min
*/
WindowedStreamEvent, String, TimeWindow window2 =
eventStream.keyBy(data -
data.user) .window(TumblingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)));

偏移量说明:我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,而这个时间是以UTC时间,也就是0时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是UTC+8,跟UTC有8小时的时差。我们定义1天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天0点开启窗口,这时是北京时间早上8点。那怎样得到北京时间每天0点开启的滚动窗口呢?只要设置-8小时的偏移量就可以了:

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

滑动处理时间窗口

        
/**
* 滑动事件时间窗口 * 窗口大小 30 min,步长 5 min 无偏移量 * window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
*/
WindowedStreamEvent, String, TimeWindow window1 = eventStream.keyBy(data -
data.user) .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10
)));
/**
* 滑动事件时间窗口 * 窗口大小 30 min,步长 5 min 偏移量 -8h * SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(8)
*/
WindowedStreamEvent, String, TimeWindow window11 = eventStream.keyBy(data -
data.user) .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(-8)));

其它一些窗口分配器

   
//
事件时间会话窗口:窗口大小 5s WindowedStreamEvent, String, TimeWindow window3 =
eventStream.keyBy(data -
data.user) .window(EventTimeSessionWindows.withGap(Time.seconds(5
)));
//
滑动计数窗口 窗口大小 10 :滑动步长 2 WindowedStreamEvent, String, GlobalWindow eventStringGlobalWindowWindowedStream =
eventStream.keyBy(data -
data.user) .countWindow(10, 2);

Flink 窗口API 窗口分配器 相关文章

  1. windows API窗口程序

    windows API窗口程序 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 过程 在visual studio2019 下创建基于windows平台的c++桌面...

  2. 第一个windows API窗口程序

    第一个windows API窗口程序 步骤 编写WinMain函数 设计窗口类 注册窗口类 创建窗口 显示并更新窗口 编写消息循环 编写窗口过程函数 总结 整体源码 效果图 第一次写Windows窗口程序,第一次写博客,遇到很多问题。 编写WinMain函数 Wi...

  3. Windows API 窗口的创建

    Windows API 窗口的创建 提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 Windows API 窗口的创建 前言 创建一个Win32应用程序的步骤: 一个基于windows API 的基于窗体、消息循环、事件驱动的 Windows C语言风格的W...

  4. OS X中如何管理窗口大小

    OS X 10.7系统中,窗口的管理方法已经有所改变,相对于之前的系统来说,OS X 10.7系统管理窗口更加的方便,任意拖动窗口的边框,都可以对窗口的大小进行调整。现在就和小编一起来看看OS X中有哪些管理窗口大小的技巧吧。 OS X...

  5. Windows API程序设计(窗口创建)

    Windows API程序设计(窗口创建) 目的 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 创建一个Win32应用程序的步骤: 定义...

  6. Windows API程序设计(窗口创建)

    Windows API程序设计(窗口创建) Windows API程序设计(窗口创建) 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 设计一个窗...

  7. Windows API程序设计(窗口创建)

    Windows API程序设计(窗口创建) Windows API程序设计(窗口创建) 了解 windows 操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握 WinMain 函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 1. 创建弹...

  8. 写一个基于Windows API简单窗口的程序

    写一个基于Windows API简单窗口的程序 Window API程序设计(窗口创建) 1、了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 2、掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函...

  9. linux窗口切换快捷键_分配快捷键以在Linux上激活打开的应用程序

    linux窗口切换快捷键_分配快捷键以在Linux上激活打开的应用程序窗口 linux窗口切换快捷键 We’ve already shown you how to customize shortcut keys in any Linux application, but for today’s lesson we’ll take it a step beyondand assign a shortc

  10. Windows API程序设计入门(一个简单的窗口)

    Windows API程序设计入门(一个简单的窗口) 了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 掌握WinMain函数的基本框架,窗口定义、窗口创建、消息循环及窗口过程函数; 1.设计窗口类 WNDCLASS win;//窗...

  11. Windows API程序设计入门----创建一个简单的窗口

    Windows API程序设计入门----创建一个简单的窗口 目录 一、目的 二、工具 三、步骤及代码实现 1、步骤及代码 2、运行结果 1.了解 windows操作系统应用程序开发的基本概念,win32 API函数、消息与事件驱动; 2.掌握WinMain函数的基本框...

  12. Mac OS X 10.2系统中如何移动非活动窗口

    在MAC的使用过程中,我们有时需要将一些非活动窗口进行移动,或者是将一些文件从一个磁盘移动到另一个磁盘,却不想让它拷贝过去,那么,该怎么解决这些问题呢?今天小编就给大家介绍一下这些问题的解决方法吧。 快速...

  13. 第07课:使用 HTML 5 API 创建子窗口

    第07课:使用 HTML 5 API 创建子窗口 在 Electron 中还存在一种创建窗口的方式,就是使用 HTML 5 的 API 创建窗口。在 HTML 5 中提供了 window.open 方法用于打开一个子窗口,该方法返回一个 BrowserWindowProxy 对象,并且打开了一个功能受限...

  14. Mac OS X 10.2窗口选项快捷键汇总

    在MAC使用过程中,我们可以通过一些快捷键来提高我们的速度,加快我们的工作效率。今天小编给大家带来的是关于系统画面中的窗口选单的一些快捷键。 cmd-选取项目 关闭窗口。 cmd-shift-选取项目 将弹出式窗口归位。 cmd-opt-选...

  15. MAC OS X Lion如何打开非活动程序的所有窗口

    MAC OS X Lion系统中,用户们开启了三指下滑手势后,发现通过这种操作只能打开当前应用程序的窗口,而对其他非活动程序却无效。但是有时我们想要在不切换应用程序的情况下,打开其他应用程序的窗口,这该怎么办呢? 解决...

  16. 第15章 TCP数据流与窗口管理;延时确认(Delayed Ack);Nagle算法;窗口通告;滑动窗口;窗口的滑动;零窗口和TCP持续计时器

    "交互式"TCP连接是指该连接需要在客户端和服务器之间传输用户输入信息,如按键操作、短消息、操作杆、鼠标的动作等。 每个输入的字符会生成4个TCP报文段:客户输入字符的发送,服务端回复ack;服务端回显数据发送,客户...

  17. qt 开了子窗口,关闭主窗口后,子窗口仍存在的解决方法

    qt 开了子窗口,关闭主窗口后,子窗口仍存在的解决方法 转载:https://blog.csdn.net/qq_36170958/article/details/108686841 项目场景: 在使用VS+Qt做图像处理软件项目开发时,有一个父(主)窗口和若干子窗口,点击主窗口的按钮,子窗口...

  18. qt 窗口不可调整大小_将不可调整大小的窗口变成可调整大小的窗口

    qt 窗口不可调整大小_将不可调整大小的窗口变成可调整大小的窗口 qt 窗口不可调整大小 Are you frustrated with Windows app windows that can not be resized at all? Now you can apply some “attitude adjustment” and resize those windows with Resiz

  19. JFrame创建窗口与关闭窗口

    1,利用内部类构造方法创建窗口 import java.awt.*; import javax.swing.*; import java.awt.event.*; //先调用要用到的包 public class JFrame_test extends JFrame implements ActionListener{ //继承JFrame类实现ActionListener接口,JFrame类创建窗口,Ac

  20. MFC 主窗口调用子窗口

    MFC 主窗口调用子窗口 功能:MFC主窗口创建子窗口,同时隐藏本窗口;当子窗口关闭时,继续显示主窗口;子窗口关闭时,复写关闭窗口函数。 1 在主窗口,创建两个按钮。 2 创建两个子窗口界面。资源视图-Dialog(右键)-添加资源...

每天更新java,php,javaScript,go,python,nodejs,vue,android,mysql等相关技术教程,教程由网友分享而来,欢迎大家分享IT技术教程到本站,帮助自己同时也帮助他人!

Copyright 2021, All Rights Reserved. Powered by 跳墙网(www.tqwba.com)|网站地图|关键词