一、简介
Flink官网的自我介绍:ApacheFlink?—StatefulComputationsoverDataStreams,可以看出状态计算是Flink引以为豪的杀手锏。那什么是带状态的计算呢?简单说计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态。
实时计算如果任务失败导致中间状态丢失,将是一个非常可怕的事情。
比如实时计算每天的pv,uv等指标,任务掉线后中间状态也丢失了,那只能从凌晨数据重新计算。
如果是有状态的计算大可不必担心,从任务掉线的时刻继续计算,妈妈再也不用担心我的任务掉线了。
下面介绍一下Flink如何实现状态计算和状态管理。
二、Flink中的状态管理
按照数据的划分和扩张方式,Flink中大致分为2类:
KeyedStates:记录每个Key对应的状态值一个Task上可能包含多个Key不同Task上不会出现相同的Key,常用的MapState,ValueState
OperatorStates:记录每个Task对应的状态值数据类型
ListState:并发度在改变的时候,会将并发上的每个List都取出,然后把这些List合并到一个新的List,然后根据元素的个数在均匀分配给新的Task;UnionListState:相比于ListState更加灵活,把划分的方式交给用户去做,当改变并发的时候,会将原来的List拼接起来。然后不做划分,直接交给用户;BroadcastState:如大表和小表做Join时,小表可以直接广播给大表的分区,在每个并发上的数据都是完全一致的。做的更新也相同,当改变并发的时候,把这些数据COPY到新的Task即可
state存储在StateBackend中,StateBackend一共有三种:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend。
但是最终都要持久化到磁盘上更保险,一般存储在hdfs中,那么flink的KeyedStates和OperatorStates如何存储在hdfs中的呢?
三、State在hdfs中的存储格式
KeyedStates和OperatorStates会存储再一个带有编号的chk目录中,比如说一个flink任务的KeyedStates的subTask个数是4,OperatorStates对应的subTask也是4,那么chk会存一个元数据文件_metadata,四个KeyedStates文件,四个OperatorStates的文件。
也就是说KeyedStates和OperatorStates会分别存储subTask总数个状态文件。
四、State存在形式
KeyedState和OperatorState存在两种形式:managed(托管状态)和raw(原始状态)。
托管状态是由Flink框架管理的状态;而原始状态是由用户自行管理状态的具体数据结构,框架在做checkpoint的时候,使用bytes数组读写状态内容,对其内部数据结构一无所知。
通常所有的datastreamfunctiоns都可以使用托管状态,但是原始状态接口仅仅能够在实现operators的时候使用。
推荐使用managedstate而不是使用rawstate,因为使用托管状态的时候Flink可以在parallelism发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。
五、State的使用
其实flink中的sum(),reduce()等聚合算子都是有状态的计算,我们不妨看看他们的源码,里面肯定使用了KeyedStates或者OperatorStates,我们以sum()算子源码举例,sum底层调用了StreamGroupedReduce,看到源码发现真的使用了ValueState,相信看完源码的你已经会自定义带有状态的算子了。
publicclassStreamGroupedReduce<IN>extendsAbstractUdfStreamOperator<IN,ReduceFunction<IN>>
implementsOneInputStreamOperator<IN,IN>{
privatestaticfinallongserialVersionUID=1L;
privatestaticfinalStringSTATE_NAME=”_op_state”;
privatetransientValueState<IN>values;
privateTypeSerializer<IN>serializer;
publicStreamGroupedReduce(ReduceFunction<IN>reducer,TypeSerializer<IN>serializer){
super(reducer);
this.serializer=serializer;
}
@Override
publicvoidopen()throwsException{
super.open();
ValueStateDescriptor<IN>stateId=newValueStateDescriptor<>(STATE_NAME,serializer);
values=getPartitionedState(stateId);
}
@Override
publicvoidprocessElement(StreamRecord<IN>element)throwsException{
INvalue=element.getValue();
INcurrentValue=values.value();
//如果任务不是第一次启动,也就是hdfs中已经存储过中间状态
if(currentValue!=null){
INreduced=userFunction.reduce(currentValue,value);
values.update(reduced);
output.collect(element.replace(reduced));
}
//如果任务第一次启动
else{
values.update(value);
output.collect(element.replace(value));
}
}
}
六、State过期时间TTL
使用flink进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。
例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的GROUPBY语句,以及执行了没有时间窗口限制的双流JOIN等等操作。
对于这些情况,经常导致堆内存出现OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从Flink1.6版本开始引入了StateTTL特性,该特性可以允许对作业中定义的Keyed状态进行超时自动清理,对于TableAPI和SQL模块引入了空闲状态保留时间(IdleStateRetentionTime)进行状态管理,下面我们具体介绍一下。
1、StateTTL功能的用法
在Flink的官方文档中给我们展示了StateTTL的基本用法,用法示例如下:
importorg.apache.flink.api.common.state.StateTtlConfig;
importorg.apache.flink.api.common.state.ValueStateDescriptor;
importorg.apache.flink.api.common.time.Time;
StateTtlConfigttlConfig=StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String>stateDescriptor=newValueStateDescriptor<>(“textstate”,String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
可以看到,要使用StateTTL功能,首先要定义一个StateTtlConfig对象。这个StateTtlConfig对象可以通过构造器模式(BuilderPattern)来创建,典型地用法是传入一个Time对象作为TTL时间,然后设置更新类型(UpdateType)和状态可见性(StateVisibility),这两个功能的含义将在下面的文章中详细描述。当StateTtlConfig对象构造完成后,即可在后续声明的状态描述符(StateDescriptor)中启用StateTTL功能了。
从上述的代码也可以看到,StateTTL功能所指定的过期时间并不是全局生效的,而是和某个具体的状态所绑定。换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入StateTtlConfig对象。对Flink源码感兴趣的同学,可以尝试为Flink增加一个默认的StateTTL选项,实现起来很简单,这里不再展开说明了。
StateTTL使用的更多案例,可以参见官方的flink-stream-state-ttl-test包,它提供了很多测试用例可以参考。
2、StateTtlConfig的参数说明
TTL:表示状态的过期时间,是一个org.apache.flink.api.common.time.Time对象。一旦设置了TTL,那么如果上次访问的时间戳+TTL超过了当前时间,则表明状态过期了(这是一个简化的说法,严谨的定义请参考org.apache.flink.runtime.state.ttl.TtlUtils类中关于expired的实现)。
UpdateType:表示状态时间戳的更新的时机,是一个Enum对象。如果设置为Disabled,则表明不更新时间戳;如果设置为OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;如果设置为OnReadAndWrite,则除了在状态创建和写入时更新时间戳外,读取也会更新状态的时间戳。
StateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是Enum对象。如果设置为ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;如果设置为NeverReturnExpired,那么一旦这个状态过期了,那么永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰。
TimeCharacteristic以及TtlTimeCharacteristic:表示StateTTL功能所适用的时间模式,仍然是Enum对象。前者已经被标记为Deprecated(废弃),推荐新代码采用新的TtlTimeCharacteristic参数。截止到Flink1.8,只支持ProcessingTime一种时间模式,对EventTime模式的StateTTL支持还在开发中。
CleanupStrategies:表示过期对象的清理策略,目前来说有三种Enum值。当设置为FULL_STATE_SCAN_SNAPSHOT时,对应的是EmptyCleanupStrategy类,表示对过期状态不做主动清理,当执行完整快照(Snapshot/Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小。
唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。为了应对这个问题,Flink还提供了增量清理的枚举值,分别是针对HeapStateBackend的INCREMENTAL_CLEANUP(对应IncrementalCleanupStrategy类),以及对RocksDBStateBackend有效的ROCKSDB_COMPACTION_FILTER(对应RocksdbCompactFilterCleanupStrategy类)。
对于增量清理功能,Flink可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于RocksDB的状态清理,则是通过JNI来调用C++语言编写的FlinkCompactionFilter来实现,底层是通过RocksDB提供的后台Compaction操作来实现对失效状态过滤的。
配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。
TTL刷新策略(默认OnCreateAndWrite)策略类型描述StateTtlConfig.UpdateType.Disabled禁用TTL,永不过期StateTtlConfig.UpdateType.OnCreateAndWrite每次写操作都会更新State的最后访问时间StateTtlConfig.UpdateType.OnReadAndWrite每次读写操作都会跟新State的最后访问时间状态可见性(默认NeverReturnExpired)策略类型描述StateTtlConfig.StateVisibility.NeverReturnExpired永不返回过期状态StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp可以返回过期但尚未被清理的状态值3、Notes:
状态后端存储最后一次修改的时间戳和值,这意味着启用该特性会增加状态存储的消耗。堆状态后端在内存中存储一个附加的Java对象,其中包含对用户状态对象的引用和一个原始长值。RocksDB状态后端为每个存储值、列表条目或映射条目添加8个字节;
目前只支持与处理时间相关的TTLs;
如果试图使用启用TTL的描述符或使用启用TTL的描述符恢复先前在没有TTL的情况下配置的状态,将导致兼容性失败和statmigration异常;
TTL配置不是check-orsavepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式
七、State清除策略
1、Cleanupinfullsnapshot默认情况下,过期值只有在显式读出时才会被删除,例如通过调用ValueState.value()方法。此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig中配置。(1)下面的配置选项不适用于RocksDBstatebackend上的increamentalcheckpointing;(2)对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后可以使用。
importorg.apache.flink.api.common.state.StateTtlConfig
importorg.apache.flink.api.common.time.Time
valttlConfig=StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
2、Incrementalcleanup
另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。
每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。这个特性可以在StateTtlConfig中激活:
importorg.apache.flink.api.common.state.StateTtlConfig
valttlConfig=StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally
.build
上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。Notes:
如果对状态没有访问或者没有任何处理的记录,那么状态会一直保留;
增量状态的清理增加了记录处理的延迟;
目前,增量状态的清理策略仅仅在对堆状态后端被实现了,对于设置了RocksDB的将没有效果;
如果使用堆状态后端进行同步快照,全局迭代器在跌倒时会保留所有键的副本,因为它的特性不支持对并发数的修改。使用此功能将增加内存消耗。异步快照进行对状态的保存就没有这种情况发生;
对于现有的作业,可以通过在StateTtlConfig中设置这种清理策略能够随时被激活和停用,例如:从保存点重新启动后。
3、CleanupduringRocksDBcompaction如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flinkjob来说如果一个自定义的RocksDB状态管理被创建那么它可以调用RocksDBStateBackend::enableTtlCompactionFilter。然后任何带有TTL的状态都可以配置来去使用过滤器。
importorg.apache.flink.api.common.state.StateTtlConfig
valttlConfig=StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter
.build
RocksDBcompactionfilter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。cleanupInRocksdbCompactFilter(longqueryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。
Notes:
在压缩过程中调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的键的每个存储状态条目的过期时间。对于集合状态类型(列表或映射),每个存储的元素也调用该检查;对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后。
目前,管理operatorstate仅仅支持使用List类型。当前,支持List样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配non-keyedstate的最佳粒度。根据状态访问方法,定义一下重新分配方案。
八、TableAPI和SQL模块状态管理
上面介绍了FlinkStateTTL机制,这项机制对于应对通用的状态暴增特别有效。
然而,这个特性也有其缺陷,例如不能保证一定可以及时清理掉失效的状态,以及目前仅支持ProcessingTime时间模式等等,另外对于旧版本的Flink(1.6之前),StateTTL功能也无法使用。
针对TableAPI和SQL模块的持续查询/聚合语句,Flink还提供了另一项失效状态清理机制,这就是本文要提到的IdleStateRetentionTime选项,Flink很早就提供了这个选项,该特性是借助QueryConfiguration配置项来定义的,但很多人并未启用,也不理解其中隐藏的暗坑。本文将对这一特性做说明,并给出一些使用建议。
1、问题引入
同样以官网文档的案例为起点,一个持续查询的GROUPBY语句,它没有时间窗口的定义,理论上会无限地计算下去:
SELECTsessionId,COUNT(*)FROMclicksGROUPBYsessionId;
这就带来了一个问题:随着时间的不断推进,内存中积累的状态会越来越多,因为数据流是无穷无尽、持续流入的,Flink并不知道如何丢弃旧的数据。在这种情况下,如果放任不管,那么迟早有一天作业的状态数达到了存储系统的容量极限,从而造成作业的崩溃。
针对这个问题,Flink提出了空闲状态保留时间(IdleStateRetentionTime)的概念。
通过为每个状态设置Timer,如果这个状态中途被访问过,则重新设置Timer;否则(如果状态一直未被访问,长期处于Idle状态)则在Timer到期时做状态清理。
这样,就可以确保每个状态都能得到及时的清理。
通过调用StreamQueryConfig的withIdleStateRetentionTime方法,可以为这个QueryConfig对象设置最小和最大的清理周期。
这样,Flink可以保证最早和最晚的状态清理时间。
需要注意的是,旧版本Flink允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。
新版本的Flink要求两个参数之间的差距至少要达到5分钟,从而避免大量状态瞬间到期,对系统造成的冲击。
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min,max);
或者
StreamQueryConfigqConfig=…
//setidlestateretentiontime:min=12hours,max=24hours
qConfig.withIdleStateRetentionTime(Time.hours(12),Time.hours(24));
注意:默认情况下StreamQueryConfig的设置并不是全局的。因此当设置了清理周期以后,需要在StreamTableEnvironment类调用toAppendStream或toRetractStream将Table转为DataStream时,显式传入这个QueryConfig对象作为参数,才可以令该功能生效。
为什么会有这种限制呢?看一下源码就知道了。
2、如何实现的
idlestateretentiontime特性在底层以o.a.f.table.runtime.functiоns.CleanupState接口来表示,代码如下。
publicinterfaceCleanupState{
defaultvoidregisterProcessingCleanupTimer(
ValueState<Long>cleanupTimeState,
longcurrentTime,
longminRetentionTime,
longmaxRetentionTime,
TimerServiсеtimerService)
throwsException{
//lastregisteredtimer
LongcurCleanupTime=cleanupTimeState.value();
//checkifacleanuptimerisregisteredand
//thatthecurrentcleanuptimerwon’tdeletestateweneedtokeep
if(curCleanupTime==null||(currentTime+minRetentionTime)>curCleanupTime){
//weneedtoregisteranew(later)timer
longcleanupTime=currentTime+maxRetentionTime;
//registertimerandrememberclean-uptime
timerService.registerProcessingTimeTimer(cleanupTime);
//deleteexpiredtimer
if(curCleanupTime!=null){
timerService.deleteProcessingTimeTimer(curCleanupTime);
}
cleanupTimeState.update(cleanupTime);
}
}
}
由上可知,每个key对应的最近状态清理时间会单独维护在ValueState中。如果满足以下两条件之一:
ValueState为空(即这个key是第一次出现)或者当前时间加上minRetentionTime已经超过了最近清理的时间
就用当前时间加上maxRetentionTime注册新的Timer,并将其时间戳存入ValueState,用于触发下一次清理。如果有已经过期了的Timer,则一并删除之。可见,如果minRetentionTime和maxRetentionTime的间隔设置太小,就会比较频繁地产生Timer与更新ValueState,维护Timer的成本会变大
新版本的Flink提供了一个QueryConfigProvider类(它实现了PlannerConfig接口,允许嵌入一个StreamQueryConfig对象),可以通过对TableConfig设置PlannerConfig的方式(调用addPlannerConfig方法),来传入设置好StreamQueryConfig对象的QueryConfigProvider.这样,当StreamPlanner将定义的Table翻译为Plan时,可以自动使用之前定义的StreamQueryConfig,从而实现全局的StreamQueryConfig设定。对于旧的Flink版本,只能通过修改源码的方式来设置,较为繁琐。
3、Flink1.9.0状态的新功能(StateProcessorAPI)
能够在外部直接访问Flink任务的状态是一个社区呼声比较高的需求,在Flink的最新版本1.9.0中就引入了StateProcessorAPI,该API让用户可以通过FlinkDataSet作业来灵活读取、写入和修改Flink的Savepoint和Checkpoint。
ApacheFlink的状态处理器API提供了强大的功能,可使用Flink的批处理DataSetAPI读取,写入和修改保存点和检查点。这对于诸如分析有趣模式的状态,通过检查差异进行故障排除或审核作业以及为新应用程序引导状态之类的任务很有用。
回复【实时数仓】获取实时数仓完整视频教程一套回复【离线数仓】获取离线数仓完整视频教程一套回复【数据治理】获取数据治理全套pdf回复【面X】获取大数据所有组件面X回复【笔X】获取X友真实现场笔X照片回复【进X】和大佬咨询沟通开发和线上问题