[FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface. This closes #3269.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c37e55c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c37e55c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c37e55c Branch: refs/heads/table-retraction Commit: 5c37e55c83f854c1a9eb7bd7438b378b8c4b0a9f Parents: cac9fa0 Author: tonycox <anton_solo...@epam.com> Authored: Mon Feb 6 16:32:45 2017 +0400 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat Mar 25 00:32:31 2017 +0100 ---------------------------------------------------------------------- ...PushProjectIntoTableSourceScanRuleBase.scala | 17 +- .../table/plan/util/RexProgramExtractor.scala | 81 +++++++++ .../NestedFieldsProjectableTableSource.scala | 54 ++++++ .../plan/util/RexProgramExtractorTest.scala | 181 ++++++++++++++++++- .../flink/table/utils/InputTypeBuilder.scala | 53 ++++++ 5 files changed, 380 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala index 9f9c805..1e75971 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.core.Calc import org.apache.flink.table.api.TableEnvironment import 
org.apache.flink.table.plan.nodes.TableSourceScan import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter} -import org.apache.flink.table.sources.ProjectableTableSource +import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource} trait PushProjectIntoTableSourceScanRuleBase { @@ -35,9 +35,18 @@ trait PushProjectIntoTableSourceScanRuleBase { val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram) // if no fields can be projected, we keep the original plan. - if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) { - val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]] - val newTableSource = originTableSource.projectFields(usedFields) + val source = scan.tableSource + if (TableEnvironment.getFieldNames(source).length != usedFields.length) { + + val newTableSource = source match { + case nested: NestedFieldsProjectableTableSource[_] => + val nestedFields = RexProgramExtractor + .extractRefNestedInputFields(calc.getProgram, usedFields) + nested.projectNestedFields(usedFields, nestedFields) + case projecting: ProjectableTableSource[_] => + projecting.projectFields(usedFields) + } + val newScan = scan.copy(scan.getTraitSet, newTableSource) val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection( calc.getProgram, http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala index 433a35b..a042f55 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala @@ -92,6 +92,26 @@ object RexProgramExtractor { case _ => (Array.empty, Array.empty) } } + + /** + * Extracts the name of nested input fields accessed by the RexProgram and returns the + * prefix of the accesses. + * + * @param rexProgram The RexProgram to analyze + * @return The full names of accessed input fields. e.g. field.subfield + */ + def extractRefNestedInputFields( + rexProgram: RexProgram, usedFields: Array[Int]): Array[Array[String]] = { + + val visitor = new RefFieldAccessorVisitor(usedFields) + rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor)) + + val condition = rexProgram.getCondition + if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) + } + visitor.getProjectedFields + } } /** @@ -181,3 +201,64 @@ class RexNodeToExpressionConverter( } } + +/** + * A RexVisitor to extract used nested input fields + */ +class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) { + + private val projectedFields: Array[Array[String]] = Array.fill(usedFields.length)(Array.empty) + + private val order: Map[Int, Int] = usedFields.zipWithIndex.toMap + + /** Returns the prefix of the nested field accesses */ + def getProjectedFields: Array[Array[String]] = { + + projectedFields.map { nestedFields => + // sort nested field accesses + val sorted = nestedFields.sorted + // get prefix field accesses + val prefixAccesses = sorted.foldLeft(Nil: List[String]) { + (prefixAccesses, nestedAccess) => prefixAccesses match { + // first access => add access + case Nil => List[String](nestedAccess) + // top-level access already found => return top-level access + case head :: Nil if head.equals("*") => prefixAccesses + // access is top-level access => return top-level access + case _ :: _ if nestedAccess.equals("*") => List("*") + // previous access is not prefix of this access => add 
access + case head :: _ if !nestedAccess.startsWith(head) => + nestedAccess :: prefixAccesses + // previous access is a prefix of this access => do not add access + case _ => prefixAccesses + } + } + prefixAccesses.toArray + } + } + + override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = { + def internalVisit(fieldAccess: RexFieldAccess): (Int, String) = { + fieldAccess.getReferenceExpr match { + case ref: RexInputRef => + (ref.getIndex, fieldAccess.getField.getName) + case fac: RexFieldAccess => + val (i, n) = internalVisit(fac) + (i, s"$n.${fieldAccess.getField.getName}") + } + } + val (index, fullName) = internalVisit(fieldAccess) + val outputIndex = order.getOrElse(index, -1) + val fields: Array[String] = projectedFields(outputIndex) + projectedFields(outputIndex) = fields :+ fullName + } + + override def visitInputRef(inputRef: RexInputRef): Unit = { + val outputIndex = order.getOrElse(inputRef.getIndex, -1) + val fields: Array[String] = projectedFields(outputIndex) + projectedFields(outputIndex) = fields :+ "*" + } + + override def visitCall(call: RexCall): Unit = + call.operands.foreach(operand => operand.accept(this)) +} http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala new file mode 100644 index 0000000..a10187b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +/** + * Adds support for projection push-down to a [[TableSource]] with nested fields. + * A [[TableSource]] extending this interface is able + * to project the nested fields of the returned table. + * + * @tparam T The return type of the [[NestedFieldsProjectableTableSource]]. + */ +trait NestedFieldsProjectableTableSource[T] { + + /** + * Creates a copy of the [[TableSource]] that projects its output on the specified nested fields. + * + * @param fields The indexes of the fields to return. + * @param nestedFields The accessed nested fields of the fields to return. + * + * e.g. + * tableSchema = { + * id, + * student<\school<\city, tuition>, age, name>, + * teacher<\age, name> + * } + * + * select (id, student.school.city, student.age, teacher) + * + * fields = [0, 1, 2] + * nestedFields = \[\["*"], ["school.city", "age"], ["*"\]\] + * + * @return A copy of the [[TableSource]] that projects its output. 
+ */ + def projectNestedFields( + fields: Array[Int], + nestedFields: Array[Array[String]]): TableSource[T] + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala index b0a5fcf..999d20f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala @@ -20,12 +20,15 @@ package org.apache.flink.table.plan.util import java.math.BigDecimal -import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder} +import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder} import org.apache.calcite.sql.SqlPostfixOperator +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.utils.InputTypeBuilder.inputOf import org.apache.flink.table.validate.FunctionCatalog -import org.junit.Assert.{assertArrayEquals, assertEquals} +import org.hamcrest.CoreMatchers.is +import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat} import org.junit.Test import scala.collection.JavaConverters._ @@ -306,6 +309,180 @@ class RexProgramExtractorTest extends RexProgramTestBase { unconvertedRexNodes(1).toString) } + @Test + def testExtractRefNestedInputFields(): Unit = { + val rexProgram = buildRexProgramWithNesting() + + val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram) + val usedNestedFields = 
RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields) + + val expected = Array(Array("amount"), Array("*")) + assertThat(usedNestedFields, is(expected)) + } + + @Test + def testExtractRefNestedInputFieldsWithNoNesting(): Unit = { + val rexProgram = buildSimpleRexProgram() + + val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram) + val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields) + + val expected = Array(Array("*"), Array("*"), Array("*")) + assertThat(usedNestedFields, is(expected)) + } + + @Test + def testExtractDeepRefNestedInputFields(): Unit = { + val rexProgram = buildRexProgramWithDeepNesting() + + val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram) + val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields) + + val expected = Array( + Array("amount"), + Array("*"), + Array("with.deeper.entry", "with.deep.entry")) + + assertThat(usedFields, is(Array(1, 0, 2))) + assertThat(usedNestedFields, is(expected)) + } + + private def buildRexProgramWithDeepNesting(): RexProgram = { + + // person input + val passportRow = inputOf(typeFactory) + .field("id", VARCHAR) + .field("status", VARCHAR) + .build + + val personRow = inputOf(typeFactory) + .field("name", VARCHAR) + .field("age", INTEGER) + .nestedField("passport", passportRow) + .build + + // payment input + val paymentRow = inputOf(typeFactory) + .field("id", BIGINT) + .field("amount", INTEGER) + .build + + // deep field input + val deepRowType = inputOf(typeFactory) + .field("entry", VARCHAR) + .build + + val entryRowType = inputOf(typeFactory) + .nestedField("inside", deepRowType) + .build + + val deeperRowType = inputOf(typeFactory) + .nestedField("entry", entryRowType) + .build + + val withRowType = inputOf(typeFactory) + .nestedField("deep", deepRowType) + .nestedField("deeper", deeperRowType) + .build + + val fieldRowType = inputOf(typeFactory) + .nestedField("with", 
withRowType) + .build + + // main input + val inputRowType = inputOf(typeFactory) + .nestedField("persons", personRow) + .nestedField("payments", paymentRow) + .nestedField("field", fieldRowType) + .build + + // inputRowType + // + // [ persons: [ name: VARCHAR, age: INT, passport: [id: VARCHAR, status: VARCHAR ] ], + // payments: [ id: BIGINT, amount: INT ], + // field: [ with: [ deep: [ entry: VARCHAR ], + // deeper: [ entry: [ inside: [entry: VARCHAR ] ] ] + // ] ] + // ] + + val builder = new RexProgramBuilder(inputRowType, rexBuilder) + + val t0 = rexBuilder.makeInputRef(personRow, 0) + val t1 = rexBuilder.makeInputRef(paymentRow, 1) + val t2 = rexBuilder.makeInputRef(fieldRowType, 2) + val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(10L)) + + // person + val person$pass = rexBuilder.makeFieldAccess(t0, "passport", false) + val person$pass$stat = rexBuilder.makeFieldAccess(person$pass, "status", false) + + // payment + val pay$amount = rexBuilder.makeFieldAccess(t1, "amount", false) + val multiplyAmount = builder.addExpr( + rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, pay$amount, t3)) + + // field + val field$with = rexBuilder.makeFieldAccess(t2, "with", false) + val field$with$deep = rexBuilder.makeFieldAccess(field$with, "deep", false) + val field$with$deeper = rexBuilder.makeFieldAccess(field$with, "deeper", false) + val field$with$deep$entry = rexBuilder.makeFieldAccess(field$with$deep, "entry", false) + val field$with$deeper$entry = rexBuilder.makeFieldAccess(field$with$deeper, "entry", false) + val field$with$deeper$entry$inside = rexBuilder + .makeFieldAccess(field$with$deeper$entry, "inside", false) + val field$with$deeper$entry$inside$entry = rexBuilder + .makeFieldAccess(field$with$deeper$entry$inside, "entry", false) + + builder.addProject(multiplyAmount, "amount") + builder.addProject(person$pass$stat, "status") + builder.addProject(field$with$deep$entry, "entry") + builder.addProject(field$with$deeper$entry$inside$entry, "entry") + 
builder.addProject(field$with$deeper$entry, "entry2") + builder.addProject(t0, "person") + + // Program + // ( + // payments.amount * 10), + // persons.passport.status, + // field.with.deep.entry + // field.with.deeper.entry.inside.entry + // field.with.deeper.entry + // persons + // ) + + builder.getProgram + + } + + private def buildRexProgramWithNesting(): RexProgram = { + + val personRow = inputOf(typeFactory) + .field("name", INTEGER) + .field("age", VARCHAR) + .build + + val paymentRow = inputOf(typeFactory) + .field("id", BIGINT) + .field("amount", INTEGER) + .build + + val types = List(personRow, paymentRow).asJava + val names = List("persons", "payments").asJava + val inputRowType = typeFactory.createStructType(types, names) + + val builder = new RexProgramBuilder(inputRowType, rexBuilder) + + val t0 = rexBuilder.makeInputRef(types.get(0), 0) + val t1 = rexBuilder.makeInputRef(types.get(1), 1) + val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) + + val payment$amount = rexBuilder.makeFieldAccess(t1, "amount", false) + + builder.addProject(payment$amount, "amount") + builder.addProject(t0, "persons") + builder.addProject(t2, "number") + builder.getProgram + } + private def testExtractSinglePostfixCondition( fieldIndex: Integer, op: SqlPostfixOperator, http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala new file mode 100644 index 0000000..6f11f88 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils + +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.`type`.SqlTypeName + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class InputTypeBuilder(typeFactory: JavaTypeFactory) { + + private val names = mutable.ListBuffer[String]() + private val types = mutable.ListBuffer[RelDataType]() + + def field(name: String, `type`: SqlTypeName): InputTypeBuilder = { + names += name + types += typeFactory.createSqlType(`type`) + this + } + + def nestedField(name: String, `type`: RelDataType): InputTypeBuilder = { + names += name + types += `type` + this + } + + def build: RelDataType = { + typeFactory.createStructType(types.asJava, names.asJava) + } +} + +object InputTypeBuilder { + + def inputOf(typeFactory: JavaTypeFactory) = new InputTypeBuilder(typeFactory) +}