Yes, we are using event time.
Yes, the Flink program is kept running in the IDE (Eclipse) and is not closed
after the first batch of events or while the second batch is added.
Yes, we do have a custom timestamp/watermark assigner, implemented as shown
at the end of the code below.

Are we using the properties for Kafka correctly?
We are using Flink 1.1.1 and Flink Kafka connector:

More about the behavior:
I have noticed that sometimes, even after the first write to the Kafka
queue, the running Flink program does not process the queue immediately.
We need to restart it. This happens quite randomly.
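
On the Kafka properties question, here is a minimal sketch of the consumer properties, under two assumptions worth checking against the connector documentation: the 0.9 consumer needs only bootstrap.servers and group.id (zookeeper.connect is a setting of the 0.8 consumer), and auto.offset.reset applies only when the consumer group has no committed offset yet. The class name, the build helper and the per-run group id are hypothetical and used for illustration only.

```java
import java.util.Properties;

public class KafkaProps09Sketch {

    // Builds consumer properties for a Kafka 0.9 style consumer.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Takes effect only if the group has no committed offset;
        // otherwise the consumer resumes from the committed position.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical: a fresh group id per run, so that "earliest"
        // really means reading the topic from the start while testing.
        Properties props = build("localhost:9092", "demo-" + System.currentTimeMillis());
        System.out.println(props);
    }
}
```

If that assumption holds, a fixed group id such as "demo" could explain why a restarted job does not always re-read the topic from the beginning.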

Following is the rough outline of our code.

public class SlidingWindow2{

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.get


        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset" ,"earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
                new FlinkKafkaConsumer09<>(
                        "test",            // kafka topic name
                        new dataSchema(),

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 =
                env.addSource(consumer);
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new

                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");


public static class CustomSlidingWindowFunction implements
        WindowFunction<Tuple5<String, String, Float, Float, String>,
                Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {

    public void apply(Tuple key, TimeWindow window,
            Iterable<Tuple5<String, String, Float, Float, String>> input,
            Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {


// Implemented custom Periodic Watermark as below from
public static class BoundedOutOfOrdernessGenerator2
        implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {

    private static final long serialVersionUID = 1L;

    private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
            long previousElementTimestamp) {
        //System.out.println("inside extractTimestamp");
        Date parseDate = null;
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        try {
            parseDate = dateFormat.parse(element.f0);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        long timestamp = parseDate.getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as twice the current highest timestamp minus the
        // out-of-orderness bound
        // this is because it is not covering the lateness sufficiently; now it does
        // in future this may be multiple of 3 or more if necessary to cover the gap
        // in records received
        return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
    }
}
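
To make the watermark arithmetic in getCurrentWatermark() above concrete, here is a small standalone sketch without any Flink dependency. It compares the doubled formula with the conventional "maximum timestamp minus bound" formula. The class name, the helper names, the two sample dates and the 10 second bound (assumed here to be in milliseconds) are hypothetical and only for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class WatermarkArithmeticSketch {

    // Parses a timestamp in the same format as extractTimestamp() above.
    static long parse(String s) {
        try {
            return new SimpleDateFormat("dd-MM-yyyy HH:mm:ss").parse(s).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable timestamp: " + s, e);
        }
    }

    // Same formula as getCurrentWatermark() above.
    static long doubledWatermark(long maxTimestamp, long bound) {
        return maxTimestamp * 2 - bound;
    }

    // Conventional bounded-out-of-orderness formula, for comparison.
    static long boundedWatermark(long maxTimestamp, long bound) {
        return maxTimestamp - bound;
    }

    // True if a record's event time is not ahead of the watermark.
    static boolean isBehind(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long bound = 10_000L; // hypothetical bound: 10 s, in milliseconds
        long firstBatch = parse("01-09-2016 10:00:00");  // hypothetical
        long secondBatch = parse("01-09-2016 10:05:00"); // hypothetical

        // Doubled formula: the watermark is decades ahead of the data,
        // so a record from five minutes later is still behind it.
        System.out.println(isBehind(secondBatch, doubledWatermark(firstBatch, bound))); // true

        // Conventional formula: the later record is ahead of the watermark.
        System.out.println(isBehind(secondBatch, boundedWatermark(firstBatch, bound))); // false
    }
}
```

Since an event-time window normally drops records once the watermark has passed the end of their window, this may be worth checking as a cause of the second batch producing no result.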

> a bit more information would be useful. Are you using event-time? Is the
> Flink program kept running after adding the first batch of events and when
> adding the second batch or is it two invocations of your Flink program? Do
> you have a custom timestamp/watermark assigner?
>> We are using a sliding window function to process data read from Kafka
>> Stream. We are using FlinkKafkaConsumer09 to read the data. The window
>> function and sink are running correctly.
>> To test the program, we are generating a stream of data from command line.
>> This works when we add a set of records once. When we add again, it does
>> not work, Flink produces no result, even though the records are added to
>> same Kafka topic from the same command line instance.
>> Please could you suggest what could be wrong.
