This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8d7b7b7 [Enhancement] ADD RowSerializer for doris flink connector (#71)
8d7b7b7 is described below
commit 8d7b7b7db59f142cd19683e91c686cc2ed96008d
Author: DingGeGe <[email protected]>
AuthorDate: Mon Oct 10 16:44:19 2022 +0800
[Enhancement] ADD RowSerializer for doris flink connector (#71)
* [Enhancement] ADD RowSerializer for doris flink connector
---
.../doris/flink/sink/writer/RowSerializer.java | 107 +++++++++++++++++++++
.../doris/flink/sink/writer/TestRowSerializer.java | 97 +++++++++++++++++++
2 files changed, 204 insertions(+)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
new file mode 100644
index 0000000..3a07951
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import java.io.IOException;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+
+/**
+ * {@link DorisRecordSerializer} for Flink's external {@link Row} type.
+ *
+ * <p>Quick way to support {@link Row} on top of the existing code: each record is converted to
+ * the internal {@link RowData} format with a {@link RowRowConverter} and then handed to a
+ * {@link RowDataSerializer} configured with the same field names, field types, format
+ * ({@code csv} or {@code json}), field delimiter and delete option.
+ *
+ * <p>TODO: serialize {@link Row} to the Doris format directly instead of going through
+ * {@link RowData}.
+ */
+public class RowSerializer implements DorisRecordSerializer<Row> {
+    /** Converts an external {@link Row} into the internal {@link RowData} format. */
+    private final RowRowConverter rowRowConverter;
+    /** Performs the actual CSV/JSON serialization of the converted record. */
+    private final RowDataSerializer rowDataSerializer;
+    /**
+     * Whether {@link #rowRowConverter} has been opened. The flag is transient, so if this
+     * serializer is Java-serialized, the converter is opened again in the receiving JVM.
+     */
+    private transient boolean converterOpened;
+
+    private RowSerializer(String[] fieldNames, DataType[] dataTypes, String type,
+                          String fieldDelimiter, boolean enableDelete) {
+        this.rowRowConverter = RowRowConverter.create(DataTypes.ROW(dataTypes));
+        this.rowDataSerializer = RowDataSerializer.builder()
+                .setFieldNames(fieldNames)
+                .setFieldType(dataTypes)
+                .setType(type)
+                .setFieldDelimiter(fieldDelimiter)
+                .enableDelete(enableDelete)
+                .build();
+    }
+
+    /**
+     * Serializes one {@link Row} into the configured Doris load format.
+     *
+     * @param record the row to serialize
+     * @return the bytes produced by the underlying {@link RowDataSerializer}
+     * @throws IOException if the underlying {@link RowDataSerializer} fails
+     */
+    @Override
+    public byte[] serialize(Row record) throws IOException {
+        openConverterIfNeeded();
+        RowData rowDataRecord = rowRowConverter.toInternal(record);
+        return rowDataSerializer.serialize(rowDataRecord);
+    }
+
+    /**
+     * Opens the converter before its first conversion, as Flink's DataStructureConverter
+     * contract requires. Some field converters (e.g. for structured types) are only
+     * initialized in {@code open}.
+     */
+    private void openConverterIfNeeded() {
+        if (!converterOpened) {
+            rowRowConverter.open(Thread.currentThread().getContextClassLoader());
+            converterOpened = true;
+        }
+    }
+
+    /**
+     * Creates a new {@link Builder} for {@link RowSerializer}.
+     *
+     * @return an empty builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for RowSerializer.
+     */
+    public static class Builder {
+        private String[] fieldNames;
+        private DataType[] dataTypes;
+        private String type;
+        private String fieldDelimiter;
+        private boolean deletable;
+
+        /**
+         * Sets the column names, in the same order as the fields of the incoming {@link Row}.
+         * Required.
+         */
+        public Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        /** Sets the Flink data type of each field, positionally. Required. */
+        public Builder setFieldType(DataType[] dataTypes) {
+            this.dataTypes = dataTypes;
+            return this;
+        }
+
+        /** Sets the output format; must be {@code LoadConstants.CSV} or {@code LoadConstants.JSON}. */
+        public Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        /** Sets the column separator; required (non-null) when the type is CSV. */
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        /**
+         * Enables writing the Doris delete sign, which the underlying serializer computes from the
+         * record's {@link RowKind}.
+         */
+        public Builder enableDelete(boolean deletable) {
+            this.deletable = deletable;
+            return this;
+        }
+
+        /**
+         * Validates the configuration and builds the serializer.
+         *
+         * @return a configured {@link RowSerializer}
+         * @throws IllegalStateException if the type is neither CSV (with a non-null delimiter)
+         *     nor JSON
+         * @throws NullPointerException if field types or field names were not set
+         */
+        public RowSerializer build() {
+            Preconditions.checkState(
+                    (CSV.equals(type) && fieldDelimiter != null) || JSON.equals(type),
+                    "Unsupported serializer config: type='%s', fieldDelimiter='%s'; "
+                            + "expected type '%s' with a non-null field delimiter, or type '%s'",
+                    type, fieldDelimiter, CSV, JSON);
+            Preconditions.checkNotNull(dataTypes, "field types must be set");
+            Preconditions.checkNotNull(fieldNames, "field names must be set");
+            return new RowSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
new file mode 100644
index 0000000..6c07289
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowSerializer.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Tests for {@link RowSerializer} in CSV and JSON mode, with and without the Doris delete sign.
+ */
+public class TestRowSerializer {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    static Row row;
+    static DataType[] dataTypes;
+    static String[] fieldNames;
+
+    @BeforeClass
+    public static void setUp() {
+        row = new Row(3);
+        row.setField(0, 3);
+        row.setField(1, "test");
+        row.setField(2, 60.2);
+        row.setKind(RowKind.INSERT);
+        dataTypes = new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()};
+        fieldNames = new String[]{"id", "name", "weight"};
+    }
+
+    /** Builds a serializer over the shared schema, using "|" as the field delimiter. */
+    private static RowSerializer buildSerializer(String type, boolean enableDelete) {
+        return RowSerializer.builder()
+                .setFieldNames(fieldNames)
+                .setFieldType(dataTypes)
+                .setType(type)
+                .setFieldDelimiter("|")
+                .enableDelete(enableDelete)
+                .build();
+    }
+
+    /** Parses serialized JSON output into a map of column name to textual value. */
+    private static Map<String, String> parseJson(byte[] serializedValue) throws IOException {
+        return OBJECT_MAPPER.readValue(new String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {});
+    }
+
+    @Test
+    public void testSerializeCsv() throws IOException {
+        byte[] serializedValue = buildSerializer("csv", false).serialize(row);
+        Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue);
+    }
+
+    @Test
+    public void testSerializeJson() throws IOException {
+        Map<String, String> valueMap = parseJson(buildSerializer("json", false).serialize(row));
+        Assert.assertEquals("3", valueMap.get("id"));
+        Assert.assertEquals("test", valueMap.get("name"));
+        Assert.assertEquals("60.2", valueMap.get("weight"));
+    }
+
+    @Test
+    public void testSerializeCsvWithSign() throws IOException {
+        byte[] serializedValue = buildSerializer("csv", true).serialize(row);
+        Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue);
+    }
+
+    @Test
+    public void testSerializeJsonWithSign() throws IOException {
+        Map<String, String> valueMap = parseJson(buildSerializer("json", true).serialize(row));
+        Assert.assertEquals("3", valueMap.get("id"));
+        Assert.assertEquals("test", valueMap.get("name"));
+        Assert.assertEquals("60.2", valueMap.get("weight"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+    }
+
+    /** A DELETE row must keep its kind through the Row -> RowData conversion. */
+    @Test
+    public void testSerializeCsvWithDeleteSign() throws IOException {
+        Row deleteRow = Row.ofKind(RowKind.DELETE, 3, "test", 60.2);
+        byte[] serializedValue = buildSerializer("csv", true).serialize(deleteRow);
+        Assert.assertArrayEquals("3|test|60.2|1".getBytes(StandardCharsets.UTF_8), serializedValue);
+    }
+
+    @Test
+    public void testSerializeJsonWithDeleteSign() throws IOException {
+        Row deleteRow = Row.ofKind(RowKind.DELETE, 3, "test", 60.2);
+        Map<String, String> valueMap = parseJson(buildSerializer("json", true).serialize(deleteRow));
+        Assert.assertEquals("3", valueMap.get("id"));
+        Assert.assertEquals("test", valueMap.get("name"));
+        Assert.assertEquals("60.2", valueMap.get("weight"));
+        Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]