Hi Miki,

Have you enabled checkpointing?
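
In case it helps, here is a minimal sketch of enabling checkpointing on the execution environment. As far as I know, BucketingSink only moves pending part files to their finished state once a checkpoint completes, so without checkpointing the files are never finalized. The class name and the 60-second interval are illustrative assumptions, not taken from your job, and the snippet needs flink-streaming-java on the classpath.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name, for illustration only.
class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed interval: trigger a checkpoint every 60 seconds.
        // Setting a state backend alone does not turn checkpointing on.
        env.enableCheckpointing(60_000L);

        // Report whether checkpointing is now enabled for this environment.
        System.out.println(env.getCheckpointConfig().isCheckpointingEnabled());
    }
}
```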

Kostas

> On Jun 5, 2018, at 11:14 AM, miki haiat <miko5...@gmail.com> wrote:
> 
> I'm trying to write some data to Hadoop using this code:
> 
> The state backend is set without a time interval:
> StateBackend sb = new FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
> env.setStateBackend(sb);
> BucketingSink<Tuple2<IntWritable, Text>> sink =
>         new BucketingSink<>("hdfs://****:9000/mycity/raw");
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
> sink.setInactiveBucketCheckInterval(120000);
> sink.setInactiveBucketThreshold(120000);
> The result is that all the files are stuck in in-progress status and are
> never closed.
> Is this related to the state backend configuration?
> 
> thanks,
> 
> Miki 
> 
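
For reference on the DateTimeBucketer pattern in the quoted code: "yyyy-MM-dd--HHmm" has minute resolution, so it yields one bucket directory per minute. The sketch below reproduces that naming with plain java.text.SimpleDateFormat. The class and method names are hypothetical, and pinning the time zone to UTC is an assumption made to keep the output deterministic; the zone the bucketer actually uses depends on the Flink version and configuration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, not part of Flink: mimics date-pattern bucket naming.
class BucketPathSketch {
    // Same pattern as in the quoted job.
    static final String PATTERN = "yyyy-MM-dd--HHmm";

    // Returns the bucket directory name for a timestamp in epoch milliseconds.
    static String bucketFor(long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        // Assumption: UTC, so the result does not depend on the local zone.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long t = 1528197240000L;
        // Two records 30 seconds apart within one minute share a bucket.
        System.out.println(bucketFor(t));
        System.out.println(bucketFor(t + 30_000L));
        // A record one minute later lands in a new bucket.
        System.out.println(bucketFor(t + 60_000L));
    }
}
```

With per-minute buckets and a two-minute inactivity threshold, a bucket stops receiving writes soon after its minute has passed, so many small part files are to be expected.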
