各位老师好: 执行下面代码导致所有节点磁盘占满,在本地调试时C盘也占满了。文件名称:flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa 说明:
1. 批处理模式 2.本地测试时输入目录oneDay和long大小在1G左右,启动程序后会把C(C:\Users\xxx \AppData\Local\Temp)盘剩余的几十G空间占满,部署到集群后,也会逐渐占满各节点磁盘 3.广播流blackListStream大概一万条记录,尝试把process中获取广播变量的代码和processBroadcastElement方法注释了,仍不起作用 String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay"; String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream<String> blackListStream = env.fromCollection(RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test")); MapStateDescriptor<String, Boolean> type = new MapStateDescriptor<String, Boolean>("blackList_type", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); BroadcastStream<String> blackList_b = blackListStream.broadcast(type); DataStream<Tuple5<String, String, String, String, String>> oneDayLog = env.readTextFile(oneDayLogFile) .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() { @Override public Tuple5<String, String, String, String, String> map(String line) throws Exception { String[] arrs = line.split("\t"); return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]); } }); SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput = env.readTextFile(historyFileName) .flatMap(new FlatParseLong()) .union(oneDayLog) .connect(blackList_b) .process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, String, Tuple5<String, String, String, String, String>>() { private transient ReadOnlyBroadcastState<String, Boolean> broadcastState; @Override public void processElement(Tuple5<String, String, String, String, String> value, ReadOnlyContext ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception { if(broadcastState == null){ broadcastState = ctx.getBroadcastState(type); } if(value!=null && !broadcastState.contains(value.f0)){ out.collect(value); } } @Override 
public void processBroadcastElement(String value, Context ctx, Collector<Tuple5<String, String, String, String, String>> out) throws Exception { if(StringUtils.isNotEmpty(value)){ BroadcastState<String, Boolean> broadcastState = ctx.getBroadcastState(type); broadcastState.put(value, true); } } });