各位老师好:
执行下面代码导致所有节点磁盘占满,在本地调试时C盘也占满了
文件名称:flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
说明:

1.  批处理模式
2.本地测试时输入目录oneDay和long大小在1G左右,启动程序后会把C盘(C:\Users\xxx\AppData\Local\Temp)剩余的几十G空间占满,部署到集群后,也会逐渐占满各节点磁盘

3.广播流blackListStream大概一万条记录,尝试把process中获取广播变量的代码和processBroadcastElement方法注释了,仍不起作用



        // Create the execution environment and run the job in BATCH runtime mode.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Local input locations: the one-day log and the long-term history data.
        final String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
        final String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";

        // Build a stream from the members that smembers returns for "user_blacklist_cid_test".
        // The element type is passed explicitly: the single-argument fromCollection overload
        // infers the type from the first element and rejects an empty collection, so an empty
        // blacklist would otherwise fail while the job graph is being built.
        DataStream<String> blackListStream = env.fromCollection(
                RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"),
                BasicTypeInfo.STRING_TYPE_INFO);

        // Descriptor of the broadcast map state (String key -> Boolean value). The same
        // descriptor instance is used to broadcast the stream and to look the state up later.
        MapStateDescriptor<String, Boolean> type =
                new MapStateDescriptor<>("blackList_type",
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
        BroadcastStream<String> blackList_b = blackListStream.broadcast(type);

        // Parse every tab-separated line of the one-day log into a 5-field tuple
        // (columns 0..4; any additional columns are ignored).
        DataStream<Tuple5<String, String, String, String, String>> oneDayLog =
                env.readTextFile(oneDayLogFile)
                .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
                    /**
                     * Splits one input line on tab characters and keeps the first five columns.
                     *
                     * @param line one raw line of the input file
                     * @return a tuple holding columns 0 to 4 of the line
                     * @throws IllegalArgumentException if the line has fewer than five columns
                     */
                    @Override
                    public Tuple5<String, String, String, String, String> map(String line) throws Exception {
                        // A limit of -1 keeps trailing empty columns. The default split drops
                        // them, so a line whose last column is empty would yield only four
                        // elements and fail with ArrayIndexOutOfBoundsException on arrs[4].
                        String[] arrs = line.split("\t", -1);
                        if (arrs.length < 5) {
                            throw new IllegalArgumentException(
                                    "Expected at least 5 tab-separated fields but found "
                                            + arrs.length + " in line: " + line);
                        }
                        return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
                    }
                });

        // Union the parsed history records with the one-day log, connect the result to the
        // broadcast blacklist, and forward only records whose first field (f0) is not a key
        // of the broadcast state.
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput =
                env.readTextFile(historyFileName)
                .flatMap(new FlatParseLong())
                .union(oneDayLog)
                .connect(blackList_b)
                .process(new BroadcastProcessFunction<
                        Tuple5<String, String, String, String, String>,
                        String,
                        Tuple5<String, String, String, String, String>>() {

                    // Read-only view of the broadcast state, fetched on the first processElement call.
                    private transient ReadOnlyBroadcastState<String, Boolean> broadcastState;

                    /**
                     * Emits the record unless it is null or its f0 is contained in the broadcast state.
                     */
                    @Override
                    public void processElement(
                            Tuple5<String, String, String, String, String> value,
                            ReadOnlyContext ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        // The state lookup happens before the null check, as in the original order.
                        if (broadcastState == null) {
                            broadcastState = ctx.getBroadcastState(type);
                        }
                        if (value == null) {
                            return;
                        }
                        boolean blacklisted = broadcastState.contains(value.f0);
                        if (!blacklisted) {
                            out.collect(value);
                        }
                    }

                    /**
                     * Stores every non-empty broadcast element as a key mapped to {@code true}.
                     */
                    @Override
                    public void processBroadcastElement(
                            String value,
                            Context ctx,
                            Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        if (!StringUtils.isNotEmpty(value)) {
                            return;
                        }
                        ctx.getBroadcastState(type).put(value, true);
                    }
                });

回复