Hi,
I'm taking the timestamp from the event payload that I receive from Kafka.
I'm struggling to extract the time, and I'm confused about how to use
".withTimestampAssigner()". I get an error on event.getTime() that says
"cannot resolve method 'getTime' in 'Object'", and I don't understand how to
fix it. My Event class returns a long from getTime(), so the value itself
should be fine. Any help would be really appreciated.
This is my code:
FlinkKafkaConsumer<Event> kafkaData =
    new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);

WatermarkStrategy<Event> wmStrategy =
    WatermarkStrategy
        .forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
            return event.getTime();
        });

DataStream<Event> stream = env.addSource(
    kafkaData.assignTimestampsAndWatermarks(wmStrategy));
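
The "in 'Object'" part of the message suggests that the compiler does not carry the target type Event back through the chained call, so the lambda parameter is typed as Object. The following is a minimal stand-alone sketch of that situation, not Flink code: Strategy and TimestampAssigner are hypothetical stand-ins that only mimic the shape of the API, and the Event class is reduced to the time field. Under that assumption, an explicit type witness on the generic factory method makes event.getTime() resolve.

```java
class TypeWitnessSketch {
    // Hypothetical stand-in for a timestamp assigner callback.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical stand-in for a strategy built through a generic static factory.
    static class Strategy<T> {
        TimestampAssigner<T> assigner;

        static <T> Strategy<T> forMonotonousTimestamps() {
            return new Strategy<>();
        }

        Strategy<T> withTimestampAssigner(TimestampAssigner<T> assigner) {
            this.assigner = assigner;
            return this;
        }
    }

    // Reduced version of the Event class: only the timestamp is kept.
    static class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    static long demo() {
        // Without the <Event> witness, T is inferred as Object for the chained
        // call and event.getTime() does not compile.
        Strategy<Event> strategy = Strategy.<Event>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.getTime());
        return strategy.assigner.extractTimestamp(new Event(42L), -1L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the same reasoning applies to the snippet above, the corresponding change would be to write WatermarkStrategy.<Event>forMonotonousTimestamps() before calling .withTimestampAssigner(...).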
To give you an idea of the whole project, this is the
EventDeserializationSchema class:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;

    private static final CsvSchema schema = CsvSchema.builder()
        .addColumn("firstName")
        .addColumn("lastName")
        .addColumn("age", CsvSchema.ColumnType.NUMBER)
        .addColumn("time")
        .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
And this is the Event class:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}
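
For reference, here is a small sketch of what the conversion in setTime(String) does with the "time" column. The helper name toEpochMillis and the sample timestamps are hypothetical and only serve as illustration. It assumes the payload carries an ISO-8601 timestamp with an explicit offset, which is what OffsetDateTime.parse accepts by default.

```java
import java.time.OffsetDateTime;

class EventTimeSketch {
    // Hypothetical helper mirroring the conversion inside Event.setTime(String).
    static long toEpochMillis(String isoTimestamp) {
        return OffsetDateTime.parse(isoTimestamp).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // One second after the epoch, written in UTC.
        System.out.println(toEpochMillis("1970-01-01T00:00:01Z"));      // prints 1000
        // The epoch itself, written with a +01:00 offset.
        System.out.println(toEpochMillis("1970-01-01T01:00:00+01:00")); // prints 0
    }
}
```

A string without an offset, such as "2020-10-05T12:00:00", is rejected with a DateTimeParseException, so the format of the incoming field matters. Since the field is declared as Long while getTime() returns long, the getter would also throw a NullPointerException if setTime was never called on an instance.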