Repository: beam Updated Branches: refs/heads/DSL_SQL aa07a1d41 -> a6845a35f
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 1979a00..22ffaad 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -20,7 +20,6 @@ package org.apache.beam.dsls.sql.schema; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Date; import java.util.List; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -55,10 +54,8 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ @Override public void encode(BeamSQLRow value, OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - recordTypeCoder.encode(value.getDataType(), outStream, context); - listCoder.encode(value.getNullFields(), outStream, context); - - Context nested = context.nested(); + recordTypeCoder.encode(value.getDataType(), outStream, context.nested()); + listCoder.encode(value.getNullFields(), outStream, context.nested()); for (int idx = 0; idx < value.size(); ++idx) { if (value.getNullFields().contains(idx)) { @@ -67,36 +64,38 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ switch (value.getDataType().getFieldsType().get(idx)) { case INTEGER: + intCoder.encode(value.getInteger(idx), outStream, context.nested()); + break; case SMALLINT: case TINYINT: - intCoder.encode(value.getInteger(idx), outStream, nested); + intCoder.encode((int) value.getShort(idx), outStream, context.nested()); break; case DOUBLE: + doubleCoder.encode(value.getDouble(idx), outStream, context.nested()); + break; case FLOAT: - doubleCoder.encode(value.getDouble(idx), outStream, nested); + doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested()); break; case BIGINT: - longCoder.encode(value.getLong(idx), outStream, nested); + longCoder.encode(value.getLong(idx), outStream, context.nested()); break; case VARCHAR: - stringCoder.encode(value.getString(idx), outStream, nested); - break; - case TIME: - case TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream, nested); + stringCoder.encode(value.getString(idx), outStream, context.nested()); break; default: throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx)); } } + //add a dummy field to indicate the end of record + intCoder.encode(value.size(), outStream, context); } @Override public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException { - BeamSQLRecordType type = recordTypeCoder.decode(inStream, context); - List<Integer> nullFields = listCoder.decode(inStream, context); + BeamSQLRecordType type = recordTypeCoder.decode(inStream, context.nested()); + List<Integer> nullFields = listCoder.decode(inStream, context.nested()); BeamSQLRow record = new BeamSQLRow(type); record.setNullFields(nullFields); @@ -108,29 +107,32 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{ switch (type.getFieldsType().get(idx)) { case INTEGER: + record.addField(idx, intCoder.decode(inStream, context.nested())); + break; case SMALLINT: + record.addField(idx, intCoder.decode(inStream, context.nested()).shortValue()); + break; case TINYINT: - record.addField(idx, intCoder.decode(inStream, context)); + record.addField(idx, intCoder.decode(inStream, context.nested()).byteValue()); break; case DOUBLE: + record.addField(idx, doubleCoder.decode(inStream, context.nested())); + break; case FLOAT: - record.addField(idx, doubleCoder.decode(inStream, context)); + record.addField(idx, doubleCoder.decode(inStream, context.nested()).floatValue()); break; case BIGINT: - record.addField(idx, longCoder.decode(inStream, context)); + record.addField(idx, longCoder.decode(inStream, context.nested())); break; case VARCHAR: - record.addField(idx, stringCoder.decode(inStream, context)); - break; - case TIME: - case TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream, context))); + record.addField(idx, stringCoder.decode(inStream, context.nested())); break; default: throw new UnsupportedDataTypeException(type.getFieldsType().get(idx)); } } + intCoder.decode(inStream, context); return record; } http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java deleted file mode 100644 index c929a83..0000000 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema; - -/** - * Exception when the field is invalid. - * - */ -public class InvalidFieldException extends RuntimeException { - - public InvalidFieldException() { - super(); - } - - public InvalidFieldException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java index 915a5cc..0f40f33 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java @@ -35,11 +35,6 @@ import org.slf4j.LoggerFactory; */ public class BeamKafkaCSVTable extends BeamKafkaTable { - /** - * - */ - private static final long serialVersionUID = 4754022536543333984L; - public static final String DELIMITER = ","; private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class); http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java index adf4621..c8c851c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java @@ -40,11 +40,6 @@ import org.apache.calcite.rel.type.RelProtoDataType; */ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { - /** - * - */ - private static final long serialVersionUID = -634715473399906527L; - private String bootstrapServers; private List<String> topics; private Map<String, Object> configUpdates; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java index 55086e2..2ab6301 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java @@ -28,10 +28,6 @@ import org.apache.beam.sdk.transforms.DoFn; * */ public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> { - /** - * - */ - private static final long serialVersionUID = -1256111753670606705L; private String stepName; private BeamSQLExpressionExecutor executor; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java index 92ebff2..c146ea5 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java @@ -25,10 +25,6 @@ import org.apache.beam.sdk.transforms.DoFn; * */ public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> { - /** - * - */ - private static final long serialVersionUID = -1256111753670606705L; private String stepName; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java index bafdd17..d018057 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java @@ -30,11 +30,6 @@ import org.apache.beam.sdk.transforms.DoFn; * */ public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> { - - /** - * - */ - private static final long serialVersionUID = -1046605249999014608L; private String stepName; private BeamSQLExpressionExecutor executor; private BeamSQLRecordType outputRecordType; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java new file mode 100644 index 0000000..abbe3f7 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter; + +import java.math.BigDecimal; +import java.util.Arrays; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.rel.BeamProjectRel; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit test cases for {@link BeamSQLFnExecutor}. + */ +public class BeamSQLFnExecutorTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testBeamFilterRel() { + RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND, + Arrays.asList( + rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + Arrays.asList(rexBuilder.makeInputRef(relDataType, 0), + rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))), + rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + Arrays.asList(rexBuilder.makeInputRef(relDataType, 1), + rexBuilder.makeExactLiteral(new BigDecimal(0)))))); + + BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null, + condition); + + BeamSQLFnExecutor executor = new BeamSQLFnExecutor(beamFilterRel); + executor.prepare(); + + Assert.assertEquals(1, executor.exps.size()); + + BeamSqlExpression l1Exp = executor.exps.get(0); + Assert.assertTrue(l1Exp instanceof BeamSqlAndExpression); + Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType()); + + Assert.assertEquals(2, l1Exp.getOperands().size()); + BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0); + BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1); + + Assert.assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression); + Assert.assertTrue(l1Right instanceof BeamSqlEqualExpression); + + Assert.assertEquals(2, l1Left.getOperands().size()); + BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0); + BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1); + Assert.assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression); + Assert.assertTrue(l1LeftRight instanceof BeamSqlPrimitive); + + Assert.assertEquals(2, l1Right.getOperands().size()); + BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0); + BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1); + Assert.assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression); + Assert.assertTrue(l1RightRight instanceof BeamSqlPrimitive); + } + + @Test + public void testBeamProjectRel() { + BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(), + relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(), + rexBuilder.identityProjects(relDataType), relDataType); + BeamSQLFnExecutor executor = new BeamSQLFnExecutor(relNode); + + executor.prepare(); + Assert.assertEquals(4, executor.exps.size()); + Assert.assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression); + Assert.assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression); + Assert.assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression); + Assert.assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java new file mode 100644 index 0000000..bfc7366 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem; +import org.apache.beam.dsls.sql.planner.BeamRuleSets; +import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelBuilder; +import org.junit.BeforeClass; + +/** + * base class to test {@link BeamSQLFnExecutor} and subclasses of {@link BeamSqlExpression}. + */ +public class BeamSQLFnExecutorTestBase { + public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY); + public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder); + + public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl( + RelDataTypeSystem.DEFAULT); + public static RelDataType relDataType; + + public static BeamSQLRecordType beamRecordType; + public static BeamSQLRow record; + + public static RelBuilder relBuilder; + + @BeforeClass + public static void prepare() { + relDataType = TYPE_FACTORY.builder() + .add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE) + .add("order_time", SqlTypeName.BIGINT).build(); + + beamRecordType = BeamSQLRecordType.from(relDataType); + record = new BeamSQLRow(beamRecordType); + + record.addField(0, 1234567L); + record.addField(1, 0); + record.addField(2, 8.9); + record.addField(3, 1234567L); + + SchemaPlus schema = Frameworks.createRootSchema(true); + final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>(); + traitDefs.add(ConventionTraitDef.INSTANCE); + traitDefs.add(RelCollationTraitDef.INSTANCE); + FrameworkConfig config = Frameworks.newConfigBuilder() + .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) + .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) + .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build(); + + relBuilder = RelBuilder.create(config); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java new file mode 100644 index 0000000..a328c88 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlIsNullExpression} and + * {@link BeamSqlIsNotNullExpression}. + */ +public class BeamNullExperssionTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testIsNull() { + BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression( + new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); + Assert.assertEquals(false, exp1.evaluate(record).getValue()); + + BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression( + BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); + Assert.assertEquals(true, exp2.evaluate(record).getValue()); + } + + @Test + public void testIsNotNull() { + BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression( + new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0)); + Assert.assertEquals(true, exp1.evaluate(record).getValue()); + + BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression( + BeamSqlPrimitive.of(SqlTypeName.BIGINT, null)); + Assert.assertEquals(false, exp2.evaluate(record).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java new file mode 100644 index 0000000..9dabcdc --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}. + */ +public class BeamSqlAndOrExpressionTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testAnd() { + List<BeamSqlExpression> operands = new ArrayList<>(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + + Assert.assertTrue(new BeamSqlAndExpression(operands).evaluate(record).getValue()); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + + Assert.assertFalse(new BeamSqlAndExpression(operands).evaluate(record).getValue()); + } + + @Test + public void testOr() { + List<BeamSqlExpression> operands = new ArrayList<>(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false)); + + Assert.assertFalse(new BeamSqlOrExpression(operands).evaluate(record).getValue()); + + operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true)); + + Assert.assertTrue(new BeamSqlOrExpression(operands).evaluate(record).getValue()); + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java new file mode 100644 index 0000000..b88de71 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Arrays; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for the collections of {@link BeamSqlCompareExpression}. + */ +public class BeamSqlCompareExpressionTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testEqual() { + BeamSqlEqualExpression exp1 = new BeamSqlEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L))); + Assert.assertEquals(false, exp1.evaluate(record).getValue()); + + BeamSqlEqualExpression exp2 = new BeamSqlEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L))); + Assert.assertEquals(true, exp2.evaluate(record).getValue()); + } + + @Test + public void testLargerThan(){ + BeamSqlLargerThanExpression exp1 = new BeamSqlLargerThanExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L))); + Assert.assertEquals(false, exp1.evaluate(record).getValue()); + + BeamSqlLargerThanExpression exp2 = new BeamSqlLargerThanExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L))); + Assert.assertEquals(true, exp2.evaluate(record).getValue()); + } + + @Test + public void testLargerThanEqual(){ + BeamSqlLargerThanEqualExpression exp1 = new BeamSqlLargerThanEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L))); + Assert.assertEquals(true, exp1.evaluate(record).getValue()); + + BeamSqlLargerThanEqualExpression exp2 = new BeamSqlLargerThanEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L))); + Assert.assertEquals(false, exp2.evaluate(record).getValue()); + } + + @Test + public void testLessThan(){ + BeamSqlLessThanExpression exp1 = new BeamSqlLessThanExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1), + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1))); + Assert.assertEquals(true, exp1.evaluate(record).getValue()); + + BeamSqlLessThanExpression exp2 = new BeamSqlLessThanExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1), + BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1))); + Assert.assertEquals(false, exp2.evaluate(record).getValue()); + } + + @Test + public void testLessThanEqual(){ + BeamSqlLessThanEqualExpression exp1 = new BeamSqlLessThanEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2), + BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9))); + Assert.assertEquals(true, exp1.evaluate(record).getValue()); + + BeamSqlLessThanEqualExpression exp2 = new BeamSqlLessThanEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2), + BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0))); + Assert.assertEquals(false, exp2.evaluate(record).getValue()); + } + + @Test + public void testNotEqual(){ + BeamSqlNotEqualExpression exp1 = new BeamSqlNotEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L))); + Assert.assertEquals(false, exp1.evaluate(record).getValue()); + + BeamSqlNotEqualExpression exp2 = new BeamSqlNotEqualExpression( + Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3), + BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L))); + Assert.assertEquals(true, exp2.evaluate(record).getValue()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java new file mode 100644 index 0000000..1cadeb0 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlInputRefExpression}. + */ +public class BeamSqlInputRefExpressionTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testRefInRange() { + BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0); + Assert.assertEquals(record.getLong(0), ref0.evaluate(record).getValue()); + + BeamSqlInputRefExpression ref1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1); + Assert.assertEquals(record.getInteger(1), ref1.evaluate(record).getValue()); + + BeamSqlInputRefExpression ref2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2); + Assert.assertEquals(record.getDouble(2), ref2.evaluate(record).getValue()); + + BeamSqlInputRefExpression ref3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3); + Assert.assertEquals(record.getLong(3), ref3.evaluate(record).getValue()); + } + + + @Test(expected = IndexOutOfBoundsException.class) + public void testRefOutOfRange(){ + BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4); + ref.evaluate(record).getValue(); + } + + @Test(expected = BeamInvalidOperatorException.class) + public void testTypeUnMatch(){ + BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0); + ref.evaluate(record).getValue(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java new file mode 100644 index 0000000..adb8de9 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for {@link BeamSqlPrimitive}. + * + */ +public class BeamSqlPrimitiveTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testPrimitiveInt(){ + BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100); + Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue()); + } + + @Test(expected = BeamInvalidOperatorException.class) + public void testPrimitiveTypeUnMatch1(){ + BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L); + Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue()); + } + @Test(expected = BeamInvalidOperatorException.class) + public void testPrimitiveTypeUnMatch2(){ + BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L); + Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue()); + } + @Test(expected = BeamInvalidOperatorException.class) + public void testPrimitiveTypeUnMatch3(){ + BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L); + Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue()); + } + @Test(expected = BeamInvalidOperatorException.class) + public void testPrimitiveTypeUnMatch4(){ + BeamSqlPrimitive expInt = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L); + Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue()); + } + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java index 733b056..625fb71 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java @@ -28,6 +28,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.AfterClass; import org.junit.BeforeClass; /** @@ -35,21 +36,33 @@ import org.junit.BeforeClass; * */ public class BasePlanner { - public static BeamSqlRunner runner = new BeamSqlRunner(); + public static BeamSqlRunner runner; @BeforeClass public static void prepare() { + runner = new BeamSqlRunner(); + runner.addTable("ORDER_DETAILS", getTable()); runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders")); runner.addTable("SUB_ORDER_RAM", getTable()); } + @AfterClass + public static void close(){ + runner = null; + } + private static BaseBeamTable getTable() { final RelProtoDataType protoRowType = new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + return a0.builder() + .add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE) + .add("shipping", SqlTypeName.FLOAT) + .add("notes", SqlTypeName.VARCHAR) + .build(); } }; @@ -60,8 +73,13 @@ public class BasePlanner { final RelProtoDataType protoRowType = new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { - return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER) - .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build(); + return a0.builder() + .add("order_id", SqlTypeName.BIGINT) + .add("site_id", SqlTypeName.INTEGER) + .add("price", SqlTypeName.DOUBLE) + .add("shipping", SqlTypeName.FLOAT) + .add("notes", SqlTypeName.VARCHAR) + .build(); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java index 9dde0f1..5d1052b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java @@ -32,7 +32,7 @@ public class BeamPlannerExplainTest extends BasePlanner { String plan = runner.explainQuery(sql); String expectedPlan = - "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n" + "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[$3], notes=[$4])\n" + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; Assert.assertEquals("explain doesn't match", expectedPlan, plan); } @@ -58,7 +58,8 @@ public class BeamPlannerExplainTest extends BasePlanner { String expectedPlan = "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n" - + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n" + + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[null]," + + " notes=[null])\n" + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n" + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n" + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n"; http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java index d32b19b..1ca9eb3 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.dsls.sql.planner; -import org.apache.beam.sdk.Pipeline; import org.junit.Assert; import org.junit.Test; @@ -30,13 +29,11 @@ public class BeamPlannerSubmitTest extends BasePlanner { public void insertSelectFilter() throws Exception { String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); - runner.getPlanner().planner.close(); - pipeline.run().waitUntilFinish(); + runner.submitQuery(sql); Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); - Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", + Assert.assertEquals("order_id=12345,site_id=0,price=20.5,shipping=null,notes=null", MockedBeamSQLTable.CONTENT.get(0)); } http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java index 8631a6e..538607f 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java @@ -18,7 +18,6 @@ package org.apache.beam.dsls.sql.planner; import java.util.ArrayList; -import java.util.Date; import java.util.List; import org.apache.beam.dsls.sql.schema.BaseBeamTable; @@ -39,11 +38,6 @@ import org.apache.calcite.rel.type.RelProtoDataType; */ public class MockedBeamSQLTable extends BaseBeamTable { - /** - * - */ - private static final long serialVersionUID = 1373168368414036932L; - public static final List<String> CONTENT = new ArrayList<>(); public MockedBeamSQLTable(RelProtoDataType protoRowType) { @@ -61,25 +55,25 @@ public class MockedBeamSQLTable extends BaseBeamTable { row1.addField(0, 12345L); row1.addField(1, 0); row1.addField(2, 10.5); - row1.addField(3, new Date()); + row1.addField(3, 123.4f); BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType); row2.addField(0, 12345L); row2.addField(1, 1); row2.addField(2, 20.5); - row2.addField(3, new Date()); + row2.addField(3, 234.5f); BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType); row3.addField(0, 12345L); row3.addField(1, 0); row3.addField(2, 20.5); - row3.addField(3, new Date()); + row3.addField(3, 345.6f); BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType); row4.addField(0, null); row4.addField(1, null); row4.addField(2, 20.5); - row4.addField(3, new Date()); + row4.addField(3, 456.7f); return Create.of(row1, row2, row3); }