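Hi Simone,

The problem is that the Java 1.8 compiler cannot infer generic type arguments across chained method calls [1]. Without an explicit type argument, forMonotonousTimestamps() is inferred as WatermarkStrategy<Object>, so the lambda parameter event is an Object and getTime() cannot be resolved on it.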
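
To make the inference problem easier to see without a Flink dependency, here is a minimal sketch. Strategy, TimestampAssigner and this cut-down Event are hypothetical stand-ins that I made up for illustration; they only mimic the shape of the real API and are not Flink classes.

```java
public class ChainedInferenceDemo {

    // Hypothetical stand-in for a two-argument timestamp assigner.
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical stand-in for a generic strategy built via a static factory.
    static final class Strategy<T> {
        private final TimestampAssigner<T> assigner;

        private Strategy(TimestampAssigner<T> assigner) {
            this.assigner = assigner;
        }

        // Generic factory: T appears only in the return type.
        static <T> Strategy<T> forMonotonousTimestamps() {
            return new Strategy<>((element, recordTimestamp) -> recordTimestamp);
        }

        Strategy<T> withTimestampAssigner(TimestampAssigner<T> assigner) {
            return new Strategy<>(assigner);
        }

        long timestampOf(T element, long recordTimestamp) {
            return assigner.extractTimestamp(element, recordTimestamp);
        }
    }

    // Cut-down event type, assumed for this example only.
    static final class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    static Strategy<Event> build() {
        // Does not compile: the receiver of the chained call has no target
        // type, so T is inferred as Object and getTime() is not found.
        // Strategy<Event> broken = Strategy.forMonotonousTimestamps()
        //         .withTimestampAssigner((event, ts) -> event.getTime());

        // Compiles: the explicit type witness fixes T before the chain continues.
        return Strategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> event.getTime());
    }

    public static void main(String[] args) {
        // prints 1604679540000
        System.out.println(build().timestampOf(new Event(1604679540000L), -1L));
    }
}
```

Assigning the factory result to a typed local variable first and calling withTimestampAssigner on that variable should work as well, because the assignment then supplies the target type.
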
The solution would be:

    WatermarkStrategy<Event> wmStrategy =
        WatermarkStrategy
            .<Event>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> {
                return event.getTime();
            });

@Aljoscha Krettek <aljos...@apache.org> I think we need to update the documentation about it. We have some examples which don't take this into account.

[1] https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com> wrote:

> Hi,
>
> I'm taking the timestamp from the event payload that I'm receiving from
> Kafka.
>
> I'm struggling to get the time and I'm confused on how I should use the
> function ".withTimestampAssigner()". I'm receiving an error on
> event.getTime() that is telling me: *cannot resolve method "Get Time" in
> "Object"* and I really don't understand how I can fix it. My class is
> providing a long so the variable itself should be fine. Any help would be
> really appreciated.
>
> *This is my code:*
>
>     FlinkKafkaConsumer<Event> kafkaData =
>         new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
>     WatermarkStrategy<Event> wmStrategy =
>         WatermarkStrategy
>             .forMonotonousTimestamps()
>             .withTimestampAssigner((event, timestamp) -> {
>                 return event.getTime();
>             });
>
>     DataStream<Event> stream = env.addSource(
>         kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
> And to give you the idea of the whole project,
>
> *This is the EventDeserializationSchema class:*
>
>     public class EventDeserializationSchema implements DeserializationSchema<Event> {
>
>         private static final long serialVersionUID = 1L;
>
>         private static final CsvSchema schema = CsvSchema.builder()
>             .addColumn("firstName")
>             .addColumn("lastName")
>             .addColumn("age", CsvSchema.ColumnType.NUMBER)
>             .addColumn("time")
>             .build();
>
>         private static final ObjectMapper mapper = new CsvMapper();
>
>         @Override
>         public Event deserialize(byte[] message) throws IOException {
>             return mapper.readerFor(Event.class).with(schema).readValue(message);
>         }
>
>         @Override
>         public boolean isEndOfStream(Event nextElement) {
>             return false;
>         }
>
>         @Override
>         public TypeInformation<Event> getProducedType() {
>             return TypeInformation.of(Event.class);
>         }
>     }
>
> *And this is the Event Class:*
>
>     public class Event implements Serializable {
>         public String firstName;
>         public String lastName;
>         private int age;
>         public Long time;
>
>         public Event() {
>         }
>
>         public String getFirstName() {
>             return firstName;
>         }
>
>         public void setFirstName(String firstName) {
>             this.firstName = firstName;
>         }
>
>         public String getLastName() {
>             return lastName;
>         }
>
>         public void setLastName(String lastName) {
>             this.lastName = lastName;
>         }
>
>         public int getAge() {
>             return age;
>         }
>
>         public void setAge(int age) {
>             this.age = age;
>         }
>
>         public long getTime() {
>             return time;
>         }
>
>         public void setTime(String kafkaTime) {
>             long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
>             this.time = tn;
>         }
>     }