14、Flink 实战 - Keyed State状态管理之MapState使用案例

一、MapState的方法

MapState的方法和Java的Map的方法极为相似,所以上手相对容易。
常用的方法如下:

  • get()方法获取值
  • put(),putAll()方法更新值
  • remove()删除某个key
  • contains()判断是否存在某个key
  • isEmpty() 判断是否为空
     

二、定义MapStateDescriptor和获取MapState

// Describe the map state: a unique state name plus the key and value types.
final MapStateDescriptor<String, Integer> descriptor =
        new MapStateDescriptor<>("diff-count-map", String.class, Integer.class);
// Look up the MapState handle registered under that descriptor.
diffCountMap = getRuntimeContext().getMapState(descriptor);

三、统计温度升高/降低超过阈值的次数

程序主体和上一篇博客《Keyed State状态之ValueState》一样。

/**
 * Counts, per key, how many times a reading rose or fell by more than a threshold
 * compared with the previous reading, and emits the running totals each time one
 * of the two counters changes.
 *
 * <p>Each emitted element is a {@code Tuple3} of (record id, rise count, fall count).
 * Records whose reading is {@code null} are ignored: they neither update the stored
 * previous reading nor trigger a comparison.
 */
public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, SensorRecord, Tuple3<String, Integer, Integer>> {
    /** Map-state key under which the number of rises above the threshold is stored. */
    private static final String UP_KEY = "upKey";
    /** Map-state key under which the number of falls below the threshold is stored. */
    private static final String DOWN_KEY = "downKey";

    /** A change between consecutive readings must be strictly greater than this to be counted. */
    private final int tempDiff;

    /** Previous reading for the current key; {@code null} until a reading has been stored. */
    private transient ValueState<Double> lastTemp;
    /** Rise/fall counters for the current key, stored under {@link #UP_KEY} and {@link #DOWN_KEY}. */
    private transient MapState<String, Integer> diffCountMap;

    /**
     * @param tempDiff threshold that the difference between two consecutive readings
     *                 must exceed (strictly) to be counted as a rise or a fall
     */
    public MyKeyedProcessFunction(int tempDiff) {
        this.tempDiff = tempDiff;
    }

    /**
     * Registers the two pieces of keyed state used by this function:
     * the last reading ({@code "last-temp"}) and the counters ({@code "diff-count-map"}).
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Double> lastTempDescriptor = new ValueStateDescriptor<Double>(
                "last-temp",
                Double.class);
        lastTemp = getRuntimeContext().getState(lastTempDescriptor);
        MapStateDescriptor<String, Integer> diffCountDescriptor = new MapStateDescriptor<String, Integer>(
                "diff-count-map",
                String.class,
                Integer.class);
        diffCountMap = getRuntimeContext().getMapState(diffCountDescriptor);
    }

    /**
     * Compares the incoming reading with the previous one and, when the change exceeds
     * the threshold, bumps the matching counter and emits the current totals.
     *
     * @param value incoming record; its reading may be {@code null}
     * @param ctx   processing context (unused)
     * @param out   collector receiving (id, rise count, fall count)
     */
    @Override
    public void processElement(SensorRecord value, Context ctx, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
        Double current = value.getRecord();
        if (current == null) {
            // No reading to compare or remember. Returning here avoids unboxing null
            // and avoids storing a placeholder that a later reading would be compared against.
            return;
        }

        Double previous = lastTemp.value();
        lastTemp.update(current);

        if (previous == null) {
            // First usable reading for this key: nothing to compare yet, just seed the counters.
            // (No Double.MIN_VALUE sentinel: that constant is the smallest POSITIVE double,
            // so comparing against it would behave like comparing against ~0.)
            initCounterIfAbsent(UP_KEY);
            initCounterIfAbsent(DOWN_KEY);
            return;
        }

        if (current - previous > tempDiff) {
            // Reading rose by more than the threshold.
            increment(UP_KEY);
        } else if (previous - current > tempDiff) {
            // Reading fell by more than the threshold.
            increment(DOWN_KEY);
        } else {
            // Change within the threshold: counters unchanged, nothing to emit.
            return;
        }

        out.collect(Tuple3.of(value.getId(), countOf(UP_KEY), countOf(DOWN_KEY)));
    }

    /** Stores a zero counter under {@code key} unless one is already present. */
    private void initCounterIfAbsent(String key) throws Exception {
        if (!diffCountMap.contains(key)) {
            diffCountMap.put(key, 0);
        }
    }

    /** Returns the counter stored under {@code key}, treating a missing entry as zero. */
    private Integer countOf(String key) throws Exception {
        Integer count = diffCountMap.get(key);
        return count == null ? Integer.valueOf(0) : count;
    }

    /** Adds one to the counter stored under {@code key}. */
    private void increment(String key) throws Exception {
        diffCountMap.put(key, countOf(key) + 1);
    }
}