Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when
chaining methods [1].

The solution would be

WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> {
return event.getTime();
                        });

@Aljoscha Krettek <aljos...@apache.org> I think we need to update the
documentation about it. We have some examples which don't take this into
account.

[1]
https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <cavalla...@hotmail.com>
wrote:

> Hi,
>
> I'm taking the timestamp from the event payload that I'm receiving from
> Kafka.
>
> I'm struggling to get the time and I'm confused on how I should use the
> function ".withTimestampAssigner()". I'm receiving an error on event.
> getTime() that is telling me: *"cannot resolve method "Get Time" in
> "Object"* and I really don't understand how I can fix it.  My class is
> providing a long so the variable itself should be fine. Any help would be
> really appreciated.
>
> *This is my code:*
>
> *    FlinkKafkaConsumer<Event> kafkaData =*
> *                new FlinkKafkaConsumer("CorID_0", new
> EventDeserializationSchema(), p);*
> *        WatermarkStrategy<Event> wmStrategy =*
> *                WatermarkStrategy*
> *                        .forMonotonousTimestamps()*
> *                        .withTimestampAssigner((event, timestamp) -> {
> return event.**getTime();*
> *                        });*
>
> *        DataStream<Event> stream = env.addSource(*
> *                kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
>
>
> And to give you the idea of the whole project,
>
> *This is the EventDeserializationSchema class:*
>
> *public class EventDeserializationSchema implements
> DeserializationSchema<Event> {*
>
> *    private static final long serialVersionUID = 1L;*
>
>
> *    private static final CsvSchema schema = CsvSchema.builder()*
> *            .addColumn("firstName")*
> *            .addColumn("lastName")*
> *            .addColumn("age", CsvSchema.ColumnType.NUMBER)*
> *            .addColumn("time")*
> *            .build();*
>
> *    private static final ObjectMapper mapper = new CsvMapper();*
>
> *    @Override*
> *    public Event deserialize(byte[] message) throws IOException {*
> *        return
> mapper.readerFor(Event.class).with(schema).readValue(message);*
> *    }*
>
> *    @Override*
> *    public boolean isEndOfStream(Event nextElement) {*
> *        return false;*
> *    }*
>
> *    @Override*
> *    public TypeInformation<Event> getProducedType() {*
>
> *        return TypeInformation.of(Event.class);*
> *    }*
> *}*
>
> *And this is the Event Class:*
>
> *public class Event implements Serializable {*
> *    public String firstName;*
> *    public String lastName;*
> *    private int age;*
> *    public Long time;*
>
>
>
> *    public Event() {*
> *    }*
>
> *    public String getFirstName() {*
> *        return firstName;*
> *    }*
>
> *    public void setFirstName(String firstName) {*
> *        this.firstName = firstName;*
> *    }*
>
> *    public String getLastName() {*
> *        return lastName;*
> *    }*
>
> *    public void setLastName(String lastName) {*
> *        this.lastName = lastName;*
> *    }*
>
> *    public int getAge() {*
> *        return age;*
> *    }*
>
> *    public void setAge(int age) {*
> *        this.age = age;*
> *    }*
>
> *    public long getTime() {*
> *        return time;*
> *    }*
>
> *    public void setTime(String kafkaTime) {*
> *        long tn =
> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();*
> *        this.time = tn;*
> *    }*
> *}*
>
>
>
>
>
>

Reply via email to