[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5367

---
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165560261

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest extends TableTestBase {
    +
    +  @Test
    +  def testArrayElement(): Unit = {
    +    val util = streamTestUtil()
    +    util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
    +
    +    val expected = unaryNode(
    +      "DataStreamCalc",
    +      streamTableNode(0),
    +      term("select",
    +        "a",
    +        "DOT(ITEM(b, 1), '_1') AS b11"
    --- End diff --

    Will fix it.

---
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165560235

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --

    But I agree I should refactor the code in visitFieldAccess() to reuse it as much as possible.

---
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165527528

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --

    Once Calcite sees an array element access, it translates the subsequent field access into a DOT RexCall, not a RexFieldAccess. Therefore, we need custom handling for the DOT RexCall.

---
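The distinction drawn in the comment above can be illustrated with a toy model. This is only a sketch: `Expr`, `InputRef`, `Literal`, `FieldAccess`, `Call` and `AccessPath.render` are hypothetical names invented for illustration and are not Calcite's `RexNode` classes. It shows a single visitor treating a `DOT` call over an `ITEM` call the same way as a plain field access, which is roughly the situation the custom handling has to cover.

```scala
// Toy expression tree; illustrative stand-ins only, not Calcite's RexNode hierarchy.
sealed trait Expr
final case class InputRef(name: String) extends Expr
final case class Literal(value: Any) extends Expr
final case class FieldAccess(target: Expr, field: String) extends Expr
final case class Call(op: String, operands: List[Expr]) extends Expr

object AccessPath {
  // Renders an expression as an access path. A DOT call whose second operand
  // is a string literal is rendered exactly like a plain field access.
  def render(expr: Expr): String = expr match {
    case InputRef(name) => name
    case Literal(s: String) => s"'$s'"
    case Literal(other) => other.toString
    case FieldAccess(target, field) => s"${render(target)}.$field"
    case Call("DOT", List(target, Literal(f: String))) => s"${render(target)}.$f"
    case Call("ITEM", List(array, index)) => s"${render(array)}[${render(index)}]"
    case Call(op, operands) => operands.map(render).mkString(s"$op(", ", ", ")")
  }
}
```

In this model, the plan fragment `DOT(ITEM(b, 1), '_1')` from the test would be built as `Call("DOT", List(Call("ITEM", List(InputRef("b"), Literal(1))), Literal("_1")))`, and both it and a `FieldAccess` node go through the same `render` function.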
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165515039

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }

    +  @Test
    +  def testArrayElementAtFromTableForTuple(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val data = List(
    +      (1, Array((12, 45), (2, 5))),
    --- End diff --

    Added a null check. Nested tuple input won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, which should be available in Calcite 1.16.

---
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165038256

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest extends TableTestBase {
    +
    +  @Test
    +  def testArrayElement(): Unit = {
    +    val util = streamTestUtil()
    +    util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
    +
    +    val expected = unaryNode(
    +      "DataStreamCalc",
    +      streamTableNode(0),
    +      term("select",
    +        "a",
    +        "DOT(ITEM(b, 1), '_1') AS b11"
    --- End diff --

    Actually, we don't need a full table test for this, as it only tests a scalar operation. Can you move the test to `CompositeAccessTest` and test it for all APIs?

---
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r165039357

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --

    I think this method is not really necessary. We already have logic for accessing composite types (see `visitFieldAccess()`). Maybe we just have to make the methods there a bit more generic.

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266892

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.junit.Test
    +
    +class CalcTest extends TableTestBase {
    --- End diff --

    Remove the redundant blank between `CalcTest` and `extends`.

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266896

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    --- End diff --

    Arguments of the method should be indented.

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266907

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }

    +  @Test
    +  def testArrayElementAtFromTableForTuple(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val data = List(
    +      (1, Array((12, 45), (2, 5))),
    --- End diff --

    1. Add null tuple input test
    2. Add nested tuple input test

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266897

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    +  dot: RexCall,
    +  record: GeneratedExpression,
    +  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${subField.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.productElement(
    --- End diff --

    An NPE will be thrown if `${record.resultTerm}` is null.

---
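One way to address the NPE concern raised above is to have the template emit a null guard around the dereference. The following is a minimal sketch under stated assumptions, not the fix that was actually applied in the PR: `GeneratedExpr` is a hypothetical stand-in for Flink's `GeneratedExpression`, `NullSafeDot.tupleField` is an invented helper, and the term names are fixed strings rather than coming from `newName`.

```scala
// Hypothetical stand-in for Flink's GeneratedExpression; only the fields used here.
final case class GeneratedExpr(resultTerm: String, nullTerm: String, code: String)

object NullSafeDot {
  // Emits Java source that reads tuple field `_n` (1-based name) from `record`.
  // The productElement call is wrapped in a null check on the record term,
  // so a null record yields a null result instead of a NullPointerException.
  def tupleField(
      record: GeneratedExpr,
      fieldName: String,
      resultTypeTerm: String): GeneratedExpr = {
    val index = fieldName.substring(1).toInt - 1 // "_1" maps to position 0
    val resultTerm = "result_0" // fixed names for the sketch; assumed unique
    val nullTerm = "isNull_0"
    val code =
      s"""
         |${record.code}
         |$resultTypeTerm $resultTerm = null;
         |if (${record.resultTerm} != null) {
         |  $resultTerm = ($resultTypeTerm) ${record.resultTerm}.productElement($index);
         |}
         |boolean $nullTerm = $resultTerm == null;
         |""".stripMargin
    GeneratedExpr(resultTerm, nullTerm, code)
  }
}
```

The same guard shape would apply to the case class, POJO and Row branches, since each of them dereferences `${record.resultTerm}` in the same way.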
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266903

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    +  dot: RexCall,
    +  record: GeneratedExpression,
    +  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${subField.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +              |${fieldName.substring(1).toInt} - 1);
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[RowTypeInfo]) {
    +          val fieldIndex = dot.operands.get(0).getType.asInstanceOf[CompositeRelDataType]
    +            .compositeType.getFieldIndex(fieldName)
    +
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.getField(${fieldIndex});
    --- End diff --

    An NPE will be thrown if `${record.resultTerm}` is null.

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266898

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    +  dot: RexCall,
    +  record: GeneratedExpression,
    +  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${subField.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +              |${fieldName.substring(1).toInt} - 1);
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    --- End diff --

    An NPE will be thrown if `${record.resultTerm}` is null.

---
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5367#discussion_r164266900

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -984,6 +987,63 @@ object ScalarOperators {
         }
       }

    +  def generateDot(codeGenerator: CodeGenerator,
    +  dot: RexCall,
    +  record: GeneratedExpression,
    +  subField: GeneratedExpression)
    +  : GeneratedExpression = {
    +    val nullTerm = newName("isNull")
    +    val resultTerm = newName("result")
    +    val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
    +    val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
    +    dot.operands.get(0).getType match {
    +      case crdt: CompositeRelDataType => {
    +        val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
    +          .getValue.asInstanceOf[NlsString].getValue
    +        if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${subField.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.productElement(
    +              |${fieldName.substring(1).toInt} - 1);
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
    +              |boolean $nullTerm =${resultTerm} == null;
    +              |""".stripMargin, resultType)
    +        } else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
    +          return GeneratedExpression(resultTerm, nullTerm,
    +            s"""
    +              |${record.code}
    +              |${resultTypeTerm} $resultTerm =
    +              | (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
    --- End diff --

    An NPE will be thrown if `${record.resultTerm}` is null.

---
GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5367

    [FLINK-7923][Table API & SQL] Support field access of composite array element in SQL

    Note: This is based on FLINK-7934; I will rebase once FLINK-7934 is resolved.

    ## What is the purpose of the change

    Support field access of composite array elements in SQL.

    ## Brief change log

      - Add handling for the Calcite DOT operator to support field access of composite array elements in SQL
      - Add unit tests to verify that it works for tuple arrays, row arrays, POJO arrays and case class arrays

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Added unit tests to verify the query plan*
      - *Added integration tests for batch/streaming for POJO/case class/tuple/row types*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5367

----
commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen
Date:   2018-01-10T00:52:56Z

    Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen
Date:   2018-01-25T23:36:36Z

    Support access of subfields of Array element if the element is a composite type (e.g. case class, pojo, tuple or row).

---