Hi Miki,

Have you enabled checkpointing?
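For reference, here is a minimal sketch of what enabling checkpointing could look like, assuming the Flink DataStream API of that period. As far as I understand, BucketingSink only moves pending part files to their final state once a checkpoint completes, so with checkpointing off the files are never finalized. The class name, the 60-second interval, and the toy source are illustrative assumptions and not taken from your job.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name, for illustration only.
public class CheckpointingSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Setting a state backend does not by itself turn checkpointing on;
        // it has to be enabled explicitly with an interval in milliseconds.
        // Assumption: 60 seconds, chosen arbitrarily for this sketch.
        env.enableCheckpointing(60_000L);

        // Placeholder pipeline so the sketch runs on its own. In the real job
        // this would be the actual source followed by addSink(sink) with the
        // BucketingSink from the quoted code.
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointing-sketch");
    }
}
```

In the real job, the existing env.setStateBackend(sb) call and the sink setup would stay as they are; the only addition is the enableCheckpointing call before env.execute.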
Kostas

> On Jun 5, 2018, at 11:14 AM, miki haiat <miko5...@gmail.com> wrote:
>
> I'm trying to write some data to Hadoop using this code.
>
> The state backend is set without time
> StateBackend sb = new FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
> env.setStateBackend(sb);
> BucketingSink<Tuple2<IntWritable, Text>> sink =
>     new BucketingSink<>("hdfs://****:9000/mycity/raw");
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
> sink.setInactiveBucketCheckInterval(120000);
> sink.setInactiveBucketThreshold(120000);
>
> The result is that all the files are stuck in in-progress status and are not
> closed.
> Is this related to the state backend configuration?
>
> Thanks,
>
> Miki