Im trying to write some data to Hadoop by using this code

The state backend is set without time

StateBackend sb = new
FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
env.setStateBackend(sb);

BucketingSink<Tuple2<IntWritable, Text>> sink =
        new BucketingSink<>("hdfs://****:9000/mycity/raw");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setInactiveBucketCheckInterval(120000);
sink.setInactiveBucketThreshold(120000);

the result is that all the files are stuck in* in.programs  *status and not
closed.
is it related to the state backend configuration.

thanks,

Miki

Reply via email to