[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5367


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560261
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
+
+  @Test
+  def testArrayElement(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+val expected = unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select",
+"a",
+"DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --

Will fix it.


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560235
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

But I agree I should refactor the code in visitFieldAccess() to reuse it as 
much as possible.


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165527528
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

In Calcite, once it sees an array element access, the subsequent field access 
is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need 
custom handling for the DOT RexCall.
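The two expression shapes can be illustrated with a simplified model. In this sketch, `Expr`, `InputRef`, `Item`, `FieldAccess`, `DotCall` and `FieldAccessModel` are hypothetical stand-ins, not Calcite classes. `FieldAccess` plays the role of a RexFieldAccess, and `DotCall` plays the role of a DOT RexCall that follows an `ITEM` (array element) access. The model assumes 1-based SQL array indexing and treats a null element or an out-of-range index as SQL NULL (`None`). Both shapes are routed through one accessor, as suggested later in the review.

```scala
// Hypothetical, simplified model of the two shapes; NOT Calcite's Rex classes.
sealed trait Expr
final case class InputRef(name: String) extends Expr
final case class Item(array: Expr, index: Int) extends Expr          // b[1], 1-based
final case class FieldAccess(record: Expr, field: String) extends Expr // like RexFieldAccess
final case class DotCall(record: Expr, field: String) extends Expr     // like DOT(record, 'field')

object FieldAccessModel {
  type Row = Map[String, Any]

  // Evaluates an expression against one input row; NULL is represented as None.
  def eval(e: Expr, row: Row): Option[Any] = e match {
    case InputRef(n) => row.get(n).flatMap(v => Option(v))
    case Item(arr, i) =>
      eval(arr, row).flatMap {
        // Assumption: out-of-range indices yield NULL in this model.
        case xs: Array[_] if i >= 1 && i <= xs.length => Option(xs(i - 1))
        case _ => None
      }
    // Both shapes share one code path.
    case FieldAccess(rec, f) => accessField(eval(rec, row), f)
    case DotCall(rec, f)     => accessField(eval(rec, row), f)
  }

  // Reads a tuple-style field ("_1", "_2", ...) from a Scala Product.
  private def accessField(rec: Option[Any], field: String): Option[Any] =
    rec.flatMap {
      case p: Product if isTupleField(field) =>
        val idx = field.substring(1).toInt - 1
        if (idx >= 0 && idx < p.productArity) Option(p.productElement(idx)) else None
      case _ => None
    }

  private def isTupleField(f: String): Boolean =
    f.length > 1 && f.startsWith("_") && f.substring(1).forall(_.isDigit)
}
```

For example, `FieldAccessModel.eval(DotCall(Item(InputRef("b"), 1), "_1"), Map("b" -> Array(("x", 1), ("y", 2))))` evaluates to `Some("x")`, the same result as the equivalent `FieldAccess` shape.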


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165515039
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testArrayElementAtFromTableForTuple(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val data = List(
+  (1, Array((12, 45), (2, 5))),
--- End diff --

Added a null check.
Nested tuple input won't work for now due to 
https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, 
which should be available in Calcite 1.16.


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-31 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165038256
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
+
+  @Test
+  def testArrayElement(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+val expected = unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select",
+"a",
+"DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --

Actually, we don't need a full table test for this, as it only tests a 
scalar operation. Can you move the test to `CompositeAccessTest` and test it 
for all APIs?


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-31 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165039357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

I think this method is not really necessary. We already have logic for 
accessing composite types (see `visitFieldAccess()`). Maybe we just have to 
make the methods there a bit more generic.
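One way to read this suggestion: a single generator builds the field-access expression for each kind of composite type, and both `visitFieldAccess()` and the DOT handling call it. The sketch below is hypothetical. `CompositeKind`, `ProductKind`, `CaseClassKind`, `PojoKind`, `RowKind` and `CompositeAccessGen` are illustrative names, not Flink classes. The generated Java snippets mirror the per-type accessors used in the diff under review: `productElement(n - 1)` for tuple-style `_n` fields, `field()` for case classes, a direct field read for POJOs, and `getField(index)` for rows.

```scala
// Hypothetical sketch: one shared generator for composite field access.
// The kinds below are illustrative stand-ins, not Flink's TypeInformation classes.
sealed trait CompositeKind
case object ProductKind extends CompositeKind   // tuple-style fields named _1, _2, ...
case object CaseClassKind extends CompositeKind // accessor methods, e.g. rec.name()
case object PojoKind extends CompositeKind      // public fields, e.g. rec.name
final case class RowKind(fieldNames: Seq[String]) extends CompositeKind

object CompositeAccessGen {
  /** Returns a Java expression that reads `fieldName` from `recordTerm`. */
  def accessExpr(kind: CompositeKind, recordTerm: String, fieldName: String): String =
    kind match {
      case ProductKind =>
        // "_1" maps to productElement(0): tuple field names are 1-based.
        val idx = fieldName.substring(1).toInt - 1
        s"$recordTerm.productElement($idx)"
      case CaseClassKind => s"$recordTerm.$fieldName()"
      case PojoKind      => s"$recordTerm.$fieldName"
      case RowKind(names) =>
        // Rows are accessed by position, so resolve the name to an index first.
        val idx = names.indexOf(fieldName)
        require(idx >= 0, s"Unknown field: $fieldName")
        s"$recordTerm.getField($idx)"
    }
}
```

Keeping the per-type logic in one place means a fix such as a null check only has to be made once, instead of once per RexFieldAccess branch and once per DOT branch.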


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266892
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
--- End diff --

Remove the redundant space between `CalcTest` and `extends`.


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266896
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

Arguments of the method should be indented. 


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266907
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testArrayElementAtFromTableForTuple(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val data = List(
+  (1, Array((12, 45), (2, 5))),
--- End diff --

1. Add null tuple input test
2. Add nested tuple input test
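To make the expected results of such tests explicit, they could be computed with a small reference model that is independent of Flink. This is a hypothetical sketch (`ExpectedArrayFieldAccess` and `firstField` are illustrative names). It assumes 1-based SQL array indexing, that a null array element makes `b[i]._1` NULL, and, as an additional assumption, that an out-of-range index also yields NULL. The first array mirrors the test data in the diff above.

```scala
// Hypothetical reference model for expected values of b[sqlIndex]._1.
// None stands for SQL NULL.
object ExpectedArrayFieldAccess {
  def firstField[A, B](arr: Array[(A, B)], sqlIndex: Int): Option[A] =
    // Assumption: a null array or an out-of-range index yields NULL.
    if (arr == null || sqlIndex < 1 || sqlIndex > arr.length) None
    // A null tuple element (or a null first field) also yields NULL.
    else Option(arr(sqlIndex - 1)).flatMap(t => Option(t._1))
}
```

For example, `firstField(Array((12, 45), (2, 5)), 1)` is `Some(12)`, `firstField(Array[(Int, Int)](null, (2, 5)), 1)` is `None` (null tuple input), and `firstField(Array(((1, 2), 3)), 1)` is `Some((1, 2))` (nested tuple input).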


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266897
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
--- End diff --

An NPE will be thrown if `${record.resultTerm}` is null.
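A null-safe variant of the generated code checks the record before dereferencing it and only then performs the access. The sketch below is hypothetical: `NullSafeAccess.generate` is not part of Flink's code generator, and its parameters stand in for the record's generated code, its null term, the access expression (for example a `productElement(...)` call), and the result type and terms. It assumes the record's null term is in scope after `recordCode` runs.

```scala
// Hypothetical helper (not Flink's CodeGenerator API): emits Java code that
// only dereferences the record when the record itself is not null.
object NullSafeAccess {
  def generate(
      recordCode: String,     // code that computes the record
      recordNullTerm: String, // boolean term: is the record null?
      accessExpr: String,     // e.g. "rec.productElement(0)"
      resultTypeTerm: String, // boxed result type, e.g. "java.lang.Integer"
      resultTerm: String,
      nullTerm: String): String =
    s"""
       |$recordCode
       |$resultTypeTerm $resultTerm = null;
       |if (!$recordNullTerm) {
       |  $resultTerm = ($resultTypeTerm) $accessExpr;
       |}
       |boolean $nullTerm = $resultTerm == null;
       |""".stripMargin
}
```

With this shape, a null record simply produces a null result, and a null field value is still reported through `nullTerm`.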


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266903
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[RowTypeInfo]) {
+  val fieldIndex = dot.operands.get(0).getType.asInstanceOf[CompositeRelDataType]
+.compositeType.getFieldIndex(fieldName)
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.getField(${fieldIndex});
--- End diff --

NPE will be thrown


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
--- End diff --

NPE will be thrown 


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-27 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r164266900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
+  dot: RexCall,
+  record: GeneratedExpression,
+  subField: GeneratedExpression)
+  : GeneratedExpression = {
+val nullTerm = newName("isNull")
+val resultTerm = newName("result")
+val resultType = FlinkTypeFactory.toTypeInfo(dot.getType)
+val resultTypeTerm = boxedTypeTermForTypeInfo(resultType)
+dot.operands.get(0).getType match {
+  case crdt: CompositeRelDataType => {
+val fieldName = dot.operands.get(1).asInstanceOf[RexLiteral]
+  .getValue.asInstanceOf[NlsString].getValue
+if (crdt.compositeType.isInstanceOf[TupleTypeInfo[_]]) {
+   return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${subField.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.productElement(
+   |${fieldName.substring(1).toInt} - 1);
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[CaseClassTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName}();
+   |boolean $nullTerm =${resultTerm} == null;
+   |""".stripMargin, resultType)
+} else if (crdt.compositeType.isInstanceOf[PojoTypeInfo[_]]) {
+  return GeneratedExpression(resultTerm, nullTerm,
+s"""
+   |${record.code}
+   |${resultTypeTerm} $resultTerm =
+   |  (${resultTypeTerm}) ${record.resultTerm}.${fieldName};
--- End diff --

NPE will be thrown 


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-01-25 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5367

[FLINK-7923][Table API & SQL] Support field access of composite array element in SQL

Note: This is based on FLINK-7934; I will rebase once FLINK-7934 is resolved.

## What is the purpose of the change

Support field access of composite array elements in SQL.


## Brief change log

  - add handling for the Calcite DOT operator to support field access of composite array elements in SQL
  - add unit tests to verify that it works for tuple, row, POJO and case class arrays



## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify the query plan*
  - *Added integration tests for batch/streaming for POJO/case class/tuple/row types*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink flink-7923

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5367


commit 9e915e4144703582843b0f31bffc1481648d0119
Author: Shuyi Chen 
Date:   2018-01-10T00:52:56Z

Upgrade to Calcite 1.15

commit 7a8328e4750ae95196f0b8ba20c6dff22c59ec08
Author: Shuyi Chen 
Date:   2018-01-25T23:36:36Z

Support access of subfields of Array element if the element is a composite 
type (e.g. case class, pojo, tuple or row).




---