Hi Folks,

We are working on a Flink job to process a large amount of data coming in
from a Kafka stream.

We selected Flink because the data is sometimes out of order or late, and we
need to roll up the data into 30-minute event time windows, after which we
write it back out to an S3 bucket.

We have hit a couple of issues:

1) The job works fine using processing time, but when we switch to event
time (almost) nothing seems to be written out.
Our watermark code looks like this:
```
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
```
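For completeness, the whole assigner is roughly of this shape (the Event type
and its timestamp field are simplified stand-ins for our real record):
```
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Simplified stand-in for our actual record type.
case class Event(key: String, timestamp: Long)

class TimestampAndWatermarkAssigner(maxLateness: Long)
    extends AssignerWithPeriodicWatermarks[Event] {

  // Event time is taken from the record itself.
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long =
    element.timestamp

  // The watermark trails the current processing time by maxLateness.
  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis() - maxLateness)
}
```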
And we are doing this:
```
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```
and this:
```
    .assignTimestampsAndWatermarks(
      new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
```
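Putting the pieces together, the relevant part of the pipeline looks roughly
like this (kafkaSource, the key field, the roll-up, and bucketingSink are
simplified stand-ins for our real code):
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// kafkaSource is our FlinkKafkaConsumer and bucketingSink is the BucketingSink
// that writes to S3; both are omitted here for brevity.
val rolledUp = env
  .addSource(kafkaSource)
  .assignTimestampsAndWatermarks(
    new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
  .keyBy(_.key)
  .window(TumblingEventTimeWindows.of(Time.minutes(30)))
  .reduce((a, b) => if (a.timestamp >= b.timestamp) a else b) // illustrative roll-up
rolledUp.addSink(bucketingSink)
```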

However, even though we get millions of records per hour (the vast majority
of which are no more than 30 minutes late), we only see something like 2-10
records per hour written out to the S3 bucket.
We are using a custom Bucketer with the BucketingSink (a simplified sketch is
below); if folks believe that could be the issue, I would be happy to provide
the actual code here as well.
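The sketch, with illustrative names and path format (the exact Bucketer
interface depends on the Flink version):
```
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer

// Buckets by the event's own timestamp rather than wall-clock time.
class EventTimeBucketer extends Bucketer[Event] {
  private val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd--HHmm")

  override def getBucketPath(clock: Clock, basePath: Path, element: Event): Path =
    new Path(basePath, fmt.format(new java.util.Date(element.timestamp)))
}
```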

2) On top of all this, we would really prefer to write the records directly
to Aurora in RDS rather than to an intermediate S3 bucket, but it seems that
the JDBC sink connector is unsupported / doesn't exist.
If this is not the case, we would love to know.
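If nothing exists yet, we could presumably hand-roll something with a
RichSinkFunction and plain JDBC along these lines (just a sketch: no batching,
connection pooling, retries, or exactly-once guarantees, and the table and
columns are made up), but we would much rather use a supported connector:
```
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hand-rolled JDBC sink sketch; Event is the simplified record type from above.
class AuroraSink(jdbcUrl: String, user: String, password: String)
    extends RichSinkFunction[Event] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(jdbcUrl, user, password)
    stmt = conn.prepareStatement(
      "INSERT INTO rollups (event_key, event_time) VALUES (?, ?)") // made-up table
  }

  override def invoke(value: Event): Unit = {
    stmt.setString(1, value.key)
    stmt.setLong(2, value.timestamp)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```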

Thanks in advance for all the help / insight on this,

Max Walker


