KeySelector的确定性问题

文/李鹏飞

Flink支持自定义KeySelector,从注释(见下图)可以看到对于同一个input要保证多次调用获取的key是相同的,否则作业会遇到异常和正确性问题。

Flink KeySelector相关代码注释

举一个实际中遇到的例子,用户自定义的KeySelector如下所示,为每个input随机生成一个[0, 10000)之间的key,作业可以简化为两级,即source经过keyBy后接timeWindow。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Custom KeySelector.
*/
class UidKeySelector implements KeySelector<Input, Integer> {

private static final long serialVersionUID = 1L;

@Override
public Integer getKey(Input in) throws Exception {
return ThreadLocalRandom.current().nextInt(10000);
}
}

// job topology
source.keyBy(new UidKeySelector())
.timeWindow(Time.of(5, TimeUnit.MINUTES), );

作业在实际运行中会遇到如下异常,timer sercive在注册timer时发现某个key所属的key group不属于该并发。

异常信息

异常的原因在于用户的KeySelector是非确定性的,使用ThreadLocalRandom导致对于同一个input多次调用返回的key可能是不同的。source在向下游发送数据时会根据key计算其所在的key group,进而决定发送到下游包含该key group的并发;在下游的window中,timer service会根据key计算其所在的key group,并验证是否属于当前并发。如果key是random的,那么下游计算得到的key group与上游是不同的,就有可能不属于当前并发,所以校验失败出现上述异常。

值得注意的是用户不一定能及时的发现这个问题,如果将timeWindow换成countWindow,因为不涉及到timer service的使用,就不会对key group进行校验从而抛出异常,因此问题无法暴露出来。

#

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×