Hi Simone,
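The problem is that the Java 1.8 compiler cannot infer generic type
arguments across chained method calls [1]. Without an explicit type argument,
forMonotonousTimestamps() is inferred as WatermarkStrategy<Object>, so the
lambda parameter "event" is an Object and getTime() cannot be resolved.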
The solution is to specify the type argument explicitly:

    WatermarkStrategy<Event> wmStrategy =
        WatermarkStrategy
            .<Event>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> {
                return event.getTime();
            });
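
To show the inference behaviour in isolation, here is a minimal sketch that
runs without Flink. Strategy, create(), withAssigner(), extract() and the
simplified Event are hypothetical stand-ins that only mimic the shape of a
static generic factory followed by a chained call; they are not Flink API.
It shows two ways that should work: the explicit type witness, and splitting
the chain so the assignment supplies the target type.

```java
import java.util.function.ToLongFunction;

final class TypeWitnessDemo {

    // Hypothetical, simplified event type (not the Event class from this thread).
    static class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // Hypothetical stand-in: a static generic factory plus a chained instance method.
    static class Strategy<T> {
        private final ToLongFunction<T> assigner;

        private Strategy(ToLongFunction<T> assigner) {
            this.assigner = assigner;
        }

        static <T> Strategy<T> create() {
            // Default assigner that ignores the element.
            return new Strategy<>(element -> Long.MIN_VALUE);
        }

        Strategy<T> withAssigner(ToLongFunction<T> newAssigner) {
            return new Strategy<>(newAssigner);
        }

        long extract(T element) {
            return assigner.applyAsLong(element);
        }
    }

    public static void main(String[] args) {
        // Does not compile: create() is the receiver of the next call, so it has
        // no target type, T is inferred as Object, and e.getTime() is unknown.
        // Strategy<Event> broken = Strategy.create().withAssigner(e -> e.getTime());

        // Option 1: explicit type witness on the generic factory method.
        Strategy<Event> withWitness =
            Strategy.<Event>create().withAssigner(e -> e.getTime());

        // Option 2: split the chain so the assignment provides the target type.
        Strategy<Event> base = Strategy.create();
        Strategy<Event> split = base.withAssigner(Event::getTime);

        Event event = new Event(1604679540000L);
        System.out.println(withWitness.extract(event));
        System.out.println(split.extract(event));
    }
}
```

Uncommenting the "broken" line should reproduce the same kind of error as in
your code, because the lambda parameter is then typed as Object.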
@Aljoscha Krettek <[email protected]> I think we need to update the
documentation about it. We have some examples which don't take this into
account.
[1]
https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
Cheers,
Till
On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[email protected]>
wrote:
> Hi,
>
> I'm taking the timestamp from the event payload that I'm receiving from
> Kafka.
>
> I'm struggling to get the time and I'm confused about how I should use the
> function .withTimestampAssigner(). I'm getting an error on
> event.getTime() that says: "cannot resolve method 'Get Time' in 'Object'",
> and I really don't understand how to fix it. My class provides a long, so
> the variable itself should be fine. Any help would be really appreciated.
>
> This is my code:
>
>     FlinkKafkaConsumer<Event> kafkaData =
>         new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
>     WatermarkStrategy<Event> wmStrategy =
>         WatermarkStrategy
>             .forMonotonousTimestamps()
>             .withTimestampAssigner((event, timestamp) -> {
>                 return event.getTime();
>             });
>
>     DataStream<Event> stream = env.addSource(
>         kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
>
> And to give you the idea of the whole project,
>
> This is the EventDeserializationSchema class:
>
>     public class EventDeserializationSchema implements DeserializationSchema<Event> {
>
>         private static final long serialVersionUID = 1L;
>
>         private static final CsvSchema schema = CsvSchema.builder()
>             .addColumn("firstName")
>             .addColumn("lastName")
>             .addColumn("age", CsvSchema.ColumnType.NUMBER)
>             .addColumn("time")
>             .build();
>
>         private static final ObjectMapper mapper = new CsvMapper();
>
>         @Override
>         public Event deserialize(byte[] message) throws IOException {
>             return mapper.readerFor(Event.class).with(schema).readValue(message);
>         }
>
>         @Override
>         public boolean isEndOfStream(Event nextElement) {
>             return false;
>         }
>
>         @Override
>         public TypeInformation<Event> getProducedType() {
>             return TypeInformation.of(Event.class);
>         }
>     }
>
> And this is the Event class:
>
>     public class Event implements Serializable {
>         public String firstName;
>         public String lastName;
>         private int age;
>         public Long time;
>
>         public Event() {
>         }
>
>         public String getFirstName() {
>             return firstName;
>         }
>
>         public void setFirstName(String firstName) {
>             this.firstName = firstName;
>         }
>
>         public String getLastName() {
>             return lastName;
>         }
>
>         public void setLastName(String lastName) {
>             this.lastName = lastName;
>         }
>
>         public int getAge() {
>             return age;
>         }
>
>         public void setAge(int age) {
>             this.age = age;
>         }
>
>         public long getTime() {
>             return time;
>         }
>
>         public void setTime(String kafkaTime) {
>             long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
>             this.time = tn;
>         }
>     }
>