This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dfd42dc88d [INLONG-10809][SDK] Improvements to TypeConverter field
types and CompareValue in OperatorTools (#10817)
dfd42dc88d is described below
commit dfd42dc88d7f5bb95d66165ae5cfcf6c92921bd7
Author: Zkplo <[email protected]>
AuthorDate: Tue Sep 3 10:24:08 2024 +0800
[INLONG-10809][SDK] Improvements to TypeConverter field types and
CompareValue in OperatorTools (#10817)
Co-authored-by: ZKpLo <[email protected]>
---
.../inlong/sdk/transform/decode/CsvSourceData.java | 10 +-
.../sdk/transform/decode/CsvSourceDecoder.java | 8 +-
.../inlong/sdk/transform/decode/SourceData.java | 2 +-
.../inlong/sdk/transform/pojo/FieldInfo.java | 2 +-
.../converter/DoubleConverter.java} | 16 +-
.../converter/LongConverter.java} | 16 +-
.../sdk/transform/process/function/IfFunction.java | 55 ++++
.../transform/process/operator/OperatorTools.java | 21 +-
.../sdk/transform/process/parser/DoubleParser.java | 1 +
.../TestTransformExpressionOperatorsProcessor.java | 354 +++++++++++++++++++++
10 files changed, 445 insertions(+), 40 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
index d4492b4b85..e0bd9f794c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java
@@ -28,14 +28,14 @@ import java.util.Map;
*/
public class CsvSourceData implements SourceData {
- private List<Map<String, String>> rows = new ArrayList<>();
+ private List<Map<String, Object>> rows = new ArrayList<>();
- private Map<String, String> currentRow;
+ private Map<String, Object> currentRow;
public CsvSourceData() {
}
- public void putField(String fieldName, String fieldValue) {
+ public void putField(String fieldName, Object fieldValue) {
this.currentRow.put(fieldName, fieldValue);
}
@@ -50,11 +50,11 @@ public class CsvSourceData implements SourceData {
}
@Override
- public String getField(int rowNum, String fieldName) {
+ public Object getField(int rowNum, String fieldName) {
if (rowNum >= this.rows.size()) {
return null;
}
- Map<String, String> targetRow = this.rows.get(rowNum);
+ Map<String, Object> targetRow = this.rows.get(rowNum);
return targetRow.get(fieldName);
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index fb95dadc43..7b3dedb637 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -76,9 +76,13 @@ public class CsvSourceDecoder implements
SourceDecoder<String> {
int fieldIndex = 0;
for (FieldInfo field : fields) {
String fieldName = field.getName();
- String fieldValue = null;
+ Object fieldValue = null;
if (fieldIndex < fieldValues.length) {
- fieldValue = fieldValues[fieldIndex];
+ try {
+ fieldValue =
field.getConverter().convert(fieldValues[fieldIndex]);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
sourceData.putField(fieldName, fieldValue);
fieldIndex++;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
index 2c39948f2d..cf5f9c0fbe 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
@@ -26,5 +26,5 @@ public interface SourceData {
int getRowCount();
- String getField(int rowNum, String fieldName);
+ Object getField(int rowNum, String fieldName);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 1027dad944..2a7834112a 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -28,7 +28,7 @@ import lombok.Data;
public class FieldInfo {
private String name;
- private TypeConverter converter;
+ private TypeConverter converter = TypeConverter.DefaultTypeConverter();
public FieldInfo() {
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
similarity index 77%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
index 2c39948f2d..52afbda16b 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/DoubleConverter.java
@@ -15,16 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.decode;
+package org.apache.inlong.sdk.transform.process.converter;
-/**
- * SourceData
- */
-public interface SourceData {
-
- String FIELD_DEFAULT_PREFIX = "$";
-
- int getRowCount();
+public class DoubleConverter implements TypeConverter {
- String getField(int rowNum, String fieldName);
+ @Override
+ public Object convert(String value) throws Exception {
+ return Double.parseDouble(value);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
similarity index 77%
copy from
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
copy to
inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
index 2c39948f2d..5a18f8ee13 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceData.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/converter/LongConverter.java
@@ -15,16 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.transform.decode;
+package org.apache.inlong.sdk.transform.process.converter;
-/**
- * SourceData
- */
-public interface SourceData {
-
- String FIELD_DEFAULT_PREFIX = "$";
-
- int getRowCount();
+public class LongConverter implements TypeConverter {
- String getField(int rowNum, String fieldName);
+ @Override
+ public Object convert(String value) throws Exception {
+ return Long.parseLong(value);
+ }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
new file mode 100644
index 0000000000..bda7b9301c
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/IfFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.List;
+
+/**
+ * IfFunction
+ * description: if(expr,r1,r2) -- expr is a condition; if it holds, return r1; otherwise, return r2
+ */
+@TransformFunction(names = {"if"})
+public class IfFunction implements ValueParser {
+
+ private final ExpressionOperator expressionOperator;
+ private final ValueParser tureValueParser;
+ private final ValueParser falseValueParser;
+
+ public IfFunction(Function expr) {
+ List<Expression> expressions = expr.getParameters().getExpressions();
+ expressionOperator = OperatorTools.buildOperator(expressions.get(0));
+ tureValueParser = OperatorTools.buildParser(expressions.get(1));
+ falseValueParser = OperatorTools.buildParser(expressions.get(2));
+ }
+
+ @Override
+ public Object parse(SourceData sourceData, int rowIndex, Context context) {
+ boolean condition = expressionOperator.check(sourceData, rowIndex,
context);
+ return condition ? tureValueParser.parse(sourceData, rowIndex, context)
+ : falseValueParser.parse(sourceData, rowIndex, context);
+ }
+}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
index 02b24cdb6b..bb35bb4490 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java
@@ -133,19 +133,18 @@ public class OperatorTools {
if (right == null) {
return 1;
}
- if (left instanceof String) {
- if (right instanceof String) {
- return ObjectUtils.compare(left, right);
- } else {
- BigDecimal leftValue = parseBigDecimal(left);
- return ObjectUtils.compare(leftValue, right);
- }
+
+ if (((Object) left).getClass() == ((Object) right).getClass()) {
+ return ObjectUtils.compare(left, right);
} else {
- if (right instanceof String) {
+ try {
+ BigDecimal leftValue = parseBigDecimal(left);
BigDecimal rightValue = parseBigDecimal(right);
- return ObjectUtils.compare(left, rightValue);
- } else {
- return ObjectUtils.compare(left, right);
+ return ObjectUtils.compare(leftValue, rightValue);
+ } catch (Exception e) {
+ String leftValue = parseString(left);
+ String rightValue = parseString(right);
+ return ObjectUtils.compare(leftValue, rightValue);
}
}
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
index ad39558a11..a88b17f6ba 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DoubleParser.java
@@ -24,6 +24,7 @@ import net.sf.jsqlparser.expression.DoubleValue;
/**
* LongParser
+ *
*/
@TransformParser(values = DoubleValue.class)
public class DoubleParser implements ValueParser {
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
new file mode 100644
index 0000000000..67e4e331d4
--- /dev/null
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformExpressionOperatorsProcessor.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.converter.DoubleConverter;
+import org.apache.inlong.sdk.transform.process.converter.LongConverter;
+import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * TestTransformExpressionOperatorsProcessor
+ * description: test the expression operators (comparison and logical) in transform processor
+ */
+public class TestTransformExpressionOperatorsProcessor {
+
+ private static final List<FieldInfo> srcFields = new ArrayList<>();
+ private static final List<FieldInfo> dstFields = new ArrayList<>();
+ private static final CsvSourceInfo csvSource;
+ private static final KvSinkInfo kvSink;
+
+ static {
+ srcFields.add(new FieldInfo("numeric1", new DoubleConverter()));
+ srcFields.add(new FieldInfo("string2",
TypeConverter.DefaultTypeConverter()));
+ srcFields.add(new FieldInfo("numeric3", new DoubleConverter()));
+ srcFields.add(new FieldInfo("numeric4", new LongConverter()));
+
+ FieldInfo field = new FieldInfo();
+ field.setName("result");
+ dstFields.add(field);
+ csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+ kvSink = new KvSinkInfo("UTF-8", dstFields);
+ }
+
+ @Test
+ public void testEqualsToOperator() throws Exception {
+ String transformSql = "select if(string2 = 4,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|4a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|4a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+ // case2: "3.14159265358979323846|4|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=1");
+
+ transformSql = "select if(numeric3 = 4,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+ // case4: "3.14159265358979323846|4|4.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ }
+
+ @Test
+ public void testNotEqualsToOperator() throws Exception {
+ String transformSql = "select if(string2 != 4,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|4a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|4a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=1");
+ // case2: "3.14159265358979323846|4|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+
+ transformSql = "select if(numeric3 != 4,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=0");
+ // case4: "3.14159265358979323846|4|4.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=1");
+ }
+
+ @Test
+ public void testGreaterThanEqualsOperator() throws Exception {
+ String transformSql = "select if(string2 >= 4,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=1");
+
+ transformSql = "select if(numeric3 >= 4,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+ // case4: "3.14159265358979323846|4|3.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ }
+
+ @Test
+ public void testGreaterThanOperator() throws Exception {
+ String transformSql = "select if(string2 > 4.1,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=1");
+
+ transformSql = "select if(numeric3 > 4.1,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=0");
+ // case4: "3.14159265358979323846|4|4.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=1");
+ }
+
+ @Test
+ public void testMinorThanEqualsOperator() throws Exception {
+ String transformSql = "select if(string2 <= 4,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=1");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+
+ transformSql = "select if(numeric3 <= 4,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+ // case4: "3.14159265358979323846|4|4.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ }
+
+ @Test
+ public void testMinorThanOperator() throws Exception {
+ String transformSql = "select if(string2 < 4.1,1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=1");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+
+ transformSql = "select if(numeric3 < 4,1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=0");
+ // case4: "3.14159265358979323846|4|3.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=1");
+ }
+
+ @Test
+ public void testNotOperator() throws Exception {
+ String transformSql = "select if(!(string2 < 4),1,0) from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=1");
+
+ transformSql = "select if(!(numeric3 < 3.9),1,0) from source";
+ config = new TransformConfig(transformSql);
+ // case3: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output3 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+ // case4: "3.14159265358979323846|4|3.2|8"
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ }
+
+ @Test
+ public void testOrOperator() throws Exception {
+ String transformSql = "select if((string2 < 4) or (numeric4 > 5),1,0)
from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|8"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=1");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=1");
+ // case3: "3.14159265358979323846|5|4|4"
+ List<String> output3 =
processor.transform("3.14159265358979323846|5|4|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=0");
+
+ transformSql = "select if((numeric3 < 4) or (numeric4 > 5),1,0) from
source";
+ config = new TransformConfig(transformSql);
+ // case4: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=1");
+ // case5: "3.14159265358979323846|4|3.2|8"
+ List<String> output5 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output5.get(0), "result=1");
+ // case6: "3.14159265358979323846|4|4.2|5"
+ List<String> output6 =
processor.transform("3.14159265358979323846|4|4.2|5");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output6.get(0), "result=0");
+ }
+
+ @Test
+ public void testAndOperator() throws Exception {
+ String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0)
from source";
+ TransformConfig config = new TransformConfig(transformSql);
+ // case1: "3.14159265358979323846|3a|4|4"
+ TransformProcessor<String, String> processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output1 =
processor.transform("3.14159265358979323846|3a|4|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output1.get(0), "result=0");
+ // case2: "3.14159265358979323846|5|4|8"
+ List<String> output2 =
processor.transform("3.14159265358979323846|5|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output2.get(0), "result=0");
+ // case3: "3.14159265358979323846|3|4|8"
+ List<String> output3 =
processor.transform("3.14159265358979323846|3|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output3.get(0), "result=1");
+
+ transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from
source";
+ config = new TransformConfig(transformSql);
+ // case4: "3.14159265358979323846|4|4|8"
+ processor = TransformProcessor
+ .create(config,
SourceDecoderFactory.createCsvDecoder(csvSource),
+ SinkEncoderFactory.createKvEncoder(kvSink));
+ List<String> output4 =
processor.transform("3.14159265358979323846|4|4|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output4.get(0), "result=0");
+ // case5: "3.14159265358979323846|4|3.2|4"
+ List<String> output5 =
processor.transform("3.14159265358979323846|4|3.2|4");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output5.get(0), "result=0");
+ // case6: "3.14159265358979323846|4|3.2|8"
+ List<String> output6 =
processor.transform("3.14159265358979323846|4|3.2|8");
+ Assert.assertEquals(1, output1.size());
+ Assert.assertEquals(output6.get(0), "result=1");
+ }
+}