[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212252904

File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java

```diff
@@ -45,6 +45,22 @@
 	 */
 	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
+	 * @param partition The partition the message has originated from.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset).
+	 * @param timestamp the timestamp of the consumer record
+	 * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+	 *
+	 * @return The deserialized message as an object (null if the message cannot be deserialized).
+	 */
+	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
```

Review comment: I would be ok with proceeding with the above.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211532678

File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java

```diff
@@ -45,6 +45,22 @@
 	 */
 	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
+	 * @param partition The partition the message has originated from.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset).
+	 * @param timestamp the timestamp of the consumer record
+	 * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+	 *
+	 * @return The deserialized message as an object (null if the message cannot be deserialized).
+	 */
+	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
```

Review comment: I'm still not very convinced that this is a good approach. For example, things might be a lot cleaner and more error-proof if we could just deprecate the non-timestamped `KeyedDeserializationSchema` in favor of a new interface. Technically, we're trying to achieve the same thing, but things would perhaps be much more understandable.

@FredTing what do you think? Perhaps @aljoscha would also want to chime in with some thoughts here?
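For illustration, the "deprecate in favor of a new interface" option could look roughly like the sketch below. It is a minimal sketch under stated assumptions, not the connector's API: `TimestampedKeyedDeserializationSchema` and `fromLegacy` are hypothetical names, `TimestampType` is a local stand-in that only mirrors the constant names of Kafka's `org.apache.kafka.common.record.TimestampType`, and `isEndOfStream`/`getProducedType` are left out for brevity.

```java
import java.io.IOException;
import java.io.Serializable;

// Local stand-in for org.apache.kafka.common.record.TimestampType (same constant names),
// so this sketch compiles without the Kafka client on the classpath.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

/** Existing, non-timestamped interface (simplified); it would be deprecated under this option. */
@Deprecated
interface KeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

/** Hypothetical replacement: one abstract method that always receives the timestamp. */
interface TimestampedKeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                  long timestamp, TimestampType timestampType) throws IOException;

    /** Hypothetical adapter so a connector only has to handle the new interface internally. */
    @SuppressWarnings("deprecation")
    static <T> TimestampedKeyedDeserializationSchema<T> fromLegacy(KeyedDeserializationSchema<T> legacy) {
        // The timestamp arguments are dropped because the legacy schema cannot use them.
        return (key, msg, topic, partition, offset, timestamp, timestampType) ->
                legacy.deserialize(key, msg, topic, partition, offset);
    }
}
```

The point of this shape is that each interface has exactly one abstract method, so an implementer never has to guess which overload a given connector version will call; legacy schemas would be wrapped once inside the connector.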
[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211187832

File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java

```diff
@@ -45,6 +45,22 @@
 	 */
 	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
+	 * @param partition The partition the message has originated from.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset).
+	 * @param timestamp the timestamp of the consumer record
+	 * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+	 *
+	 * @return The deserialized message as an object (null if the message cannot be deserialized).
+	 */
+	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
```

Review comment: @FredTing Your first concern makes sense. Though, wouldn't this also be a shortcoming of our long-term solution, where we want to have separate interfaces for "deserialization" and "enrichment"?

> I have no problem with the first alternative, but I think we are better off if we throw an exception with a message explaining that the user must implement/override one of the deserialize methods.

I'm not sure how this would work. Could you elaborate a bit more on this?
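One way the "throw an exception with an explanatory message" idea could work is to make both methods `default` and let them form a chain that ends in an exception. The sketch assumes that shape; the message text, the choice of `UnsupportedOperationException`, the `LegacyStringSchema` example class, and the local `TimestampType` stand-in (mirroring Kafka's constant names) are illustrative assumptions, not Flink's API.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for Kafka's TimestampType constants, so the sketch has no Kafka dependency.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

interface KeyedDeserializationSchema<T> extends Serializable {

    // The old method becomes default as well. It is only reached if the implementation
    // overrides neither method, or overrides only the timestamped one but is called via this one.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        throw new UnsupportedOperationException(getClass().getName()
                + " must override one of the deserialize(...) methods of KeyedDeserializationSchema.");
    }

    // The new method falls back to the old one, so existing implementations keep working unchanged.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                          long timestamp, TimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Hypothetical pre-existing implementation that only knows the old method.
class LegacyStringSchema implements KeyedDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

Note that the check only fires at runtime, on the first record, and only on the code path a given connector version actually calls: a schema that overrides just the timestamped method would still hit the exception if a pre-0.10 connector called the five-argument method.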
[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810967

File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java

```diff
@@ -45,6 +45,22 @@
 	 */
 	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set).
+	 * @param message The message, as a byte array (null if the message was empty or deleted).
+	 * @param partition The partition the message has originated from.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset).
+	 * @param timestamp the timestamp of the consumer record
+	 * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+	 *
+	 * @return The deserialized message as an object (null if the message cannot be deserialized).
+	 */
+	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
```

Review comment: This would mean that, if users want to access timestamps, they would have to implement both deserialize methods, correct? This feels a bit awkward.

I can think of two possible approaches to address this:

1. Let the original non-timestamped deserialize method also be default, but with an empty implementation. It's still not nice though, since now both deserialize methods are default, and moreover the second deserialize method relies on the first one.
2. Instead of having a new deserialize method with the timestamp, we could have a `setTimestamp` method that is only ever called in Kafka 0.10+, whose signature is `void setTimestamp(T deserializedRecord, long timestamp)`.

This might also better match what we've discussed in the JIRA ticket about having separate interfaces for "deserialization" and "enrichment".
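Approach 2 could be sketched roughly as follows, assuming a mutable record type. `Event`, `EventSchema`, and `Kafka010FetchSketch.emit` are hypothetical names standing in for a user type, a user schema, and the per-record logic of a Kafka 0.10+ fetcher; only the `setTimestamp` signature comes from the comment itself.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Existing interface (simplified) plus the proposed enrichment hook.
interface KeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // Proposed hook: only ever called by Kafka 0.10+ consumers. The default is a no-op,
    // so existing implementations need no changes.
    default void setTimestamp(T deserializedRecord, long timestamp) {}
}

// Hypothetical mutable user type that wants the Kafka record timestamp.
class Event {
    final String payload;
    long kafkaTimestamp = Long.MIN_VALUE; // hypothetical sentinel: stays unset on pre-0.10 consumers

    Event(String payload) {
        this.payload = payload;
    }
}

// Hypothetical user schema: deserialization and enrichment are separate steps.
class EventSchema implements KeyedDeserializationSchema<Event> {
    @Override
    public Event deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new Event(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public void setTimestamp(Event deserializedRecord, long timestamp) {
        deserializedRecord.kafkaTimestamp = timestamp;
    }
}

// Hypothetical stand-in for what a Kafka 0.10+ fetcher would do per record.
final class Kafka010FetchSketch {
    static <T> T emit(KeyedDeserializationSchema<T> schema, byte[] key, byte[] value,
                      String topic, int partition, long offset, long timestamp) throws IOException {
        T record = schema.deserialize(key, value, topic, partition, offset);
        if (record != null) { // null means "could not be deserialized": nothing to enrich
            schema.setTimestamp(record, timestamp);
        }
        return record;
    }
}
```

One consequence of this design is that `setTimestamp` can only enrich mutable records; a schema producing an immutable type such as `String` has no way to use it.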
[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210809870

File path: docs/dev/connectors/kafka.md

```diff
@@ -153,7 +153,10 @@ produced Java/Scala type to Flink's type system. Users that implement a vanilla
 to implement the `getProducedType(...)` method themselves.
 
 For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has
-the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.
+the following deserialize methods ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` and
+` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType)`.
+The first exists for backward compatibility reasons, for kafka 0.10+ consumers the second is preferred because it
+also gives access to the kafka timestamp.
```

Review comment: Capital 'k' for Kafka
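A user-side schema following the documented guidance (keep the five-argument method for backward compatibility, prefer the timestamped one on Kafka 0.10+ consumers) might look like the sketch below. It assumes, without the PR confirming it, that the default timestamped method delegates to the five-argument one; `TimestampedValue`, `TimestampedValueSchema`, and the `Long.MIN_VALUE` sentinel are hypothetical, and `TimestampType` is a local stand-in mirroring the constant names of Kafka's enum.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.common.record.TimestampType (same constant names).
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Interface shape described in the docs change; the default body is an assumption.
interface KeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                          long timestamp, TimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Hypothetical output type: the UTF-8 value plus the record timestamp, if known.
final class TimestampedValue {
    static final long NO_TIMESTAMP = Long.MIN_VALUE; // hypothetical sentinel, not a Kafka constant

    final String value;
    final long timestamp;
    final TimestampType timestampType;

    TimestampedValue(String value, long timestamp, TimestampType timestampType) {
        this.value = value;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
    }
}

class TimestampedValueSchema implements KeyedDeserializationSchema<TimestampedValue> {

    // Backward-compatible path (pre-0.10 consumers): no timestamp is available.
    @Override
    public TimestampedValue deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return deserialize(messageKey, message, topic, partition, offset,
                TimestampedValue.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
    }

    // Preferred path for Kafka 0.10+ consumers.
    @Override
    public TimestampedValue deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                                        long timestamp, TimestampType timestampType) {
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new TimestampedValue(value, timestamp, timestampType);
    }
}
```

Routing the old method through the new one keeps a single code path for decoding, so the two overloads cannot drift apart.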
[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810317

File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java

```diff
@@ -54,4 +70,12 @@
 	 * @return True, if the element signals end of stream, false otherwise.
 	 */
 	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
```

Review comment: Capital k for Kafka