This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3ec3fe8 [Feature] Support flink table lookup join (#61)
3ec3fe8 is described below
commit 3ec3fe8bf1d1e7193fcb2687d57967ee4714b4ac
Author: wudi <[email protected]>
AuthorDate: Thu Sep 15 14:08:41 2022 +0800
[Feature] Support flink table lookup join (#61)
* add flink table lookup join
Co-authored-by: wudi <>
---
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../apache/doris/flink/cfg/DorisLookupOptions.java | 80 +++++++++++
.../org/apache/doris/flink/cfg/DorisOptions.java | 2 +-
.../doris/flink/table/DorisConfigOptions.java | 30 +++-
.../flink/table/DorisDynamicTableFactory.java | 16 +++
.../doris/flink/table/DorisDynamicTableSource.java | 34 ++++-
.../flink/table/DorisRowDataLookupFunction.java | 160 +++++++++++++++++++++
.../table/DorisRowDataLookupFunctionTest.java | 155 ++++++++++++++++++++
8 files changed, 471 insertions(+), 8 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 102a7ee..36c577a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.Properties;
/**
- * JDBC sink batch options.
+ * Doris sink batch options.
*/
public class DorisExecutionOptions implements Serializable {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
new file mode 100644
index 0000000..f0f7eb4
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.cfg;
+
+import java.io.Serializable;
+
+/**
+ * Immutable settings for Doris lookup joins: the size bound and time-to-live of the
+ * per-key result cache, and how many times a failed lookup read is retried.
+ */
+public class DorisLookupOptions implements Serializable {
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+
+    /**
+     * Creates lookup options.
+     *
+     * @param cacheMaxSize maximum number of cached entries; the builder uses -1 when unset
+     * @param cacheExpireMs time-to-live of a cached entry in milliseconds; the builder uses -1
+     *     when unset
+     * @param maxRetryTimes number of retries after a failed lookup read
+     */
+    public DorisLookupOptions(long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+        this.maxRetryTimes = maxRetryTimes;
+        this.cacheExpireMs = cacheExpireMs;
+        this.cacheMaxSize = cacheMaxSize;
+    }
+
+    /** Returns a fresh {@link Builder} holding the default settings. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Returns the maximum number of cached entries. */
+    public long getCacheMaxSize() {
+        return this.cacheMaxSize;
+    }
+
+    /** Returns the cache entry time-to-live in milliseconds. */
+    public long getCacheExpireMs() {
+        return this.cacheExpireMs;
+    }
+
+    /** Returns the number of retries after a failed lookup read. */
+    public int getMaxRetryTimes() {
+        return this.maxRetryTimes;
+    }
+
+    /** Fluent builder for {@link DorisLookupOptions}; every setting is optional. */
+    public static class Builder {
+        // -1 marks the cache settings as "not configured".
+        private long cacheMaxSize = -1L;
+        private long cacheExpireMs = -1L;
+        private int maxRetryTimes = 3;
+
+        /** Optional: caps the lookup cache; once exceeded, older entries are evicted. */
+        public Builder setCacheMaxSize(long cacheMaxSize) {
+            this.cacheMaxSize = cacheMaxSize;
+            return this;
+        }
+
+        /** Optional: time-to-live of a cached entry, in milliseconds. */
+        public Builder setCacheExpireMs(long cacheExpireMs) {
+            this.cacheExpireMs = cacheExpireMs;
+            return this;
+        }
+
+        /** Optional: number of retries after a failed lookup read (default 3). */
+        public Builder setMaxRetryTimes(int maxRetryTimes) {
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /** Creates the immutable {@link DorisLookupOptions} from the current settings. */
+        public DorisLookupOptions build() {
+            return new DorisLookupOptions(this.cacheMaxSize, this.cacheExpireMs, this.maxRetryTimes);
+        }
+    }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index 512d0ab..7d6963b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -84,7 +84,7 @@ public class DorisOptions extends DorisConnectionOptions {
}
/**
- * required, JDBC DB url.
+ * required, Frontend Http Rest url.
*/
public Builder setFenodes(String fenodes) {
this.fenodes = fenodes;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 5b56342..02a4d22 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -20,6 +20,8 @@ package org.apache.doris.flink.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
+
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -35,9 +37,9 @@ public class DorisConfigOptions {
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
fe http address.");
- public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
jdbc table name.");
- public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
jdbc user name.");
- public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
jdbc password.");
+ public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
doris table name.");
+ public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
doris user name.");
+ public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
doris password.");
// source config options
public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
@@ -101,6 +103,28 @@ public class DorisConfigOptions {
.defaultValue(false)
.withDescription("Whether to read data using the new interface
defined according to the FLIP-27 specification,default false");
+    // Lookup options
+
+    /**
+     * Upper bound on the number of rows kept in the lookup cache. The default of -1 leaves the
+     * lookup cache disabled.
+     */
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows of lookup cache, over this value, the oldest rows will "
+                                    + "be eliminated. \"lookup.cache.max-rows\" and \"lookup.cache.ttl\" "
+                                    + "must both be positive to enable the lookup cache; since "
+                                    + "\"lookup.cache.max-rows\" defaults to -1, the cache is disabled by default.");
+
+    /** Time to live of a lookup cache entry, counted from when the entry was written. */
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The cache time to live.");
+
+    /** Number of retries after a failed lookup read (so up to this value + 1 attempts). */
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if lookup database failed.");
+
// sink config options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
.key("sink.enable-2pc")
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 81ee23c..5a11605 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.configuration.ConfigOption;
@@ -48,6 +49,9 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETR
import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_TTL;
+import static
org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
@@ -102,6 +106,9 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
options.add(DORIS_BATCH_SIZE);
options.add(DORIS_EXEC_MEM_LIMIT);
+ options.add(LOOKUP_CACHE_MAX_ROWS);
+ options.add(LOOKUP_CACHE_TTL);
+ options.add(LOOKUP_MAX_RETRIES);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
@@ -131,6 +138,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
return new DorisDynamicTableSource(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
+ getDorisLookupOptions(helper.getOptions()),
physicalSchema);
}
@@ -189,6 +197,14 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
return streamLoadProp;
}
+    /**
+     * Translates the {@code lookup.*} table options into {@link DorisLookupOptions}.
+     *
+     * @param readableConfig the table options
+     * @return cache size, cache TTL (in milliseconds) and retry count for lookup joins
+     */
+    private DorisLookupOptions getDorisLookupOptions(ReadableConfig readableConfig) {
+        long cacheTtlMillis = readableConfig.get(LOOKUP_CACHE_TTL).toMillis();
+        long cacheMaxRows = readableConfig.get(LOOKUP_CACHE_MAX_ROWS);
+        int maxRetries = readableConfig.get(LOOKUP_MAX_RETRIES);
+        return DorisLookupOptions.builder()
+                .setCacheExpireMs(cacheTtlMillis)
+                .setCacheMaxSize(cacheMaxRows)
+                .setMaxRetryTimes(maxRetries)
+                .build();
+    }
+
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 20ce4f6..c5e47fa 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.table;
+import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.RowDataDeserializationSchema;
@@ -34,9 +35,11 @@ import
org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +61,19 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
private static final Logger LOG =
LoggerFactory.getLogger(DorisDynamicTableSource.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
+ private DorisLookupOptions lookupOptions;
private TableSchema physicalSchema;
+    /**
+     * Creates a source that can also serve lookup joins.
+     *
+     * @param options connection options of the Doris cluster
+     * @param readOptions options used when reading from Doris
+     * @param lookupOptions cache and retry settings handed to the lookup function
+     * @param physicalSchema physical schema of the table
+     */
+    public DorisDynamicTableSource(DorisOptions options,
+                                   DorisReadOptions readOptions,
+                                   DorisLookupOptions lookupOptions,
+                                   TableSchema physicalSchema) {
+        this.options = options;
+        this.readOptions = readOptions;
+        this.physicalSchema = physicalSchema;
+        this.lookupOptions = lookupOptions;
+    }
+
public DorisDynamicTableSource(DorisOptions options,
DorisReadOptions readOptions,
TableSchema physicalSchema) {
@@ -109,13 +123,27 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
}
@Override
- public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
lookupContext) {
- return null;
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        // Builds a DorisRowDataLookupFunction that reads every physical column and filters on
+        // the join keys supplied by Flink in `context`.
+        DataType physicalRowDataType = physicalSchema.toRowDataType();
+        // Resolve names and types once instead of once per key.
+        String[] fieldNames = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+        DataType[] fieldTypes =
+                DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]);
+
+        int[][] keys = context.getKeys();
+        String[] keyNames = new String[keys.length];
+        for (int i = 0; i < keys.length; i++) {
+            int[] innerKeyArr = keys[i];
+            // Each key is an index path into the row. Only top-level fields can be used as
+            // Doris filter columns; a nested path would otherwise silently use its first index.
+            if (innerKeyArr.length != 1) {
+                throw new IllegalArgumentException(
+                        "Doris lookup join only supports non-nested lookup keys");
+            }
+            keyNames[i] = fieldNames[innerKeyArr[0]];
+        }
+
+        // NOTE(review): lookupOptions is not a parameter of the three-argument constructor, so it
+        // may be null here; fall back to the builder defaults instead of failing with an NPE.
+        DorisLookupOptions effectiveLookupOptions =
+                lookupOptions != null ? lookupOptions : DorisLookupOptions.builder().build();
+
+        return TableFunctionProvider.of(
+                new DorisRowDataLookupFunction(
+                        options,
+                        readOptions,
+                        effectiveLookupOptions,
+                        fieldNames,
+                        fieldTypes,
+                        keyNames));
     }
@Override
public DynamicTableSource copy() {
- return new DorisDynamicTableSource(options, readOptions,
physicalSchema);
+ return new DorisDynamicTableSource(options, readOptions,
lookupOptions, physicalSchema);
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java
new file mode 100644
index 0000000..d6e0edd
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.reader.DorisValueReader;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+
+public class DorisRowDataLookupFunction extends TableFunction<RowData> {
+ private static final Logger logger =
LoggerFactory.getLogger(DorisRowDataLookupFunction.class);
+
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
+ private final String[] selectFields;
+ private final String[] conditionFields;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+
+ private final DorisRowConverter rowConverter;
+ private transient Cache<RowData, List<RowData>> cache;
+
+ public DorisRowDataLookupFunction(DorisOptions options,
+ DorisReadOptions readOptions,
+ DorisLookupOptions lookupOptions,
+ String[] selectFields,
+ DataType[] fieldTypes,
+ String[] conditionFields) {
+ this.options = options;
+ this.readOptions = readOptions;
+ this.selectFields = selectFields;
+ this.conditionFields = conditionFields;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ this.rowConverter = new DorisRowConverter(fieldTypes);
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+ ? null
+ : CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+ .maximumSize(cacheMaxSize)
+ .build();
+ }
+
+ /**
+ * This is a lookup method which is called by Flink framework in runtime.
+ *
+ * @param keys lookup keys
+ */
+ public void eval(Object... keys) {
+ RowData keyRow = GenericRowData.of(keys);
+ if (cache != null) {
+ List<RowData> cachedRows = cache.getIfPresent(keyRow);
+ if (cachedRows != null) {
+ for (RowData cachedRow : cachedRows) {
+ collect(cachedRow);
+ }
+ return;
+ }
+ }
+
+ List<PartitionDefinition> partitions = getPartitions(keys);
+ for (int retry = 0; retry <= maxRetryTimes; retry++) {
+ try {
+ ArrayList<RowData> rows = new ArrayList<>();
+ for (PartitionDefinition part : partitions) {
+ try (DorisValueReader valueReader = new
DorisValueReader(part, options, readOptions)) {
+ while (valueReader.hasNext()) {
+ List<?> record = valueReader.next();
+ GenericRowData rowData =
rowConverter.convertInternal(record);
+ rows.add(rowData);
+ collect(rowData);
+ }
+ }
+ }
+ if(cache != null){
+ rows.trimToSize();
+ cache.put(keyRow, rows);
+ }
+ break;
+ } catch (Exception ex) {
+ logger.error(String.format("Read Doris error, retry times =
%d", retry), ex);
+ if (retry >= maxRetryTimes) {
+ throw new RuntimeException("Read Doris failed.", ex);
+ }
+ try {
+ Thread.sleep(1000 * retry);
+ } catch (InterruptedException e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ }
+
+ private List<PartitionDefinition> getPartitions(Object... keys) {
+ readOptions.setReadFields((String.join(",", selectFields)));
+ StringJoiner filter = new StringJoiner(" AND ");
+ for (int i = 0; i < keys.length && i < conditionFields.length; i++) {
+ filter.add(String.format("%s = '%s'", conditionFields[i],
keys[i]));
+ }
+ readOptions.setFilterQuery(filter.toString());
+ try {
+ return RestService.findPartitions(options, readOptions, logger);
+ } catch (DorisException ex) {
+ logger.error("Failed fetch doris partitions");
+ return new ArrayList<>();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+ @VisibleForTesting
+ public Cache<RowData, List<RowData>> getCache() {
+ return cache;
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java
new file mode 100644
index 0000000..cef9e10
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore
+public class DorisRowDataLookupFunctionTest {
+
+ private static final String TEST_FENODES = "127.0.0.1:8030";
+ private static final String LOOKUP_TABLE = "test.t_lookup_table";
+
+ private static String[] fieldNames = new String[] {"id1", "id2",
"c_string", "c_double"};
+ private static DataType[] fieldDataTypes =
+ new DataType[] {
+ DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.DOUBLE()
+ };
+ private static String[] lookupKeys = new String[] {"id1", "id2"};
+
+ @Test
+ public void testEval() throws Exception {
+
+ DorisLookupOptions lookupOptions =
DorisLookupOptions.builder().build();
+ DorisRowDataLookupFunction lookupFunction =
buildRowDataLookupFunction(lookupOptions);
+
+ ListOutputCollector collector = new ListOutputCollector();
+ lookupFunction.setCollector(collector);
+
+ lookupFunction.open(null);
+
+ lookupFunction.eval(1, StringData.fromString("A"));
+ lookupFunction.eval(2, StringData.fromString("B"));
+
+ List<String> result =
+ new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+
+ List<String> expected = new ArrayList<>();
+ expected.add("+I(1,A,zhangsanA,1.12)");
+ expected.add("+I(1,A,zhangsanA-1,11.12)");
+ expected.add("+I(2,B,zhangsanB,2.12)");
+ Collections.sort(expected);
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void testEvalWithCache() throws Exception {
+ long cacheExpireMs = 10000;
+ DorisLookupOptions lookupOptions =
+ DorisLookupOptions.builder()
+ .setCacheExpireMs(cacheExpireMs)
+ .setCacheMaxSize(10)
+ .build();
+
+ DorisRowDataLookupFunction lookupFunction =
buildRowDataLookupFunction(lookupOptions);
+
+ ListOutputCollector collector = new ListOutputCollector();
+ lookupFunction.setCollector(collector);
+
+ lookupFunction.open(null);
+
+ lookupFunction.eval(4, StringData.fromString("D"));
+ lookupFunction.eval(5, StringData.fromString("5"));
+ RowData keyRow = GenericRowData.of(4, StringData.fromString("D"));
+ RowData keyRowNoExist = GenericRowData.of(5,
StringData.fromString("5"));
+ Cache<RowData, List<RowData>> cache = lookupFunction.getCache();
+ // empty data should cache
+ assertEquals(cache.getIfPresent(keyRow),
+ Arrays.asList(GenericRowData.of(
+ 4,
+ StringData.fromString("D"),
+ StringData.fromString("zhangsanD"),
+ 4.12)));
+ assertEquals(cache.getIfPresent(keyRowNoExist),
Collections.<RowData>emptyList());
+
+ //cache data expire
+ Thread.sleep(cacheExpireMs);
+ assert cache.getIfPresent(keyRow) == null;
+ }
+
+
+ private DorisRowDataLookupFunction
buildRowDataLookupFunction(DorisLookupOptions lookupOptions) {
+ DorisOptions dorisOptions =
DorisOptions.builder().setFenodes(TEST_FENODES)
+ .setTableIdentifier(LOOKUP_TABLE)
+ .setUsername("root")
+ .setPassword("")
+ .build();
+
+ DorisReadOptions readOptions = DorisReadOptions.builder().build();
+
+ DorisRowDataLookupFunction lookupFunction =
+ new DorisRowDataLookupFunction(
+ dorisOptions,
+ readOptions,
+ lookupOptions,
+ fieldNames,
+ fieldDataTypes,
+ lookupKeys);
+
+ return lookupFunction;
+ }
+
+ private static final class ListOutputCollector implements
Collector<RowData> {
+
+ private final List<RowData> output = new ArrayList<>();
+
+ @Override
+ public void collect(RowData row) {
+ this.output.add(row);
+ }
+
+ @Override
+ public void close() {}
+
+ public List<RowData> getOutputs() {
+ return output;
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]