[ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331303#comment-16331303
 ] 

ASF GitHub Bot commented on KAFKA-4932:
---------------------------------------

brandonkirchner closed pull request #4438: KAFKA-4932 add UUID serializer / deserializer
URL: https://github.com/apache/kafka/pull/4438
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index d6b4d2da611..3377a27897b 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Factory for creating serializers / deserializers.
@@ -112,6 +113,12 @@ public ByteArraySerde() {
         }
     }
 
+    static public final class UUIDSerde extends WrapperSerde<UUID> {
+        public UUIDSerde() {
+            super(new UUIDSerializer(), new UUIDDeserializer());
+        }
+    }
+
     @SuppressWarnings("unchecked")
     static public <T> Serde<T> serdeFrom(Class<T> type) {
         if (String.class.isAssignableFrom(type)) {
@@ -150,9 +157,13 @@ public ByteArraySerde() {
             return (Serde<T>) Bytes();
         }
 
+        if (UUID.class.isAssignableFrom(type)) {
+            return (Serde<T>) UUID();
+        }
+
         // TODO: we can also serializes objects of type T using generic Java serialization by default
         throw new IllegalArgumentException("Unknown class for built-in serializer. Supported types are: " +
-            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes");
+            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID");
     }
 
     /**
@@ -228,6 +239,13 @@ public ByteArraySerde() {
         return new BytesSerde();
     }
 
+    /*
+     * A serde for nullable {@code UUID} type
+     */
+    static public Serde<UUID> UUID() {
+        return new UUIDSerde();
+    }
+
     /*
      * A serde for nullable {@code byte[]} type.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
new file mode 100644
index 00000000000..81fc0e331c7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.UUID;
+
+public class UUIDDeserializer implements Deserializer<UUID> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("deserializer.encoding");
+        if (encodingValue != null && encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public UUID deserialize(String topic, byte[] data) {
+        try {
+            if (data == null)
+                return null;
+            else
+                return UUID.fromString(new String(data, encoding));
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing byte[] to UUID due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
new file mode 100644
index 00000000000..d051b21c53c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.UUID;
+
+public class UUIDSerializer implements Serializer<UUID> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("serializer.encoding");
+        if (encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public byte[] serialize(String topic, UUID data) {
+        try {
+            if (data == null)
+                return null;
+            else
+                return data.toString().getBytes(encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+      // nothing to do
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 134882f002b..8c7268cb7b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -45,6 +46,7 @@
             put(byte[].class, Arrays.asList("my string".getBytes()));
             put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
             put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes())));
+            put(UUID.class, Arrays.asList(UUID.randomUUID()));
         }
     };
 

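The behaviour of the two new classes in the diff above can be approximated
with the JDK alone, which may help when reading the change without a Kafka
checkout. The sketch below is not part of the pull request: the class name
UuidStringRoundTrip and its static helpers are hypothetical. It copies the
property lookup order and null handling from UUIDSerializer, but it resolves
the charset with Charset.forName, which throws an unchecked exception for an
unknown name instead of the SerializationException raised in the diff.

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for UUIDSerializer / UUIDDeserializer, JDK only.
final class UuidStringRoundTrip {

    // Same lookup order as configure() in the diff: the key- or value-specific
    // property first, then the shared "serializer.encoding", then "UTF8".
    public static String resolveEncoding(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object value = configs.get(propertyName);
        if (value == null)
            value = configs.get("serializer.encoding");
        return value instanceof String ? (String) value : "UTF8";
    }

    // A null UUID maps to a null payload, as in the diff.
    public static byte[] serialize(UUID data, String encoding) {
        return data == null ? null : data.toString().getBytes(Charset.forName(encoding));
    }

    // A null payload maps back to a null UUID.
    public static UUID deserialize(byte[] data, String encoding) {
        return data == null ? null : UUID.fromString(new String(data, Charset.forName(encoding)));
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        String encoding = resolveEncoding(configs, true); // falls back to "UTF8"

        UUID id = UUID.randomUUID();
        byte[] payload = serialize(id, encoding);
        UUID back = deserialize(payload, encoding);

        System.out.println("payload bytes: " + payload.length);
        System.out.println("round trip ok: " + id.equals(back));
    }
}
```

With the default encoding the payload is the 36-character string form of the
UUID, one byte per character.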

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add UUID Serde
> --------------
>
>                 Key: KAFKA-4932
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4932
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, streams
>            Reporter: Jeff Klukas
>            Assignee: Brandon Kirchner
>            Priority: Minor
>              Labels: needs-kip, newbie
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString, and then using the 
> existing StringSerializer / StringDeserializer to finish the job. We would 
> also wrap these in a Serde and modify the streams Serdes class to include 
> this in the list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and it would check the size of the input byte array to determine whether it's 
> a binary or string representation of the UUID. It's not well defined whether 
> the most significant bits or least significant go first, so this deserializer 
> would have to support only one or the other.
> Similarly, if the deserializer supported a 16-byte representation, there could 
> be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant concerns here around ambiguity of what the byte 
> representation of a UUID should be, or if there's desire to keep the list of 
> built-in Serdes minimal such that a PR would be unlikely to be accepted.
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization
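The optional 16-byte representation discussed in the issue description is not
part of the merged diff, which only uses the string form. A minimal sketch of
how it could look follows, assuming the most significant bits are written
first in big-endian order (the issue itself notes that this ordering is not
well defined, so it is a choice made here for illustration). The class name
UuidBinaryCodec and its methods are hypothetical. The decoder tells the two
forms apart by length, as the description suggests, and assumes UTF-8 for the
36-byte string form.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical codec; not taken from the pull request.
final class UuidBinaryCodec {

    // Writes the most significant 64 bits first, then the least significant.
    // ByteBuffer is big-endian by default, so the bytes appear in the same
    // order as the hex digits of UUID.toString().
    public static byte[] toBytes(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Accepts either the 16-byte binary form or the 36-byte string form.
    public static UUID fromBytes(byte[] data) {
        if (data.length == 16) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            long msb = buf.getLong();
            long lsb = buf.getLong();
            return new UUID(msb, lsb);
        }
        if (data.length == 36)
            return UUID.fromString(new String(data, StandardCharsets.UTF_8));
        throw new IllegalArgumentException("Expected 16 or 36 bytes, got " + data.length);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] binary = toBytes(id);
        byte[] text = id.toString().getBytes(StandardCharsets.UTF_8);

        System.out.println("binary round trip ok: " + id.equals(fromBytes(binary)));
        System.out.println("string round trip ok: " + id.equals(fromBytes(text)));
    }
}
```

Length-based detection only works because the two forms have different
sizes. A producer and consumer would still have to agree on the byte order
of the binary form.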



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
